tool.py•11.7 kB
import getpass
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse, quote
import pandas as pd
from loguru import logger
from playwright.async_api import async_playwright, Playwright, Page
from playwright_stealth import Stealth
from mcp_query_table.enums import QueryType, Site, Provider
def create_detached_process(command):
    """Start ``command`` with ``subprocess.Popen`` in a detached fashion.

    Parameters
    ----------
    command
        Command passed unchanged to ``subprocess.Popen``.

    Returns
    -------
    subprocess.Popen
        Handle of the started process.
    """
    if sys.platform == 'win32':
        # NOTE: when run from PyCharm the new process may still get closed.
        popen_options = {
            'creationflags': subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP,
        }
    else:
        # Unix-like systems (Linux, macOS): run the child in a new session.
        popen_options = {'start_new_session': True}

    logger.info(f"Popen: {command}")
    return subprocess.Popen(command, **popen_options)
def is_local_url(url: str) -> bool:
    """Return True when the host part of ``url`` is ``localhost`` or ``127.0.0.1``.

    Only the parsed hostname is compared, so a remote address that merely
    contains one of these strings (e.g. ``notlocalhost.example.com`` or
    ``http://example.com/?next=127.0.0.1``) is not reported as local.

    Parameters
    ----------
    url : str
        Endpoint address, with or without a scheme.

    Returns
    -------
    bool
        True if the host is a local one, False otherwise (including URLs
        that cannot be parsed).
    """
    try:
        # Without a scheme urlparse finds no netloc, so retry with a leading
        # "//" which makes it treat the text as "host[:port]/...".
        host = urlparse(url).hostname or urlparse(f"//{url}").hostname
    except ValueError:
        # Raised by urlparse for malformed netlocs such as unbalanced brackets.
        return False
    # urlparse lower-cases the hostname, so the comparison is case-insensitive.
    return host in ('localhost', '127.0.0.1')
def is_cdp_url(url: str) -> bool:
    """Return True when ``url`` is considered a CDP address.

    Every address that does not start with ``ws://`` or ``wss://`` counts as
    a CDP address; the prefix check is case-sensitive.
    """
    return not url.startswith(('ws://', 'wss://'))
def get_executable_path(executable_path) -> Optional[str]:
    """Return the first browser executable path that exists on disk.

    The caller-supplied ``executable_path`` is tried first (skipped when it
    is None), then the fixed Windows install locations of Chrome and Edge.

    Returns
    -------
    str or None
        The first existing candidate, or None when none of them exists.
    """
    candidates = (
        executable_path,
        r"C:\Program Files\Google\Chrome\Application\chrome.exe",
        r"C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe",
    )
    return next(
        (candidate for candidate in candidates
         if candidate is not None and Path(candidate).exists()),
        None,
    )
def get_user_data_dir(user_data_dir) -> Optional[str]:
    """Return the first browser user-data directory that exists on disk.

    The caller-supplied ``user_data_dir`` is tried first (skipped when it is
    None), then the Windows profile directories of Chrome and Edge for the
    current user.

    Returns
    -------
    str or None
        The first existing candidate, or None when none of them exists.
    """
    user = getpass.getuser()
    candidates = (
        user_data_dir,
        # NOTE: a CDP connection cannot be created when Chrome uses its default profile.
        rf'C:\Users\{user}\AppData\Local\Google\Chrome\User Data',
        rf"C:\Users\{user}\AppData\Local\Microsoft\Edge\User Data",
    )
    return next(
        (candidate for candidate in candidates
         if candidate is not None and Path(candidate).exists()),
        None,
    )
