Skip to main content
Glama
by wukan1986
tool.py (11.7 kB)
import asyncio
import getpass
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse, quote

import pandas as pd
from loguru import logger
from playwright.async_api import async_playwright, Playwright, Page
from playwright_stealth import Stealth

from mcp_query_table.enums import QueryType, Site, Provider

# Hosts that are treated as "this machine" when deciding whether a browser
# may be spawned locally for a CDP endpoint.
_LOCAL_HOSTS = frozenset({'localhost', '127.0.0.1', '::1'})

# URL prefixes of pages that must never be handed out as working tabs:
# DevTools windows and Chrome / Edge extension pages.
_INTERNAL_PAGE_PREFIXES = ("devtools://", "chrome-extension://", "extension://")


def create_detached_process(command):
    """Start ``command`` as a process detached from the current one.

    Parameters
    ----------
    command : list[str]
        Executable followed by its arguments, passed to ``subprocess.Popen``.

    Returns
    -------
    subprocess.Popen
        Handle of the spawned process.
    """
    kwargs = {}
    if sys.platform == 'win32':
        # NOTE: when run from inside PyCharm the new process may still be
        # closed together with the parent.
        kwargs['creationflags'] = subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP
    else:
        # Unix-like systems (Linux, macOS): put the child in a new session.
        kwargs['start_new_session'] = True

    logger.info(f"Popen: {command}")
    return subprocess.Popen(command, **kwargs)


def is_local_url(url: str) -> bool:
    """Return True when the host part of ``url`` refers to the local machine.

    The host is extracted with ``urlparse`` rather than searched for as a
    substring, so ``http://notlocalhost.example`` or a remote URL that merely
    mentions ``localhost`` in its path/query is not considered local.
    A URL without a scheme (``localhost:9222``) is accepted as well.
    """
    target = url.strip().lower()
    if '://' not in target:
        # Without a scheme urlparse would treat "localhost" as the scheme.
        target = '//' + target
    try:
        host = urlparse(target).hostname
    except ValueError:
        # Malformed netloc (e.g. unbalanced IPv6 brackets).
        return False
    return host in _LOCAL_HOSTS


def is_cdp_url(url: str) -> bool:
    """Return True when ``url`` is a CDP address, i.e. not a ``ws://`` / ``wss://`` address."""
    return not url.startswith(('ws://', 'wss://'))


def _first_existing_path(candidates) -> Optional[str]:
    """Return the first non-empty candidate that exists on disk, else None."""
    for candidate in candidates:
        # Skip None and "" -- Path("") is "." and would always "exist".
        if not candidate:
            continue
        if Path(candidate).exists():
            return candidate
    return None


def get_executable_path(executable_path) -> Optional[str]:
    """Resolve the browser executable to use.

    The caller-supplied ``executable_path`` is tried first, then the default
    Windows install locations of Chrome and Microsoft Edge.

    Returns
    -------
    str or None
        First path that exists, or None when none of them does.
    """
    return _first_existing_path((
        executable_path,
        r"C:\Program Files\Google\Chrome\Application\chrome.exe",
        r"C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe",
    ))


def get_user_data_dir(user_data_dir) -> Optional[str]:
    """Resolve the browser user data directory to use.

    The caller-supplied ``user_data_dir`` is tried first, then the default
    Windows profile directories of Chrome and Microsoft Edge for the current
    user.

    Returns
    -------
    str or None
        First directory that exists, or None when none of them does.
    """
    user = getpass.getuser()
    return _first_existing_path((
        user_data_dir,
        # A CDP connection cannot be created while the default profile is used.
        rf'C:\Users\{user}\AppData\Local\Google\Chrome\User Data',
        rf"C:\Users\{user}\AppData\Local\Microsoft\Edge\User Data",
    ))


