URL Fetcher MCP Server

by lucoo01
url_fetcher_server.py (14.6 kB)
""" URL Fetcher MCP Server 一个通过网址获取网页内容的 MCP 服务器 使用方法: 1. 安装依赖: pip install fastmcp aiohttp beautifulsoup4 2. 运行服务器: python url_fetcher_server.py 3. 在 MCP 客户端中调用 fetch_web_content 工具 """ import asyncio import aiohttp from typing import Dict, Any, Optional from urllib.parse import urlparse from fastmcp import FastMCP, Context from bs4 import BeautifulSoup import re # 创建 FastMCP 服务器实例 mcp = FastMCP("URL Fetcher 🌐") @mcp.tool async def fetch_web_content(url: str, ctx: Context) -> Dict[str, Any]: """ 获取指定网址的网页内容 Args: url (str): 要获取内容的完整网址 (必须包含 http:// 或 https://) ctx (Context): MCP 上下文对象 Returns: Dict[str, Any]: 包含网页内容和元数据的字典 """ # 验证 URL 格式 if not url.startswith(('http://', 'https://')): raise ValueError("URL 必须以 http:// 或 https:// 开头") # 解析 URL 获取域名 parsed_url = urlparse(url) domain = parsed_url.netloc await ctx.info(f"开始获取网页内容: {url}") try: # 设置请求头,模拟浏览器访问 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', } # 创建 HTTP 会话 timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session: async with session.get(url) as response: # 检查响应状态 if response.status != 200: raise Exception(f"HTTP 错误: {response.status} {response.reason}") # 获取内容类型 content_type = response.headers.get('content-type', '').lower() # 检查是否为 HTML 内容 if 'text/html' not in content_type: # 对于非 HTML 内容,直接返回原始内容 content = await response.text() return { "url": url, "domain": domain, "title": "", "content": content, "content_type": content_type, "status_code": response.status, "content_length": len(content), "is_html": False, "error": None } # 读取 HTML 内容 html_content = await response.text() # 使用 BeautifulSoup 解析 HTML soup = BeautifulSoup(html_content, 'html.parser') # 提取页面标题 title_tag = soup.find('title') title = title_tag.get_text().strip() if title_tag else "" # 移除脚本和样式标签 for script in soup(["script", "style", "nav", "footer", "header"]): script.decompose() # 提取主要内容 # 尝试找到主要内容区域 main_content = None # 常见的内容选择器 content_selectors = [ 'main', 'article', '.content', '.main-content', '#content', '#main', '.post-content', '.entry-content' ] for selector in content_selectors: element = soup.select_one(selector) if element: main_content = element break # 如果没有找到主要内容区域,使用 body if not main_content: main_content = soup.find('body') or soup # 提取文本内容 text_content = main_content.get_text() # 清理文本内容 # 移除多余的空白字符 text_content = re.sub(r'\s+', ' ', text_content) # 移除行首行尾空白 text_content = text_content.strip() # 提取元数据 meta_data = {} # 提取描述 description_tag = soup.find('meta', attrs={'name': 'description'}) if description_tag: meta_data['description'] = description_tag.get('content', '') # 提取关键词 keywords_tag = soup.find('meta', attrs={'name': 'keywords'}) if keywords_tag: meta_data['keywords'] = keywords_tag.get('content', '') # 提取作者 author_tag = soup.find('meta', attrs={'name': 'author'}) if author_tag: meta_data['author'] = author_tag.get('content', '') # 提取语言 lang_tag = soup.find('html', attrs={'lang': True}) if lang_tag: meta_data['language'] = lang_tag.get('lang', '') await ctx.info(f"成功获取网页内容,长度: {len(text_content)} 字符") return { "url": url, "domain": domain, "title": title, "content": text_content, "content_type": content_type, "status_code": response.status, 
"content_length": len(text_content), "is_html": True, "meta_data": meta_data, "error": None } except asyncio.TimeoutError: error_msg = "请求超时" await ctx.error(error_msg) return { "url": url, "domain": domain, "title": "", "content": "", "content_type": "", "status_code": 0, "content_length": 0, "is_html": False, "error": error_msg } except Exception as e: error_msg = f"获取网页内容失败: {str(e)}" await ctx.error(error_msg) return { "url": url, "domain": domain, "title": "", "content": "", "content_type": "", "status_code": 0, "content_length": 0, "is_html": False, "error": error_msg } @mcp.tool async def fetch_web_content_summary(url: str, ctx: Context, max_length: int = 2000) -> Dict[str, Any]: """ 获取网页内容的摘要版本 Args: url (str): 要获取内容的完整网址 max_length (int): 摘要的最大长度,默认 2000 字符 ctx (Context): MCP 上下文对象 Returns: Dict[str, Any]: 包含网页摘要和元数据的字典 """ # 先获取完整内容 full_result = await fetch_web_content(url, ctx) if full_result["error"]: return full_result # 截取内容到指定长度 content = full_result["content"] if len(content) > max_length: content = content[:max_length] + "...(内容已截断)" # 创建摘要结果 summary_result = full_result.copy() summary_result["content"] = content summary_result["content_length"] = len(content) summary_result["is_summary"] = True summary_result["original_length"] = full_result["content_length"] await ctx.info(f"生成内容摘要,原长度: {full_result['content_length']}, 摘要长度: {len(content)}") return summary_result @mcp.tool async def fetch_multiple_web_content(urls: list, ctx: Context, max_concurrent: int = 5) -> Dict[str, Any]: """ 批量获取多个网址的网页内容 Args: urls (list): 要获取内容的网址列表 max_concurrent (int): 最大并发请求数,默认5 ctx (Context): MCP 上下文对象 Returns: Dict[str, Any]: 包含所有网页内容和统计信息的字典 """ if not urls: return { "results": [], "total_count": 0, "success_count": 0, "error_count": 0, "errors": [] } if len(urls) > 20: raise ValueError("单次最多支持20个网址") await ctx.info(f"开始批量获取 {len(urls)} 个网址的内容") # 创建信号量控制并发数 semaphore = asyncio.Semaphore(max_concurrent) async def fetch_single_url(url): async with semaphore: return await fetch_web_content(url, ctx) # 并发获取所有网址内容 tasks = [fetch_single_url(url) for url in urls] results = await asyncio.gather(*tasks, return_exceptions=True) # 统计结果 success_results = [] errors = [] success_count = 0 error_count = 0 for i, result in enumerate(results): if isinstance(result, Exception): errors.append({ "url": urls[i], "error": f"处理异常: {str(result)}" }) error_count += 1 elif result.get("error"): errors.append({ "url": urls[i], "error": result["error"] }) error_count += 1 else: success_results.append(result) success_count += 1 await ctx.info(f"批量获取完成: 成功 {success_count} 个,失败 {error_count} 个") return { "results": success_results, "errors": errors, "total_count": len(urls), "success_count": success_count, "error_count": error_count } @mcp.tool async def fetch_multiple_web_content_summary(urls: list, ctx: Context, max_length: int = 2000, max_concurrent: int = 5) -> Dict[str, Any]: """ 批量获取多个网址的内容摘要 Args: urls (list): 要获取内容的网址列表 max_length (int): 每个网址摘要的最大长度,默认2000字符 max_concurrent (int): 最大并发请求数,默认5 ctx (Context): MCP 上下文对象 Returns: Dict[str, Any]: 包含所有网页摘要和统计信息的字典 """ if not urls: return { "results": [], "total_count": 0, "success_count": 0, "error_count": 0, "errors": [] } if len(urls) > 20: raise ValueError("单次最多支持20个网址") await ctx.info(f"开始批量获取 {len(urls)} 个网址的内容摘要") # 创建信号量控制并发数 semaphore = asyncio.Semaphore(max_concurrent) async def fetch_single_summary(url): async with semaphore: return await fetch_web_content_summary(url, ctx, max_length) # 并发获取所有网址摘要 tasks = [fetch_single_summary(url) for url in urls] results 
= await asyncio.gather(*tasks, return_exceptions=True) # 统计结果 success_results = [] errors = [] success_count = 0 error_count = 0 for i, result in enumerate(results): if isinstance(result, Exception): errors.append({ "url": urls[i], "error": f"处理异常: {str(result)}" }) error_count += 1 elif result.get("error"): errors.append({ "url": urls[i], "error": result["error"] }) error_count += 1 else: success_results.append(result) success_count += 1 await ctx.info(f"批量摘要获取完成: 成功 {success_count} 个,失败 {error_count} 个") return { "results": success_results, "errors": errors, "total_count": len(urls), "success_count": success_count, "error_count": error_count } @mcp.tool async def validate_url(url: str) -> Dict[str, Any]: """ 验证 URL 格式并返回基本信息 Args: url (str): 要验证的网址 Returns: Dict[str, Any]: URL 验证结果和基本信息 """ try: parsed = urlparse(url) # 检查基本格式 if not all([parsed.scheme, parsed.netloc]): return { "url": url, "is_valid": False, "error": "URL 格式不正确,缺少协议或域名", "scheme": parsed.scheme, "domain": parsed.netloc, "path": parsed.path, "query": parsed.query, "fragment": parsed.fragment } # 检查协议 if parsed.scheme not in ['http', 'https']: return { "url": url, "is_valid": False, "error": "仅支持 http 和 https 协议", "scheme": parsed.scheme, "domain": parsed.netloc, "path": parsed.path, "query": parsed.query, "fragment": parsed.fragment } return { "url": url, "is_valid": True, "error": None, "scheme": parsed.scheme, "domain": parsed.netloc, "path": parsed.path, "query": parsed.query, "fragment": parsed.fragment, "is_https": parsed.scheme == 'https', "has_subdomain": len(parsed.netloc.split('.')) > 2 } except Exception as e: return { "url": url, "is_valid": False, "error": f"URL 解析失败: {str(e)}", "scheme": "", "domain": "", "path": "", "query": "", "fragment": "" } if __name__ == "__main__": # 运行 MCP 服务器 mcp.run()
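
Step 3 of the usage notes assumes an MCP client. As a minimal sketch of what a tool call could look like from Python (assuming the FastMCP 2.x Client API, the server script saved locally as url_fetcher_server.py, and an illustrative target URL):

# Minimal client sketch; the script path and target URL are illustrative
import asyncio
from fastmcp import Client

async def main():
    # Point the client at the server script; FastMCP runs it over stdio
    async with Client("url_fetcher_server.py") as client:
        result = await client.call_tool(
            "fetch_web_content",
            {"url": "https://example.com"},
        )
        print(result)

if __name__ == "__main__":
    asyncio.run(main())

The same pattern applies to the other tools (fetch_web_content_summary, fetch_multiple_web_content, fetch_multiple_web_content_summary, validate_url); only the tool name and argument dictionary change.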

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lucoo01/url-fetcher'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.