Skip to main content
Glama
fengin

Search MCP Server

by fengin
client.py16.9 kB
""" 秘塔AI客户端 作者: 凌封(微信:fengin) 网站: AI全书(https://aibook.ren) """ from typing import AsyncGenerator, Optional, Dict, Any, List from playwright.async_api import async_playwright, Page, CDPSession, Browser, BrowserContext import asyncio import json import time import re from .constants import * from .exceptions import * from .response_handler import MetasoResponseHandler class MetasoClient: """秘塔AI客户端""" def __init__(self, uid: str, sid: str, browser_data_dir: str = "tmp/browser"): """初始化客户端 Args: uid: 用户ID sid: 会话ID browser_data_dir: 浏览器数据目录,默认为 "tmp/browser" """ self._uid = uid self._sid = sid self._browser_data_dir = browser_data_dir self._browser: Optional[Browser] = None self._context: Optional[BrowserContext] = None self._page: Optional[Page] = None self._client: Optional[CDPSession] = None self._meta_token: Optional[str] = None self._response_handler = MetasoResponseHandler() async def __aenter__(self): """异步上下文管理器入口""" await self._init_browser() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" await self.close() async def close(self): """关闭客户端""" try: if self._client: await self._client.detach() self._client = None if self._page: await self._page.close() self._page = None if self._context: await self._context.close() self._context = None if self._browser: await self._browser.close() self._browser = None except Exception as e: print(f"Error during cleanup: {e}") async def _init_browser(self): """初始化浏览器""" playwright = await async_playwright().start() self._context = await playwright.chromium.launch_persistent_context( self._browser_data_dir, headless=True, ignore_https_errors=True, user_agent=FAKE_HEADERS["User-Agent"], viewport=None, args=[ "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-extensions", "--hide-scrollbars", "--mute-audio", "--disable-gpu", "--disable-web-security", "--process-per-tab" ] ) self._page = await self._context.new_page() # 隐藏webdriver特征 await self._page.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => false, }); window.navigator.chrome = { runtime: {} }; delete navigator.__proto__.webdriver; """) # 设置User-Agent await self._page.set_extra_http_headers(FAKE_HEADERS) # 设置cookies await self._context.add_cookies([ { "name": "uid", "value": self._uid, "url": "https://metaso.cn" }, { "name": "sid", "value": self._sid, "url": "https://metaso.cn" } ]) # 创建CDP会话 self._client = await self._page.context.new_cdp_session(self._page) await self._client.send("Fetch.enable", { "patterns": [{ "urlPattern": "https://metaso.cn/api/searchV2*", "requestStage": "Response" }] }) # 获取初始meta token self._meta_token = await self._get_meta_token() async def _get_meta_token(self) -> str: """获取meta token Returns: str: meta token """ if self._meta_token: return self._meta_token # 导航到首页 await self._page.goto(BASE_URL) # 等待meta-token元素加载,不要求可见 meta_token_element = await self._page.wait_for_selector('meta#meta-token', state='attached') if not meta_token_element: raise MetasoException(*API_REQUEST_FAILED) # 获取content属性 meta_token = await meta_token_element.get_attribute('content') if not meta_token: raise MetasoException(*API_REQUEST_FAILED) return meta_token async def _create_conversation(self, content: str, model: str = DEFAULT_MODEL) -> str: """创建会话 Args: content: 对话内容 model: 模型名称,默认为detail Returns: str: 会话ID """ headers = { **FAKE_HEADERS, 'Content-Type': 'application/json', 'Token': self._meta_token, 'Is-Mini-Webview': '0', 'Cookie': f'uid={self._uid}; sid={self._sid}', 'Origin': 'https://metaso.cn', 'Referer': 'https://metaso.cn/', 'Sec-Fetch-Site': 'same-origin', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Dest': 'empty' } data = { "question": content, "mode": model, "engineType": "", "scholarSearchDomain": "all" } # 发送请求 js_code = """ async (args) => { try { console.log('Sending request with args:', JSON.stringify(args)); const response = await fetch(args.url, { method: 'POST', headers: args.headers, body: JSON.stringify(args.data), credentials: 'include' }); console.log('Response status:', response.status); if (!response.ok) { console.error('Response not ok:', response.status); return null; } const result = await response.json(); console.log('Response:', JSON.stringify(result)); if (result.errCode) { console.error('Error code:', result.errCode, result.errMsg); return null; } return result; } catch (error) { console.error('Error:', error); return null; } } """ args = { "url": API_SESSION_URL, "headers": headers, "data": data } response = await self._page.evaluate(js_code, args) # 检查响应 if not response: raise MetasoException(*API_REQUEST_FAILED) if 'data' not in response or 'id' not in response['data']: print(f"Response missing id: {response}") raise MetasoException(*API_REQUEST_FAILED) return response['data']['id'] async def get_completion(self, content: str, model: str = DEFAULT_MODEL) -> Dict: """非流式对话 Args: content: 对话内容 model: 使用的模型名称 Returns: Dict: 包含处理后的完整响应 """ try: # 创建新的响应处理器实例 self._response_handler = MetasoResponseHandler() # 创建会话 conv_id = await self._create_conversation(content, model) # 构造搜索URL search_url = f"{BASE_URL}/search/{conv_id}?q={content}" # 创建队列和事件 queue = asyncio.Queue() response_event = asyncio.Event() # 定义元信息 meta_info = { "model": model, "conversation_id": conv_id, "query": content, "timestamp": int(time.time()) } async def handle_response(event): try: request_id = event["requestId"] # 获取响应流 stream_result = await self._client.send("Fetch.takeResponseBodyAsStream", {"requestId": request_id}) stream_handle = stream_result["stream"] if not stream_handle: return try: buffer = "" while True: read_result = await self._client.send("IO.read", {"handle": stream_handle, "size": 256}) if not read_result: break data = read_result.get("data") eof = read_result.get("eof") if data: buffer += data # 按data:分割处理 parts = buffer.split("data:") # 处理完整的部分 for part in parts[:-1]: if part.strip(): cleaned = self._response_handler.clean_response(f"data:{part}") if cleaned: await queue.put(cleaned) # 保留最后一个可能不完整的部分 buffer = "data:" + parts[-1] if parts else "" if eof: # 处理剩余数据 if buffer.strip(): cleaned = self._response_handler.clean_response(buffer) if cleaned: await queue.put(cleaned) break except Exception as e: print(f"Error reading stream: {e}") finally: await self._client.send("IO.close", {"handle": stream_handle}) response_event.set() except Exception as e: print(f"Error processing response: {e}") response_event.set() # 监听响应 self._client.on("Fetch.requestPaused", handle_response) try: # 导航到搜索页面 await self._page.goto(search_url) # 等待响应完成 await asyncio.wait_for(response_event.wait(), timeout=30) finally: # 移除监听器 self._client.remove_listener("Fetch.requestPaused", handle_response) # 构建返回结果 result = { "content": self._response_handler.content, "references": self._response_handler.references, "images": self._response_handler.images, "tables": self._response_handler.tables, "recommended_questions": self._response_handler.recommended_questions, "highlights": self._response_handler.highlights, "query_info": self._response_handler.query_info, "markdown": self._response_handler.format_markdown({}), "meta": meta_info } return result except Exception as e: print(f"Error in get_completion: {e}") raise MetasoException(*API_REQUEST_FAILED) async def get_completion_stream(self, content: str, model: str = DEFAULT_MODEL) -> AsyncGenerator[str, None]: """获取流式补全 Args: content: 对话内容 model: 模型名称,默认为detail Yields: str: 清理后的补全内容片段 """ try: # 创建会话 conv_id = await self._create_conversation(content, model) # 构造搜索URL search_url = f"{BASE_URL}/search/{conv_id}?q={content}" # 创建队列用于存储响应片段 queue = asyncio.Queue() response_event = asyncio.Event() async def handle_response(event): try: request_id = event["requestId"] # 获取响应流 stream_result = await self._client.send("Fetch.takeResponseBodyAsStream", {"requestId": request_id}) stream_handle = stream_result["stream"] if not stream_handle: return try: buffer = "" while True: read_result = await self._client.send("IO.read", {"handle": stream_handle, "size": 256}) if not read_result: break data = read_result.get("data") eof = read_result.get("eof") if data: buffer += data # 按data:分割处理 parts = buffer.split("data:") # 处理完整的部分 for part in parts[:-1]: if part.strip(): cleaned = self._response_handler.clean_response(f"data:{part}") if cleaned: await queue.put(cleaned) # 保留最后一个可能不完整的部分 buffer = "data:" + parts[-1] if parts else "" if eof: # 处理剩余数据 if buffer.strip(): cleaned = self._response_handler.clean_response(buffer) if cleaned: await queue.put(cleaned) break except Exception as e: print(f"Error reading stream: {e}") finally: # 关闭流 await self._client.send("IO.close", {"handle": stream_handle}) response_event.set() except Exception as e: print(f"Error processing response: {e}") response_event.set() # 监听响应 self._client.on("Fetch.requestPaused", handle_response) try: # 导航到搜索页面 await self._page.goto(search_url) # 持续获取响应片段 while True: try: chunk = await asyncio.wait_for(queue.get(), timeout=1.0) if chunk: # 只返回非空内容 yield chunk except asyncio.TimeoutError: # 检查是否完成 if response_event.is_set(): break finally: # 移除监听器 self._client.remove_listener("Fetch.requestPaused", handle_response) except Exception as e: print(f"Error in get_completion_stream: {e}") raise MetasoException(*API_REQUEST_FAILED)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fengin/search-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server