class BrowserManager:
    """Own a Playwright browser connection and a pool of idle pages.

    Usable as an async context manager; leaving the ``async with`` block
    calls :meth:`cleanup`.
    """

    def __init__(self,
                 endpoint: Optional[str],
                 executable_path: Optional[str] = None,
                 devtools: bool = False,
                 headless: bool = True,
                 user_data_dir: Optional[str] = None):
        """

        Parameters
        ----------
        endpoint : str or None
            Browser CDP address / WS address.
            If None, a browser instance is launched directly; headless mode
            is available. Specifying a user data directory is recommended,
            otherwise some sites that require login may not work.
        executable_path : str
            Browser executable path. Chrome is recommended, because Microsoft
            Edge must be fully exited in the task manager before the debug
            port can be opened.
        devtools : bool
            Whether to show the developer tools. Forces ``headless=False``.
        headless : bool
            Whether to launch the browser in headless mode.
        user_data_dir : str
            Browser user data directory. Strongly recommended, otherwise some
            sites that require login may not work.

        """
        if devtools:
            headless = False

        self.endpoint = endpoint
        self.executable_path = executable_path
        self.devtools = devtools
        self.headless = headless
        self.user_data_dir = user_data_dir

        self.playwright: Optional[Playwright] = None
        self.browser = None
        self.context = None
        # Pool of idle pages.
        self.pages = []

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.cleanup()

    async def cleanup(self):
        """Close the browser (or persistent context) and stop Playwright.

        State is reset afterwards so that a later :meth:`get_page` launches a
        fresh connection instead of using closed objects.
        """
        try:
            if self.browser is not None:
                await self.browser.close()
            elif self.context is not None:
                # launch_persistent_context yields a context without a
                # Browser object; closing the context closes that browser.
                await self.context.close()
        finally:
            playwright, self.playwright = self.playwright, None
            self.browser = None
            self.context = None
            self.pages = []
            if playwright is not None:
                await playwright.stop()

    async def _connect_over_cdp(self):
        """Open a CDP connection to ``self.endpoint``."""
        return await self.playwright.chromium.connect_over_cdp(
            self.endpoint, timeout=10000, slow_mo=1000)

    def _build_launch_command(self, executable_path: str) -> list[str]:
        """Build the command line that starts a browser with the debug port of ``self.endpoint``."""
        port = urlparse(self.endpoint).port
        command = [executable_path, f'--remote-debugging-port={port}', '--start-maximized']
        if self.devtools:
            command.append('--auto-open-devtools-for-tabs')
        if self.user_data_dir:
            command.append(f'--user-data-dir={self.user_data_dir}')
        else:
            logger.warning('Chrome必须另行指定`--user-data-dir`才能创建CDP连接')
        return command

    async def _connect_to_local(self) -> None:
        """Connect to a local browser, launching one if nothing is listening yet.

        Raises
        ------
        FileNotFoundError
            A browser has to be launched but no executable could be found.
        ConnectionError
            The connection still fails after the browser was launched.
        """
        try:
            self.browser = await self._connect_over_cdp()
            return
        except Exception as exc:
            # Expected when no browser is running yet; fall through and launch one.
            logger.debug("CDP connect failed, launching browser: {}", exc)

        executable_path = get_executable_path(self.executable_path)
        if executable_path is None:
            raise FileNotFoundError(
                f"未找到浏览器可执行文件,请指定`executable_path`。{self.executable_path}")
        name = Path(executable_path).name

        create_detached_process(self._build_launch_command(executable_path))
        # Give the browser time to open its debug port without blocking the event loop.
        await asyncio.sleep(5)

        try:
            self.browser = await self._connect_over_cdp()
        except Exception as exc:
            raise ConnectionError(
                f"已提前打开了浏览器,但未开启远程调试端口?请关闭浏览器全部进程后重试 `taskkill /f /im {name}`") from exc

    async def _connect_to_remote(self) -> None:
        """Connect to a remote browser over CDP or over the Playwright WS protocol.

        Raises
        ------
        ConnectionError
            The connection could not be established.
        """
        chromium = self.playwright.chromium
        try:
            if is_cdp_url(self.endpoint):
                self.browser = await chromium.connect_over_cdp(self.endpoint, timeout=10000, slow_mo=1000)
            else:
                self.browser = await chromium.connect(self.endpoint, timeout=10000, slow_mo=1000)
        except Exception as exc:
            raise ConnectionError(f"连接远程浏览器失败,请检查CDP/WS地址和端口是否正确。{self.endpoint}") from exc

    async def _connect_to_launch(self) -> None:
        """Launch a browser directly (no endpoint given).

        With a user data directory a persistent context is launched and stored
        in ``self.context``; otherwise a plain browser is launched and stored
        in ``self.browser``.

        Raises
        ------
        ConnectionError
            Launching the persistent context failed.
        """
        logger.info("executable_path={}", self.executable_path)
        if self.user_data_dir:
            logger.info("user_data_dir={}", self.user_data_dir)
            try:
                self.context = await self.playwright.chromium.launch_persistent_context(
                    user_data_dir=self.user_data_dir,
                    executable_path=self.executable_path,
                    headless=self.headless,
                    devtools=self.devtools,
                    timeout=10000, slow_mo=1000)
            except Exception as exc:
                raise ConnectionError(
                    f"launch失败,可能已经有浏览器已经打开了数据目录。{self.user_data_dir}") from exc
        else:
            logger.warning("未指定浏览器用户数据目录,部分需要的网站可能无法使用")
            self.browser = await self.playwright.chromium.launch(
                executable_path=self.executable_path,
                headless=self.headless,
                devtools=self.devtools)

    async def _launch(self) -> None:
        """Start Playwright, obtain a browser context and collect reusable pages.

        References
        ----------
        https://blog.csdn.net/qq_30576521/article/details/142370538

        """
        if self.playwright is None:
            # Reuse the driver when a previous attempt failed after starting it.
            self.playwright = await async_playwright().start()

        if self.endpoint is None:
            await self._connect_to_launch()
        elif is_local_url(self.endpoint) and is_cdp_url(self.endpoint):
            await self._connect_to_local()
        else:
            await self._connect_to_remote()

        # A persistent launch sets self.context directly and leaves
        # self.browser as None; otherwise take (or create) a context.
        if self.browser is not None:
            contexts = self.browser.contexts
            if contexts:
                self.context = contexts[0]
            else:
                self.context = await self.browser.new_context()

        # iwencai in headless mode requires the stealth plugin.
        await Stealth().apply_stealth_async(self.context)

        # Reuse pages that are already open, except DevTools / extension pages.
        for page in self.context.pages:
            if page.url.startswith(_INTERNAL_PAGE_PREFIXES):
                continue
            self.pages.append(page)

    async def get_page(self) -> Page:
        """Return a usable Page; a new tab is opened when the pool has no open page."""
        if self.context is None:
            await self._launch()

        # Take pages from the end of the pool, discarding closed ones.
        while self.pages:
            page = self.pages.pop()
            if not page.is_closed():
                return page

        # Pool exhausted: open a new tab.
        return await self.context.new_page()

    def release_page(self, page) -> None:
        """Put a finished Page back into the pool.

        If pages are not released, :meth:`get_page` keeps opening new tabs.
        Closed pages and pages already in the pool are ignored, so the same
        page is never handed out to two callers.
        """
        if page.is_closed() or page in self.pages:
            return
        self.pages.append(page)


