#!/usr/bin/env python3
"""
WeChat Official Account article MCP server - minimal version
"""
import asyncio
import logging
from typing import Any, Dict, List
from urllib.parse import urljoin

import bs4
import requests

from mcp.server import Server
from mcp.server.lowlevel import NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Create the server instance
server = Server("wechat-article-server")

# Global session store: each entry keeps its own requests.Session so that
# cookies and connection state persist across tool calls.
sessions: Dict[str, requests.Session] = {}
@server.list_tools()
async def list_tools() -> List[Tool]:
    """List all available tools."""
    return [
        Tool(
            name="create_session",
            description="Create a new session",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "Session ID"
                    }
                },
                "required": ["session_id"]
            }
        ),
        Tool(
            name="list_sessions",
            description="List all active sessions",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        Tool(
            name="get_article_content",
            description="Fetch the content of a WeChat Official Account article",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "WeChat Official Account article URL"
                    },
                    "session_id": {
                        "type": "string",
                        "description": "Optional session ID used to keep request state"
                    }
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="extract_article_links",
            description="Extract article links from a WeChat Official Account page",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "WeChat Official Account page URL"
                    },
                    "session_id": {
                        "type": "string",
                        "description": "Optional session ID used to keep request state"
                    }
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="close_session",
            description="Close the specified session",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "Session ID to close"
                    }
                },
                "required": ["session_id"]
            }
        )
    ]
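
# Illustrative only (not part of the server): the JSON arguments an MCP client
# would send in a typical tools/call sequence against the schemas above. The
# "demo" session ID and the placeholder URL are hypothetical values.
#
#   create_session         {"session_id": "demo"}
#   get_article_content    {"url": "https://mp.weixin.qq.com/s/<article-id>", "session_id": "demo"}
#   extract_article_links  {"url": "https://mp.weixin.qq.com/s/<article-id>", "session_id": "demo"}
#   close_session          {"session_id": "demo"}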
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""调用工具"""
try:
if name == "create_session":
session_id = arguments.get("session_id")
if session_id in sessions:
result = f"会话 {session_id} 已存在"
else:
sessions[session_id] = requests.Session()
logger.info(f"创建新会话: {session_id}")
result = f"成功创建会话: {session_id}"
return [TextContent(type="text", text=result)]
elif name == "list_sessions":
session_list = list(sessions.keys())
return [TextContent(type="text", text=str(session_list))]
elif name == "get_article_content":
url = arguments.get("url")
session_id = arguments.get("session_id")
session = sessions.get(session_id) if session_id else requests
try:
response = session.get(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
})
response.raise_for_status()
soup = bs4.BeautifulSoup(response.text, 'html.parser')
# 提取文章标题
title_elem = soup.find('h1', class_='rich_media_title')
title = title_elem.get_text().strip() if title_elem else "未找到标题"
# 提取文章内容
content_elem = soup.find('div', class_='rich_media_content')
if not content_elem:
content_elem = soup.find('div', id='js_content')
content = content_elem.get_text().strip() if content_elem else "未找到内容"
# 提取作者信息
author_elem = soup.find('a', class_='rich_media_meta_link')
author = author_elem.get_text().strip() if author_elem else "未知作者"
# 提取发布时间
time_elem = soup.find('em', class_='rich_media_meta_text')
publish_time = time_elem.get_text().strip() if time_elem else "未知时间"
result = f"标题: {title}\n"
result += f"作者: {author}\n"
result += f"发布时间: {publish_time}\n"
result += f"URL: {url}\n\n"
result += "内容:\n"
result += content
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error(f"获取文章内容失败: {e}")
return [TextContent(type="text", text=f"获取文章内容失败: {str(e)}")]
elif name == "extract_article_links":
url = arguments.get("url")
session_id = arguments.get("session_id")
session = sessions.get(session_id) if session_id else requests
try:
response = session.get(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
})
response.raise_for_status()
soup = bs4.BeautifulSoup(response.text, 'html.parser')
links = []
# 查找所有文章链接
for link in soup.find_all('a', href=True):
href = link['href']
if 'mp.weixin.qq.com' in href and 's' in href:
if not href.startswith('http'):
href = f"https://mp.weixin.qq.com{href}"
links.append(href)
return [TextContent(type="text", text=str(links))]
except Exception as e:
logger.error(f"提取文章链接失败: {e}")
return [TextContent(type="text", text=f"提取文章链接失败: {str(e)}")]
elif name == "close_session":
session_id = arguments.get("session_id")
if session_id not in sessions:
result = f"会话 {session_id} 不存在"
else:
sessions[session_id].close()
del sessions[session_id]
logger.info(f"关闭会话: {session_id}")
result = f"成功关闭会话: {session_id}"
return [TextContent(type="text", text=result)]
else:
return [TextContent(type="text", text=f"未知工具: {name}")]
except Exception as e:
logger.error(f"调用工具 {name} 失败: {e}")
return [TextContent(type="text", text=f"调用工具 {name} 失败: {str(e)}")]
async def main():
    """Entry point: run the MCP server over stdio."""
    logger.info("Starting the WeChat Official Account article MCP server")
    # Run the server on stdin/stdout so an MCP client can launch it as a subprocess
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="wechat-article-server",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={}
                )
            )
        )
if __name__ == "__main__":
import asyncio
asyncio.run(main())
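
# Illustrative launch configuration (assumption: a stdio-based MCP client such
# as Claude Desktop, which starts this script as a subprocess). The path below
# is a hypothetical placeholder.
#
#   {
#     "mcpServers": {
#       "wechat-article-server": {
#         "command": "python",
#         "args": ["/path/to/basic_main.py"]
#       }
#     }
#   }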