"""
URL Fetcher MCP Server
一个通过网址获取网页内容的 MCP 服务器
使用方法:
1. 安装依赖: pip install fastmcp aiohttp beautifulsoup4
2. 运行服务器: python url_fetcher_server.py
3. 在 MCP 客户端中调用 fetch_web_content 工具
"""
import asyncio
import aiohttp
from typing import Any, Dict, List
from urllib.parse import urlparse
from fastmcp import FastMCP, Context
from bs4 import BeautifulSoup
import re
# Create the FastMCP server instance
mcp = FastMCP("URL Fetcher 🌐")
@mcp.tool
async def fetch_web_content(url: str, ctx: Context) -> Dict[str, Any]:
"""
获取指定网址的网页内容
Args:
url (str): 要获取内容的完整网址 (必须包含 http:// 或 https://)
ctx (Context): MCP 上下文对象
Returns:
Dict[str, Any]: 包含网页内容和元数据的字典
"""
    # Validate the URL format
    if not url.startswith(('http://', 'https://')):
        raise ValueError("URL must start with http:// or https://")
    # Parse the URL to extract the domain
    parsed_url = urlparse(url)
    domain = parsed_url.netloc
    await ctx.info(f"Fetching web content: {url}")
try:
        # Set request headers to mimic a browser
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
        # Create an HTTP session
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
async with session.get(url) as response:
                # Check the response status
                if response.status != 200:
                    raise Exception(f"HTTP error: {response.status} {response.reason}")
                # Get the content type
                content_type = response.headers.get('content-type', '').lower()
                # Check whether the response is HTML
                if 'text/html' not in content_type:
                    # For non-HTML content, return the raw text as-is
content = await response.text()
return {
"url": url,
"domain": domain,
"title": "",
"content": content,
"content_type": content_type,
"status_code": response.status,
"content_length": len(content),
"is_html": False,
"error": None
}
                # Read the HTML content
                html_content = await response.text()
                # Parse the HTML with BeautifulSoup
                soup = BeautifulSoup(html_content, 'html.parser')
                # Extract the page title
                title_tag = soup.find('title')
                title = title_tag.get_text().strip() if title_tag else ""
                # Remove script/style tags and common layout regions (nav, footer, header)
                for tag in soup(["script", "style", "nav", "footer", "header"]):
                    tag.decompose()
                # Try to locate the main content area
                main_content = None
                # Common content selectors
content_selectors = [
'main', 'article', '.content', '.main-content',
'#content', '#main', '.post-content', '.entry-content'
]
for selector in content_selectors:
element = soup.select_one(selector)
if element:
main_content = element
break
                # Fall back to the <body> element if no main content area was found
if not main_content:
main_content = soup.find('body') or soup
                # Extract the text content
                text_content = main_content.get_text()
                # Clean up the text: collapse runs of whitespace into single spaces
                text_content = re.sub(r'\s+', ' ', text_content)
                # Strip leading and trailing whitespace
                text_content = text_content.strip()
                # Extract page metadata
                meta_data = {}
                # Extract the description
description_tag = soup.find('meta', attrs={'name': 'description'})
if description_tag:
meta_data['description'] = description_tag.get('content', '')
                # Extract the keywords
keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
if keywords_tag:
meta_data['keywords'] = keywords_tag.get('content', '')
                # Extract the author
author_tag = soup.find('meta', attrs={'name': 'author'})
if author_tag:
meta_data['author'] = author_tag.get('content', '')
                # Extract the page language
lang_tag = soup.find('html', attrs={'lang': True})
if lang_tag:
meta_data['language'] = lang_tag.get('lang', '')
await ctx.info(f"成功获取网页内容,长度: {len(text_content)} 字符")
return {
"url": url,
"domain": domain,
"title": title,
"content": text_content,
"content_type": content_type,
"status_code": response.status,
"content_length": len(text_content),
"is_html": True,
"meta_data": meta_data,
"error": None
}
except asyncio.TimeoutError:
error_msg = "请求超时"
await ctx.error(error_msg)
return {
"url": url,
"domain": domain,
"title": "",
"content": "",
"content_type": "",
"status_code": 0,
"content_length": 0,
"is_html": False,
"error": error_msg
}
except Exception as e:
error_msg = f"获取网页内容失败: {str(e)}"
await ctx.error(error_msg)
return {
"url": url,
"domain": domain,
"title": "",
"content": "",
"content_type": "",
"status_code": 0,
"content_length": 0,
"is_html": False,
"error": error_msg
}
@mcp.tool
async def fetch_web_content_summary(url: str, ctx: Context, max_length: int = 2000) -> Dict[str, Any]:
"""
获取网页内容的摘要版本
Args:
url (str): 要获取内容的完整网址
max_length (int): 摘要的最大长度,默认 2000 字符
ctx (Context): MCP 上下文对象
Returns:
Dict[str, Any]: 包含网页摘要和元数据的字典
"""
    # Fetch the full content first
full_result = await fetch_web_content(url, ctx)
if full_result["error"]:
return full_result
    # Truncate the content to the requested length
    content = full_result["content"]
    if len(content) > max_length:
        content = content[:max_length] + "...(content truncated)"
    # Build the summary result
summary_result = full_result.copy()
summary_result["content"] = content
summary_result["content_length"] = len(content)
summary_result["is_summary"] = True
summary_result["original_length"] = full_result["content_length"]
await ctx.info(f"生成内容摘要,原长度: {full_result['content_length']}, 摘要长度: {len(content)}")
return summary_result
@mcp.tool
async def fetch_multiple_web_content(urls: List[str], ctx: Context, max_concurrent: int = 5) -> Dict[str, Any]:
"""
批量获取多个网址的网页内容
Args:
urls (list): 要获取内容的网址列表
max_concurrent (int): 最大并发请求数,默认5
ctx (Context): MCP 上下文对象
Returns:
Dict[str, Any]: 包含所有网页内容和统计信息的字典
"""
if not urls:
return {
"results": [],
"total_count": 0,
"success_count": 0,
"error_count": 0,
"errors": []
}
    if len(urls) > 20:
        raise ValueError("At most 20 URLs are supported per call")
    await ctx.info(f"Starting batch fetch of {len(urls)} URLs")
    # Use a semaphore to limit concurrency
    semaphore = asyncio.Semaphore(max_concurrent)
    async def fetch_single_url(url):
        async with semaphore:
            return await fetch_web_content(url, ctx)
    # Fetch all URLs concurrently
tasks = [fetch_single_url(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
    # Aggregate the results
success_results = []
errors = []
success_count = 0
error_count = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
errors.append({
"url": urls[i],
"error": f"处理异常: {str(result)}"
})
error_count += 1
elif result.get("error"):
errors.append({
"url": urls[i],
"error": result["error"]
})
error_count += 1
else:
success_results.append(result)
success_count += 1
await ctx.info(f"批量获取完成: 成功 {success_count} 个,失败 {error_count} 个")
return {
"results": success_results,
"errors": errors,
"total_count": len(urls),
"success_count": success_count,
"error_count": error_count
}
@mcp.tool
async def fetch_multiple_web_content_summary(urls: List[str], ctx: Context, max_length: int = 2000, max_concurrent: int = 5) -> Dict[str, Any]:
"""
批量获取多个网址的内容摘要
Args:
urls (list): 要获取内容的网址列表
max_length (int): 每个网址摘要的最大长度,默认2000字符
max_concurrent (int): 最大并发请求数,默认5
ctx (Context): MCP 上下文对象
Returns:
Dict[str, Any]: 包含所有网页摘要和统计信息的字典
"""
if not urls:
return {
"results": [],
"total_count": 0,
"success_count": 0,
"error_count": 0,
"errors": []
}
    if len(urls) > 20:
        raise ValueError("At most 20 URLs are supported per call")
    await ctx.info(f"Starting batch summary fetch of {len(urls)} URLs")
    # Use a semaphore to limit concurrency
    semaphore = asyncio.Semaphore(max_concurrent)
    async def fetch_single_summary(url):
        async with semaphore:
            return await fetch_web_content_summary(url, ctx, max_length)
    # Fetch all summaries concurrently
tasks = [fetch_single_summary(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
    # Aggregate the results
success_results = []
errors = []
success_count = 0
error_count = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
errors.append({
"url": urls[i],
"error": f"处理异常: {str(result)}"
})
error_count += 1
elif result.get("error"):
errors.append({
"url": urls[i],
"error": result["error"]
})
error_count += 1
else:
success_results.append(result)
success_count += 1
await ctx.info(f"批量摘要获取完成: 成功 {success_count} 个,失败 {error_count} 个")
return {
"results": success_results,
"errors": errors,
"total_count": len(urls),
"success_count": success_count,
"error_count": error_count
}
@mcp.tool
async def validate_url(url: str) -> Dict[str, Any]:
"""
验证 URL 格式并返回基本信息
Args:
url (str): 要验证的网址
Returns:
Dict[str, Any]: URL 验证结果和基本信息
"""
try:
parsed = urlparse(url)
        # Check the basic structure
if not all([parsed.scheme, parsed.netloc]):
return {
"url": url,
"is_valid": False,
"error": "URL 格式不正确,缺少协议或域名",
"scheme": parsed.scheme,
"domain": parsed.netloc,
"path": parsed.path,
"query": parsed.query,
"fragment": parsed.fragment
}
        # Check the scheme
if parsed.scheme not in ['http', 'https']:
return {
"url": url,
"is_valid": False,
"error": "仅支持 http 和 https 协议",
"scheme": parsed.scheme,
"domain": parsed.netloc,
"path": parsed.path,
"query": parsed.query,
"fragment": parsed.fragment
}
return {
"url": url,
"is_valid": True,
"error": None,
"scheme": parsed.scheme,
"domain": parsed.netloc,
"path": parsed.path,
"query": parsed.query,
"fragment": parsed.fragment,
"is_https": parsed.scheme == 'https',
"has_subdomain": len(parsed.netloc.split('.')) > 2
}
except Exception as e:
return {
"url": url,
"is_valid": False,
"error": f"URL 解析失败: {str(e)}",
"scheme": "",
"domain": "",
"path": "",
"query": "",
"fragment": ""
}
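# --- Example (sketch): calling the tools from an MCP client ----------------
# A minimal, hypothetical illustration of in-process usage, assuming the
# fastmcp >= 2.x Client API (in-memory transport created from the server
# instance). This function is only an example and is never invoked by the
# server itself; the URL and max_length values are placeholders.
async def _example_client_usage() -> None:
    from fastmcp import Client  # assumed to be exported by fastmcp 2.x
    async with Client(mcp) as client:
        # Validate a URL before fetching it
        validation = await client.call_tool("validate_url", {"url": "https://example.com"})
        print(validation)
        # Fetch a truncated summary of the page content
        summary = await client.call_tool(
            "fetch_web_content_summary",
            {"url": "https://example.com", "max_length": 500},
        )
        print(summary)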
if __name__ == "__main__":
    # Run the MCP server
mcp.run()