class BrowserManager:
    """Own a Playwright browser connection and a pool of idle pages.

    Depending on ``endpoint`` the manager launches a browser itself, attaches
    to a local browser over CDP (starting it when nothing is listening), or
    connects to a remote CDP/WS endpoint. The connection is established
    lazily by the first :meth:`get_page` call. The manager can be used as an
    async context manager; leaving the ``async with`` block calls
    :meth:`cleanup`.
    """

    # URL prefixes of pages that must not be reused as work pages:
    # developer tools, Chrome extensions and Edge extensions.
    _INTERNAL_URL_PREFIXES = ("devtools://", "chrome-extension://", "extension://")

    def __init__(self,
                 endpoint: Optional[str],
                 executable_path: Optional[str] = None,
                 devtools: bool = False,
                 headless: bool = True,
                 user_data_dir: Optional[str] = None):
        """
        Parameters
        ----------
        endpoint:str or None
            CDP or WS address of the browser.
            If None, a browser instance is launched directly; headless mode
            is available in that case. Specifying a user data directory is
            recommended, otherwise sites that require login may not work.
        executable_path:str
            Path of the browser executable. Chrome is recommended, because
            Microsoft Edge has to be fully exited in the task manager before
            it can be started with a debugging port.
        devtools:bool
            Whether to show the developer tools. Forces ``headless`` to False.
        headless:bool
            Whether to launch the browser in headless mode.
        user_data_dir:str
            Browser user data directory. Strongly recommended, otherwise
            sites that require login may not work.
        """
        if devtools:
            headless = False
        self.endpoint = endpoint
        self.executable_path = executable_path
        self.devtools = devtools
        self.headless = headless
        self.user_data_dir = user_data_dir

        self.playwright: Optional[Playwright] = None
        self.browser = None
        self.context = None
        # Pool of idle pages.
        self.pages = []

    async def __aenter__(self):
        """Return the manager itself; nothing is launched until a page is requested."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release browser resources when leaving the ``async with`` block."""
        await self.cleanup()

    async def cleanup(self):
        """Close what was opened and reset the manager to its initial state.

        A context obtained from ``launch_persistent_context`` has no browser
        handle stored on the manager, so it is closed directly. Playwright is
        stopped even when closing the browser/context raises. After cleanup a
        later :meth:`get_page` call launches again instead of reusing closed
        objects.
        """
        browser, context, playwright = self.browser, self.context, self.playwright
        self.browser = None
        self.context = None
        self.playwright = None
        self.pages = []
        try:
            if browser is not None:
                await browser.close()
            elif context is not None:
                await context.close()
        finally:
            if playwright is not None:
                await playwright.stop()

    async def _connect_over_cdp(self):
        """Attach to ``self.endpoint`` over CDP and return the browser."""
        return await self.playwright.chromium.connect_over_cdp(
            self.endpoint, timeout=10000, slow_mo=1000)

    async def _connect_to_local(self) -> None:
        """Connect to a local browser, starting it if nothing is listening.

        Raises
        ------
        FileNotFoundError
            No browser executable could be located for the launch.
        ValueError
            The endpoint has no (valid) port to open for remote debugging.
        ConnectionError
            The browser was started but the CDP connection still failed.
        """
        # Function-scope import, like the other lazy imports in this module.
        import asyncio

        try:
            self.browser = await self._connect_over_cdp()
            return
        except Exception as e:
            logger.debug("connect_over_cdp({}) failed, starting a local browser: {}", self.endpoint, e)

        executable_path = get_executable_path(self.executable_path)
        if executable_path is None:
            raise FileNotFoundError("未找到浏览器可执行文件,请通过`executable_path`指定")
        port = urlparse(self.endpoint).port
        if port is None:
            raise ValueError(f"endpoint缺少端口号,无法开启本地浏览器的远程调试端口。{self.endpoint}")

        command = [executable_path, f'--remote-debugging-port={port}', '--start-maximized']
        if self.devtools:
            command.append('--auto-open-devtools-for-tabs')
        if self.user_data_dir:
            command.append(f'--user-data-dir={self.user_data_dir}')
        else:
            logger.warning('Chrome必须另行指定`--user-data-dir`才能创建CDP连接')

        create_detached_process(command)
        # Wait for the browser to start; asyncio.sleep does not block the event loop.
        await asyncio.sleep(5)
        try:
            self.browser = await self._connect_over_cdp()
        except Exception as e:
            name = Path(executable_path).name
            raise ConnectionError(
                f"已提前打开了浏览器,但未开启远程调试端口?请关闭浏览器全部进程后重试 `taskkill /f /im {name}`") from e

    async def _connect_to_remote(self) -> None:
        """Connect to a remote browser over CDP or the Playwright WS protocol.

        Raises
        ------
        ConnectionError
            The connection attempt failed.
        """
        try:
            if is_cdp_url(self.endpoint):
                self.browser = await self._connect_over_cdp()
            else:
                self.browser = await self.playwright.chromium.connect(
                    self.endpoint, timeout=10000, slow_mo=1000)
        except Exception as e:
            raise ConnectionError(f"连接远程浏览器失败,请检查CDP/WS地址和端口是否正确。{self.endpoint}") from e

    async def _connect_to_launch(self) -> None:
        """Launch a browser directly.

        With a user data directory a persistent context is launched and
        stored in ``self.context``; otherwise a plain browser is launched and
        stored in ``self.browser``.

        Raises
        ------
        ConnectionError
            Launching the persistent context failed.
        """
        logger.info("executable_path={}", self.executable_path)
        if self.user_data_dir:
            logger.info("user_data_dir={}", self.user_data_dir)
            try:
                self.context = await self.playwright.chromium.launch_persistent_context(
                    user_data_dir=self.user_data_dir,
                    executable_path=self.executable_path,
                    headless=self.headless,
                    devtools=self.devtools,
                    timeout=10000, slow_mo=1000)
            except Exception as e:
                raise ConnectionError(f"launch失败,可能已经有浏览器已经打开了数据目录。{self.user_data_dir}") from e
        else:
            logger.warning("未指定浏览器用户数据目录,部分需要的网站可能无法使用")
            self.browser = await self.playwright.chromium.launch(
                executable_path=self.executable_path,
                headless=self.headless,
                devtools=self.devtools)

    async def _launch(self) -> None:
        """Start Playwright, obtain a browser context and collect reusable pages.

        If anything fails after Playwright was started, everything opened so
        far is cleaned up before the error is re-raised.

        References
        ----------
        https://blog.csdn.net/qq_30576521/article/details/142370538
        """
        self.playwright = await async_playwright().start()
        try:
            if self.endpoint is None:
                await self._connect_to_launch()
            elif is_local_url(self.endpoint) and is_cdp_url(self.endpoint):
                await self._connect_to_local()
            else:
                await self._connect_to_remote()

            # With a persistent launch there is no browser object and the
            # context is already set; otherwise reuse the browser's first
            # context or create one when it has none.
            if self.browser is not None:
                if len(self.browser.contexts) == 0:
                    self.context = await self.browser.new_context()
                else:
                    self.context = self.browser.contexts[0]

            # iwencai in headless mode requires the stealth plugin.
            await Stealth().apply_stealth_async(self.context)
        except Exception:
            await self.cleanup()
            raise

        # Reuse pages that are already open, skipping internal ones.
        for page in self.context.pages:
            if page.url.startswith(self._INTERNAL_URL_PREFIXES):
                continue
            self.pages.append(page)

    async def get_page(self) -> "Page":
        """Return a usable page, opening a new tab when the pool has none.

        The browser is launched/connected on first use.
        """
        if self.context is None:
            await self._launch()

        # Take pages from the end of the pool, dropping those already closed.
        while self.pages:
            page = self.pages.pop()
            if not page.is_closed():
                return page

        # Pool exhausted: open a new page.
        return await self.context.new_page()

    def release_page(self, page) -> None:
        """Put a page back into the idle pool once the caller is done with it.

        Pages that are not returned cause :meth:`get_page` to keep opening
        new tabs. Closed pages are ignored.
        """
        if page.is_closed():
            return
        self.pages.append(page)