async def query(
        page: Page,
        query_input: str = "收盘价>100元",
        query_type: QueryType = QueryType.CNStock,
        max_page: int = 5,
        rename: bool = False,
        site: Site = Site.THS,
) -> pd.DataFrame:
    """Query a table from one of the supported sites.

    Parameters
    ----------
    page : playwright.async_api.Page
        Page to run the query in.
    query_input : str, optional
        Query condition, by default "收盘价>100元". It is stripped and
        percent-encoded before being passed to the site module.
    query_type : QueryType, optional
        Query type, by default QueryType.CNStock
    max_page : int, optional
        Maximum number of pages, by default 5
    rename : bool
        Whether to rename the columns, by default False
    site : Site, optional
        Site, by default Site.THS

    Returns
    -------
    pd.DataFrame
        Query result.

    Raises
    ------
    ValueError
        ``site`` is not one of the supported sites.

    """
    query_input = quote(query_input.strip(), safe='')

    # Site modules are imported lazily, only for the site actually requested.
    if site == Site.EastMoney:
        from mcp_query_table.sites.eastmoney import query as site_query
    elif site == Site.THS:
        from mcp_query_table.sites.iwencai import query as site_query
    elif site == Site.TDX:
        from mcp_query_table.sites.tdx import query as site_query
    else:
        raise ValueError(f"未支持的站点:{site}")

    return await site_query(page, query_input, query_type, max_page, rename)


async def chat(
        page: Page,
        prompt: str = "9.9大还是9.11大?",
        create: bool = False,
        files: list[str] | None = None,
        provider: Provider = Provider.Nami) -> str:
    """Chat with a large language model through one of the supported providers.

    Parameters
    ----------
    page : playwright.async_api.Page
        Page to run the chat in.
    prompt : str, optional
        Chat content, by default "9.9大还是9.11大?"
    create : bool, optional
        Whether to start a new conversation, by default False
    files : list[str] | None, optional
        Files to upload. Support differs between sites.
    provider : Provider, optional
        Provider, by default Provider.Nami

    Returns
    -------
    str
        Chat result.

    Raises
    ------
    ValueError
        ``provider`` is not one of the supported providers.

    """
    # Normalize None to an empty list for the provider modules.
    if files is None:
        files = []

    # Provider modules are imported lazily, only for the provider requested.
    if provider == Provider.Nami:
        from mcp_query_table.providers.n import chat as provider_chat
    elif provider == Provider.YuanBao:
        from mcp_query_table.providers.yuanbao import chat as provider_chat
    elif provider == Provider.BaiDu:
        from mcp_query_table.providers.baidu import chat as provider_chat
    else:
        raise ValueError(f"未支持的提供商:{provider}")

    return await provider_chat(page, prompt, create, files)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wukan1986/query_table'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.