
MCP WeChat Official Account Crawler

by ditingdapeng
server.py (12.1 kB)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MCP WeChat Official Account article crawler server - FastMCP edition.

Built on the MCP standard, using the high-level FastMCP wrapper.

Provides the following features:
1. Crawl the content of WeChat Official Account articles
2. Download the images embedded in an article
3. Return structured article data
4. Analyze article content
"""

import json
import logging
import os
import sys
from typing import Optional

# Import FastMCP from the MCP SDK
try:
    from mcp.server.fastmcp import FastMCP
except ImportError:
    # If FastMCP is unavailable, fall back to the standard implementation
    print("FastMCP is unavailable; please use the standard server.py")
    sys.exit(1)

# Add the project root to the Python path
project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(project_root)

try:
    # Use the simplified spider
    from weixin_spider_simple import WeixinSpiderWithImages
    logging.info("Using the simplified spider module")
except ImportError as e:
    logging.error(f"Failed to import the simplified spider module: {e}")
    WeixinSpiderWithImages = None

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Create the FastMCP application instance
app = FastMCP("mcp-weixin-spider")

# Global spider instance
spider_instance: Optional[WeixinSpiderWithImages] = None


def get_spider_instance() -> WeixinSpiderWithImages:
    """Return the spider instance (singleton pattern)."""
    global spider_instance
    if spider_instance is None:
        if WeixinSpiderWithImages is None:
            raise RuntimeError("The spider module was not imported correctly")
        try:
            spider_instance = WeixinSpiderWithImages(
                headless=True,  # run headless inside the MCP server
                wait_time=10,
                download_images=True,
            )
            logger.info("Spider instance initialized successfully")
        except Exception as e:
            logger.error(f"Spider instance initialization failed: {e}")
            raise RuntimeError(f"Unable to initialize the spider instance: {e}")

    # Check whether the browser driver is still alive
    if spider_instance.driver is None:
        logger.warning("Driver is no longer valid, reinitializing...")
        try:
            spider_instance.setup_driver(headless=True)
            logger.info("Driver reinitialized successfully")
        except Exception as e:
            logger.error(f"Driver reinitialization failed: {e}")
            # Fall back to creating a fresh spider instance
            try:
                spider_instance = WeixinSpiderWithImages(
                    headless=True,
                    wait_time=10,
                    download_images=True,
                )
                logger.info("Created a new spider instance successfully")
            except Exception as new_e:
                logger.error(f"Failed to create a new spider instance: {new_e}")
                raise RuntimeError(f"Unable to create a spider instance: {new_e}")

    return spider_instance


@app.tool()
def crawl_weixin_article(
    url: str,
    download_images: bool = True,
    custom_filename: Optional[str] = None,
) -> str:
    """
    Crawl the content and images of a WeChat Official Account article.

    Args:
        url: URL of the WeChat Official Account article
        download_images: Whether to download the images in the article
        custom_filename: Custom file name (optional)

    Returns:
        Crawl result as a JSON string
    """
    try:
        # Validate the URL
        if not url or not isinstance(url, str) or not url.startswith("https://mp.weixin.qq.com/"):
            raise ValueError("Invalid WeChat article URL; it must start with https://mp.weixin.qq.com/")

        logger.info(f"Starting to crawl article: {url}")

        # Get the spider instance
        spider = get_spider_instance()

        # Configure image downloading
        spider.download_images = download_images

        # Crawl the article
        article_data = spider.crawl_article_by_url(url)
        if not article_data:
            raise RuntimeError("Unable to fetch the article content")

        # Save the article to disk
        success = spider.save_article_to_file(article_data, custom_filename)
        if success:
            # Build the result payload
            result = {
                "status": "success",
                "message": "Article crawled successfully",
                "article": {
                    "title": article_data.get("title", ""),
                    "author": article_data.get("author", ""),
                    "publish_time": article_data.get("publish_time", ""),
                    "url": article_data.get("url", ""),
                    "content_length": len(article_data.get("content", "")),
                    "images_count": len(article_data.get("images", [])),
                    "crawl_time": article_data.get("crawl_time", ""),
                },
                "files_saved": {
                    "json": True,
                    "txt": True,
                    "images": download_images,
                },
            }
            if download_images:
                images = article_data.get("images", [])
                success_count = sum(1 for img in images if img.get("download_success", False))
                result["article"]["images_downloaded"] = f"{success_count}/{len(images)}"
            return json.dumps(result, ensure_ascii=False, indent=2)
        else:
            raise RuntimeError("Error while saving files")
    except Exception as e:
        logger.error(f"Failed to crawl article: {e}")
        error_result = {
            "status": "error",
            "message": f"Crawl failed: {str(e)}",
            "url": url,
        }
        return json.dumps(error_result, ensure_ascii=False, indent=2)


@app.tool()
def analyze_article_content(article_data: dict, analysis_type: str = "full") -> str:
    """
    Analyze a crawled article and extract key information.

    Args:
        article_data: Article data object
        analysis_type: Analysis type: summary, keywords, images, or full (complete analysis)

    Returns:
        Analysis result as a JSON string
    """
    try:
        if not article_data or not isinstance(article_data, dict):
            raise ValueError("article_data must be a dict of article data")

        # Check the basic fields of the article data
        required_fields = ["title", "content"]
        missing_fields = [field for field in required_fields if field not in article_data]
        if missing_fields:
            logger.warning(f"Article data is missing fields: {missing_fields}")

        logger.info(
            f"Analyzing article: analysis_type={analysis_type}, "
            f"title={article_data.get('title', 'N/A')[:30]}..."
        )

        result = {"analysis_type": analysis_type}

        if analysis_type in ["summary", "full"]:
            content = article_data.get("content", "")
            result["summary"] = {
                "title": article_data.get("title", ""),
                "author": article_data.get("author", ""),
                "publish_time": article_data.get("publish_time", ""),
                "content_preview": content[:200] + "..." if len(content) > 200 else content,
                "word_count": len(content),
                "paragraph_count": len(content.split("\n\n")) if content else 0,
            }

        if analysis_type in ["keywords", "full"]:
            content = article_data.get("content", "")
            # Naive keyword extraction by word frequency
            words = content.split()
            word_freq = {}
            for word in words:
                if len(word) > 1:  # filter out single characters
                    word_freq[word] = word_freq.get(word, 0) + 1
            # Take the 10 most frequent words
            top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
            result["keywords"] = [word for word, freq in top_words]

        if analysis_type in ["images", "full"]:
            images = article_data.get("images", [])
            result["images_analysis"] = {
                "total_count": len(images),
                "downloaded_count": sum(1 for img in images if img.get("download_success", False)),
                "failed_count": sum(1 for img in images if not img.get("download_success", False)),
                "image_details": [
                    {
                        "filename": img.get("filename", ""),
                        "alt_text": img.get("alt", ""),
                        "download_success": img.get("download_success", False),
                    }
                    for img in images[:5]  # show details for the first 5 images only
                ],
            }

        return json.dumps(result, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"Failed to analyze article content: {e}")
        error_result = {
            "status": "error",
            "message": f"Analysis failed: {str(e)}",
        }
        return json.dumps(error_result, ensure_ascii=False, indent=2)


@app.tool()
def get_article_statistics(article_data: dict) -> str:
    """
    Get article statistics (word count, image count, etc.).

    Args:
        article_data: Article data object

    Returns:
        Statistics as a JSON string
    """
    try:
        if not article_data or not isinstance(article_data, dict):
            raise ValueError("article_data must be a dict of article data")

        content = article_data.get("content", "")
        images = article_data.get("images", [])
        downloaded = sum(1 for img in images if img.get("download_success", False))

        stats = {
            "basic_info": {
                "title": article_data.get("title", ""),
                "author": article_data.get("author", ""),
                "publish_time": article_data.get("publish_time", ""),
                "crawl_time": article_data.get("crawl_time", ""),
            },
            "content_statistics": {
                "total_characters": len(content),
                "total_words": len(content.split()),
                "paragraphs": len(content.split("\n\n")) if content else 0,
                "lines": len(content.split("\n")) if content else 0,
            },
            "image_statistics": {
                "total_images": len(images),
                "downloaded_successfully": downloaded,
                "download_failed": len(images) - downloaded,
                "download_success_rate": f"{downloaded / len(images) * 100:.1f}%" if images else "0%",
            },
        }

        return json.dumps(stats, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"Failed to compute statistics: {e}")
        error_result = {
            "status": "error",
            "message": f"Statistics failed: {str(e)}",
        }
        return json.dumps(error_result, ensure_ascii=False, indent=2)


def cleanup():
    """Release resources."""
    global spider_instance
    if spider_instance:
        try:
            spider_instance.close()
            logger.info("Spider instance closed")
        except Exception as e:
            logger.error(f"Error while closing the spider instance: {e}")
        finally:
            spider_instance = None


def main():
    """Start the MCP server."""
    try:
        logger.info("Starting the MCP WeChat spider server (FastMCP edition)")
        if WeixinSpiderWithImages is None:
            logger.error("The spider module was not imported correctly; cannot start the server")
            return
        logger.info("Spider module imported successfully")
        logger.info("MCP WeChat spider server started")
        # Run the FastMCP application over stdio
        app.run(transport="stdio")
    except KeyboardInterrupt:
        logger.info("Interrupt received, shutting down the server...")
    except Exception as e:
        logger.error(f"Server error: {e}")
    finally:
        cleanup()
        logger.info("MCP WeChat spider server shut down")


if __name__ == "__main__":
    main()
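Because the server runs over stdio, its tools can be exercised from any MCP client. Below is a minimal sketch using the official MCP Python SDK's stdio client; the script path "server.py" matches the file above, while the article URL is a hypothetical placeholder you would replace with a real https://mp.weixin.qq.com/s/ link.

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def demo() -> None:
    # Launch server.py as a subprocess and talk to it over stdio.
    params = StdioServerParameters(command="python", args=["server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Placeholder URL -- substitute a real WeChat article link.
            result = await session.call_tool(
                "crawl_weixin_article",
                {"url": "https://mp.weixin.qq.com/s/your-article-id", "download_images": False},
            )
            # The tool returns its JSON payload as a single text block.
            print(result.content[0].text)


asyncio.run(demo())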

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ditingdapeng/MCPWeChatOfficialAccounts'
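The same endpoint can be queried from Python. A minimal sketch using the third-party requests library; the response shape is not documented here, so the example simply prints whatever JSON comes back.

import requests  # pip install requests

resp = requests.get(
    "https://glama.ai/api/mcp/v1/servers/ditingdapeng/MCPWeChatOfficialAccounts"
)
resp.raise_for_status()  # surface HTTP errors instead of parsing an error body
print(resp.json())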

If you have feedback or need assistance with the MCP directory API, please join our Discord server.