async def query(
        page: Page,
        query_input: str = "收盘价>100元",
        query_type: QueryType = QueryType.CNStock,
        max_page: int = 5,
        rename: bool = False,
        site: Site = Site.THS,
) -> pd.DataFrame:
    """Query a table from one of the supported sites.

    The query text is stripped and fully percent-encoded, then handed to the
    site-specific ``query`` implementation, which is imported lazily.

    Parameters
    ----------
    page : playwright.async_api.Page
        Page to run the query in.
    query_input : str, optional
        Query condition, by default "收盘价>100元"
    query_type : QueryType, optional
        Query type, by default QueryType.CNStock
    max_page : int, optional
        Maximum number of pages, by default 5
    rename: bool
        Whether to rename the columns, by default False
    site : Site, optional
        Site to query, by default Site.THS

    Returns
    -------
    pd.DataFrame
        Query result.

    Raises
    ------
    ValueError
        ``site`` is not one of Site.EastMoney, Site.THS or Site.TDX.
    """
    encoded_input = quote(query_input.strip(), safe='')

    if site == Site.EastMoney:
        from mcp_query_table.sites.eastmoney import query as site_query
    elif site == Site.THS:
        from mcp_query_table.sites.iwencai import query as site_query
    elif site == Site.TDX:
        from mcp_query_table.sites.tdx import query as site_query
    else:
        raise ValueError(f"未支持的站点:{site}")

    return await site_query(page, encoded_input, query_type, max_page, rename)
async def chat(
page: Page,
prompt: str = "9.9大还是9.11大?",
create: bool = False,
files: list[str] | None = None,
provider: Provider = Provider.Nami) -> str:
"""大语言对话
Parameters
----------
page : playwright.sync_api.Page
页面
prompt : str, optional
对话内容, by default "9.9大还是9.11大?"
create : bool, optional
是否创建新对话, by default False
files : list[str] | None, optional
上传的文件列表。不同网站支持程度不同
provider : Provider, optional
提供商, by default Provider.N
Returns
-------
str
对话结果
"""
# 空列表转None
if files is None:
files = []
if provider == Provider.Nami:
from mcp_query_table.providers.n import chat
return await chat(page, prompt, create, files)
if provider == Provider.YuanBao:
from mcp_query_table.providers.yuanbao import chat
return await chat(page, prompt, create, files)
if provider == Provider.BaiDu:
from mcp_query_table.providers.baidu import chat
return await chat(page, prompt, create, files)
raise ValueError(f"未支持的提供商:{provider}")