Skip to main content
Glama
server.py19.2 kB
import asyncio import traceback import json from dataclasses import dataclass from typing import Any import aiohttp import httpx from tenacity import retry, stop_after_attempt, wait_exponential from fastmcp import FastMCP, Context from mcp.types import ToolAnnotations from fastmcp.exceptions import ToolError from aiohttp import ClientTimeout from markdownify import markdownify from lxml.html import defs, fromstring, tostring from lxml.html.clean import Cleaner import os from datetime import datetime from pydantic import BaseModel, Field from smithery.decorators import smithery import scrapermcp.params as params # 代理配置数据模型 class ConfigSchema(BaseModel): # 带有默认值的可选字段 - 用户可以自定义或使用默认值 default_proxy_url: str = Field("", description="默认代理URL") default_proxy_login: str = Field("", description="默认代理登录名") default_proxy_password: str = Field("", description="默认代理密码") unlocker_proxy_url: str = Field("", description="解锁器代理URL") unlocker_proxy_login: str = Field("", description="解锁器代理登录名") unlocker_proxy_password: str = Field("", description="解锁器代理密码") @smithery.server(config_schema=ConfigSchema) def create_server(): """创建并返回 FastMCP 服务器实例""" # 创建 FastMCP 服务器实例 mcp = FastMCP( name="Scrape", instructions=""" parse_with_ai_selectors方法是使用代理或者解锁器爬取并解析网页按照用户需要反馈数据,格式可选择"html", "links", "Markdown" """ ) @dataclass class ProxyConfig: """代理配置数据类,包含代理服务器的连接信息""" proxy_url: str login: str password: str class ScrapeRetryException(Exception): """网页抓取重试异常,用于在抓取失败时触发重试机制""" pass @mcp.tool(annotations=ToolAnnotations(readOnlyHint=True), enabled=True, meta={"作者": "tom", "版本": "v1.0"}) async def parse_with_ai_selectors( url: params.URL, render: params.RENDER, output_format: params.OUTPUT_FORMAT, ctx: Context ) -> str: """ 使用代理或者解锁器爬取并解析网页 参数: url: 要解析的网页 URL render: 代理配置选择器 ("Unlocker"为解锁器,其他值为普通代理) output_format: 输出格式 ("html", "links", "MarkDown") """ try: # 从会话配置获取代理配置 thor_mcp_config = ctx.session_config if render == "Unlocker": thor_mcp_myProxyConfig = ProxyConfig( proxy_url=thor_mcp_config.unlocker_proxy_url, login=thor_mcp_config.unlocker_proxy_login, password=thor_mcp_config.unlocker_proxy_password, ) else: thor_mcp_myProxyConfig = ProxyConfig( proxy_url=thor_mcp_config.default_proxy_url, login=thor_mcp_config.default_proxy_login, password=thor_mcp_config.default_proxy_password, ) # 验证代理配置参数不能为空 if not thor_mcp_myProxyConfig.proxy_url or not thor_mcp_myProxyConfig.login or not thor_mcp_myProxyConfig.password: raise ToolError(f"代理配置参数不能为空,注意:如果是解锁器和代理账号不通用") thor_mcp_html = "" thor_mcp_isCatch=False #默认不开启缓存,缓存仅用于调试使用,因为在AI上使用反馈不稳定 if thor_mcp_isCatch: # 先检查是否有当天的HTML文件 thor_mcp_today = datetime.now().strftime("%Y%m%d") thor_mcp_save_dir = "html_snapshots" os.makedirs(thor_mcp_save_dir, exist_ok=True) # 清理URL中的特殊字符并限制文件名长度 thor_mcp_clean_url = url.split("//")[-1] for char in [ "?", ",", "/", "\\", ":", "*", '"', "<", ">", "|", "%", "=", "&", "+", ";", "@", "#", "$", "^", "`", "{", "}", "[", "]", "'", ]: thor_mcp_clean_url = thor_mcp_clean_url.replace(char, "_") # 限制总文件名长度不超过200字符 thor_mcp_max_length = 200 - len(thor_mcp_today) - 1 # 减去日期和分隔符长度 thor_mcp_htmlName = f"{thor_mcp_today}_{thor_mcp_clean_url[:thor_mcp_max_length]}" thor_mcp_filename = f"{thor_mcp_save_dir}/{thor_mcp_htmlName}.html" if os.path.exists(thor_mcp_filename): try: with open(thor_mcp_filename, "r", encoding="utf-8") as f: thor_mcp_html = f.read() print(f"从本地缓存读取HTML: {thor_mcp_filename}") except IOError as e: raise ToolError(f"读取缓存文件失败") else: thor_mcp_html = await scrape(url, thor_mcp_myProxyConfig) if not thor_mcp_html: raise ToolError(f"网页抓取失败,无法获取内容") try: with open(thor_mcp_filename, "w", encoding="utf-8") as f: f.write(thor_mcp_html) print(f"HTML已保存到 {thor_mcp_filename}") except IOError as e: raise ToolError(f"保存HTML文件失败") else: # 不开启缓存时也保存HTML,但使用不同目录和秒级时间戳文件名 thor_mcp_now = datetime.now().strftime("%Y%m%d%H%M%S") thor_mcp_save_dir = "html_temp" os.makedirs(thor_mcp_save_dir, exist_ok=True) # 清理URL中的特殊字符并限制文件名长度 thor_mcp_clean_url = url.split("//")[-1] for char in [ "?", ",", "/", "\\", ":", "*", '"', "<", ">", "|", "%", "=", "&", "+", ";", "@", "#", "$", "^", "`", "{", "}", "[", "]", "'", ]: thor_mcp_clean_url = thor_mcp_clean_url.replace(char, "_") # 限制总文件名长度不超过200字符 thor_mcp_max_length = 200 - len(thor_mcp_now) - 1 # 减去时间戳和分隔符长度 thor_mcp_htmlName = f"{thor_mcp_now}_{thor_mcp_clean_url[:thor_mcp_max_length]}" thor_mcp_filename = f"{thor_mcp_save_dir}/{thor_mcp_htmlName}.html" thor_mcp_html = await scrape(url, thor_mcp_myProxyConfig) if not thor_mcp_html: raise ToolError(f"网页抓取失败,无法获取内容") try: with open(thor_mcp_filename, "w", encoding="utf-8") as f: f.write(thor_mcp_html) print(f"HTML已临时保存到 {thor_mcp_filename}") except IOError as e: raise ToolError(f"保存临时HTML文件失败") # 处理内容并返回结果 try: thor_mcp_result = get_content(thor_mcp_html, output_format) if not thor_mcp_result: raise ToolError(f"内容处理失败: 无法将内容转换为 {output_format} 格式") return thor_mcp_result except Exception as e: raise ToolError(f"内容处理过程中发生错误") except Exception as e: # 捕获其他未预期的异常 raise ToolError(f"解析网页时发生未预期错误") @retry( # 当发生异常时重新抛出异常 reraise=True, # 最多尝试 3 次 stop=stop_after_attempt(3), # 指数退避算法,乘数为 1,最小等待时间为 4 秒,最大等待时间为 10 秒 wait=wait_exponential(multiplier=1, min=4, max= 5), ) async def scrape_with_retry(url: str, myProxyConfig: ProxyConfig) -> str: """ 带有重试机制的网页抓取方法,会记录每次重试的详细信息 参数: url: 要抓取的URL地址 myProxyConfig: 代理配置对象 返回: 成功时返回网页内容文本,失败时抛出ScrapeRetryException 异常: ScrapeRetryException: 当请求失败时抛出 """ # 从代理配置对象获取代理URL proxy = myProxyConfig.proxy_url # 创建代理认证对象,使用代理配置中的登录名和密码 proxy_auth = aiohttp.BasicAuth( login=myProxyConfig.login, password=myProxyConfig.password, ) # 设置请求头,指定渲染类型为HTML,等待时间为10秒 headers = {"X-Render-Type": "html", "X-Wait-Second": "10"} # 设置客户端超时时间为120秒 timeout = ClientTimeout(total=120) # 创建异步HTTP客户端会话 async with aiohttp.ClientSession( headers=headers, # 使用上面定义的请求头 timeout=timeout, # 使用上面定义的超时时间 connector=aiohttp.TCPConnector(), # 使用TCP连接器 max_field_size=32768, # 设置最大字段大小为32KB ) as session: try: # 使用会话发起GET请求 async with session.get( url, # 目标URL proxy=proxy, # 使用代理 proxy_auth=proxy_auth, # 使用代理认证 ssl=False, # 禁用SSL验证 ) as response: # 检查响应状态码是否为200(成功) if response.status == 200: # 返回响应文本内容 return await response.text() else: # 构造错误消息,包含状态码和URL error_msg = f"状态码: {response.status}, URL: {url}" # 抛出重试异常 raise ScrapeRetryException(error_msg) except aiohttp.ClientError as e: error_msg = f"HTTP客户端错误" raise ScrapeRetryException(error_msg) except asyncio.TimeoutError: error_msg = ( f"请求超时: 60秒" ) raise ScrapeRetryException(error_msg) except Exception as e: error_msg = f"未知错误:" raise ScrapeRetryException(error_msg) async def scrape(url: str, myProxyConfig: ProxyConfig) -> str: """ 网页抓取方法 参数: url: 要抓取的URL地址 myProxyConfig: 代理配置对象 返回: 成功时返回网页内容文本,失败时返回空字符串 """ try: result = await scrape_with_retry(url, myProxyConfig) return result except ScrapeRetryException: return "" def clean_html(html: str) -> str: """清理HTML字符串""" cleaner = Cleaner( scripts=True, kill_tags=["nav", "svg", "footer", "noscript", "script", "form"], style=True, remove_tags=[], safe_attrs=list(defs.safe_attrs) + ["idx"], inline_style=True, links=True, meta=False, embedded=True, frames=False, forms=False, annoying_tags=False, page_structure=False, javascript=True, comments=True, ) return cleaner.clean_html(html) def strip_html(thor_mcp_html: str) -> str: """简化HTML字符串,移除不需要的元素、属性和冗余内容""" # 导入正则表达式模块(仅在函数作用域内) import re # 调用clean_html函数进行初步清理(假设该函数已在外部定义) thor_mcp_cleaned_html = clean_html(thor_mcp_html) # 将清理后的HTML字符串解析为XML树结构 thor_mcp_html_tree = fromstring(thor_mcp_cleaned_html) # 遍历HTML树中的所有元素(包括嵌套的子孙元素) for thor_mcp_element in thor_mcp_html_tree.iter(): # 移除所有元素的style属性(内联样式) if "style" in thor_mcp_element.attrib: del thor_mcp_element.attrib["style"] # 使用del语句删除元素属性 # 检查并移除空元素或无用元素的复合条件: # 1. 没有属性 或 只有一个idx属性(特殊属性,可能用于临时标记) # 2. 没有子元素 # 3. 没有文本内容(或仅空白文本) # 4. 没有尾部文本(或仅空白文本) if ( ( not thor_mcp_element.attrib # 无任何属性 or (len(thor_mcp_element.attrib) == 1 and "idx" in thor_mcp_element.attrib) # 或仅含idx属性 ) and not thor_mcp_element.getchildren() # 没有子元素 and (not thor_mcp_element.text or not thor_mcp_element.text.strip()) # 无文本或空白文本 and (not thor_mcp_element.tail or not thor_mcp_element.tail.strip()) # 无尾部文本或空白尾部 ): # 获取父元素(可能为None,如果是根元素) thor_mcp_parent = thor_mcp_element.getparent() # 只有在存在父元素的情况下才执行移除 if thor_mcp_parent is not None: # 从父元素的树结构中移除当前元素 thor_mcp_parent.remove(thor_mcp_element) # 将处理后的XML树转换回HTML字符串 return tostring(thor_mcp_html_tree, encoding='unicode') # 移除class或id中包含"footer"或"hidden"的元素 thor_mcp_xpath_query = ( ".//*[contains(@class, 'footer') or contains(@id, 'footer') or " "contains(@class, 'hidden') or contains(@id, 'hidden')]" ) # 使用XPath查询找到所有需要移除的元素 thor_mcp_elements_to_remove = thor_mcp_html_tree.xpath(thor_mcp_xpath_query) # 遍历所有需要移除的元素 for thor_mcp_element in thor_mcp_elements_to_remove: # 获取当前元素的父元素 thor_mcp_parent = thor_mcp_element.getparent() # 只有在父元素存在的情况下才执行移除操作 if thor_mcp_parent is not None: # 从父元素中移除当前元素 thor_mcp_parent.remove(thor_mcp_element) # 将HTML树重新序列化为字符串 thor_mcp_stripped_html = tostring(thor_mcp_html_tree, encoding="unicode") # 替换多个空格为单个空格 thor_mcp_stripped_html = re.sub(r"\s{2,}", " ", thor_mcp_stripped_html) # 替换连续换行符为空字符串 thor_mcp_stripped_html = re.sub(r"\n{2,}", "", thor_mcp_stripped_html) return thor_mcp_stripped_html def extract_links_with_text(thor_mcp_html: str, thor_mcp_base_url: str | None = None) -> list[str]: """ 从HTML中提取带有显示文本的链接 参数: thor_mcp_html (str): 输入的HTML字符串 thor_mcp_base_url (str | None): 用于将相对URL转换为绝对URL的基础URL 如果为None,相对URL将保持不变 返回: list[str]: 格式为[显示文本] URL的链接列表 """ # 使用lxml的fromstring函数将HTML字符串解析为XML树结构 thor_mcp_html_tree = fromstring(thor_mcp_html) # 初始化空列表,用于存储格式化后的链接 thor_mcp_links = [] # 遍历所有包含href属性的<a>标签(XPath选择器) for thor_mcp_link in thor_mcp_html_tree.xpath("//a[@href]"): # 获取href属性的值(链接目标地址) thor_mcp_href = thor_mcp_link.get("href") # 获取标签内所有文本内容(包括子标签文本),并移除首尾空白 thor_mcp_text = thor_mcp_link.text_content().strip() # 仅当href和text都存在时处理(过滤空链接或空文本) if thor_mcp_href and thor_mcp_text: # 跳过空文本或纯空白文本(虽已strip(),但防止出现特殊空白字符) if not thor_mcp_text: continue # 跳过页面内锚点链接(以#开头) if thor_mcp_href.startswith("#"): continue # 跳过JavaScript伪链接 if thor_mcp_href.startswith("javascript:"): continue # 当提供了base_url且是相对路径(以/开头)时转换URL if thor_mcp_base_url and thor_mcp_href.startswith("/"): # 移除base_url末尾的斜杠,避免双斜杠问题 thor_mcp_base = thor_mcp_base_url.rstrip("/") # 拼接成绝对URL thor_mcp_href = f"{thor_mcp_base}{thor_mcp_href}" # 将格式化后的链接加入结果列表:[文本] URL thor_mcp_links.append(f"[{thor_mcp_text}] {thor_mcp_href}") # 返回所有符合条件的链接列表 return thor_mcp_links def get_content(thor_mcp_content: str, thor_mcp_output_format: str) -> str: """ 从响应内容中提取内容并转换为适当的格式 参数: thor_mcp_content: 响应内容字符串 thor_mcp_output_format: 输出格式 ("html", "links", 或其他格式转换为markdown) 返回: 格式化后的内容字符串 """ if thor_mcp_output_format == "html": return thor_mcp_content if thor_mcp_output_format == "links": thor_mcp_links = extract_links_with_text(thor_mcp_content) return "\n".join(thor_mcp_links) thor_mcp_stripped_html = strip_html(thor_mcp_content) # 精简 HTML 内容 return markdownify(thor_mcp_stripped_html) # 对于其他格式,返回原始内容字符串 return mcp

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xja1023789-collab/ScraperMcpCeshi'

If you have feedback or need assistance with the MCP directory API, please join our Discord server