
WeChat Article Reader MCP Server

by whbfxy
content_parser.py (20.4 kB)

"""
Content parser module.

Provides HTML parsing utilities for extracting structured data from
WeChat Official Account article pages.
"""

import re
import html
import copy
from typing import Dict, List, Optional, Tuple
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag
import base64
import mimetypes

from ..models import (
    WeChatArticle, ArticleContent, ArticleSummary, SearchResult,
    ResponseMetadata, ArticleMetadata, ContentFormat, ImageMetadata,
    generate_request_id
)
from .errors import ParseError


class WeChatArticleParser:
    """Parser for WeChat Official Account articles."""

    def __init__(self):
        """Initialize the parser."""
        pass

    def parse_article(self, html_content: str, url: str) -> WeChatArticle:
        """
        Parse the HTML content of a WeChat Official Account article.

        Args:
            html_content: Article HTML content
            url: Article URL

        Returns:
            The parsed article object

        Raises:
            ParseError: If parsing fails
        """
        try:
            soup = BeautifulSoup(html_content, 'html.parser')

            # Extract article metadata
            metadata = self._extract_metadata(soup, url)

            # Extract article content
            content = self._extract_content(soup, url)

            # Build the article object
            article = WeChatArticle(
                url=url,
                title=metadata.title,
                author=metadata.author,
                publish_time=metadata.publish_time,
                account_name=metadata.account_name,
                account_id=metadata.account_id,
                content=content,
                images=metadata.images,
                read_count=metadata.read_count,
                like_count=metadata.like_count,
                comment_count=metadata.comment_count,
                content_hash=self._generate_content_hash(content),
                extracted_at=self._get_current_timestamp()
            )

            return article

        except Exception as e:
            raise ParseError(f"解析文章失败: {str(e)}")

    def _extract_metadata(self, soup: BeautifulSoup, url: str) -> ArticleMetadata:
        """
        Extract article metadata.

        Args:
            soup: BeautifulSoup object
            url: Article URL

        Returns:
            Article metadata object
        """
        # Title
        title = self._extract_title(soup)

        # Author
        author = self._extract_author(soup)

        # Publish time
        publish_time = self._extract_publish_time(soup)

        # Official account name
        account_name = self._extract_account_name(soup)

        # Official account ID
        account_id = self._extract_account_id(soup)

        # Images
        images = self._extract_images(soup, url)

        # Read count
        read_count = self._extract_read_count(soup)

        # Like count
        like_count = self._extract_like_count(soup)

        # Comment count
        comment_count = self._extract_comment_count(soup)

        return ArticleMetadata(
            title=title,
            author=author,
            publish_time=publish_time,
            account_name=account_name,
            account_id=account_id,
            images=images,
            read_count=read_count,
            like_count=like_count,
            comment_count=comment_count
        )

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """Extract the article title."""
        # Try several selectors, from most to least specific
        selectors = [
            "h1.rich_media_title",
            "h1#activity-name",
            ".rich_media_title",
            "#activity-name",
            "h1",
            "title"
        ]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                title = element.get_text(strip=True)
                if title:
                    return html.unescape(title)

        return "未知标题"

    def _extract_author(self, soup: BeautifulSoup) -> str:
        """Extract the article author."""
        # Try several selectors for the author field
        selectors = [
            ".rich_media_meta_text",
            ".rich_media_meta.rich_media_meta_text",
            "#meta_content .rich_media_meta_text",
            ".rich_media_meta_list .rich_media_meta_text"
        ]

        for selector in selectors:
            elements = soup.select(selector)
            for element in elements:
                text = element.get_text(strip=True)
                # Skip dates and other non-author metadata
                if text and not re.search(r'\d{4}-\d{2}-\d{2}|\d{2}:\d{2}', text):
                    return html.unescape(text)

        return "未知作者"

    def _extract_publish_time(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract the article publish time."""
        # Try several selectors for the publish time
        selectors = [
            ".rich_media_meta_text",
            ".rich_media_meta.rich_media_meta_text",
            "#meta_content .rich_media_meta_text",
            ".rich_media_meta_list .rich_media_meta_text"
        ]

        for selector in selectors:
            elements = soup.select(selector)
            for element in elements:
                text = element.get_text(strip=True)
                # Keep only strings that look like a date or time
                if re.search(r'\d{4}-\d{2}-\d{2}|\d{2}:\d{2}', text):
                    return html.unescape(text)

        return None

    def _extract_account_name(self, soup: BeautifulSoup) -> str:
        """Extract the official account name."""
        # Try several selectors for the account name
        selectors = [
            ".rich_media_meta_link",
            "#js_profile_qrcode > div > strong",
            ".profile_nickname",
            "#profileBt > a > span"
        ]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                text = element.get_text(strip=True)
                if text:
                    return html.unescape(text)

        return "未知公众号"

    def _extract_account_id(self, soup: BeautifulSoup) -> str:
        """Extract the official account ID."""
        # Try several selectors for the account ID
        selectors = [
            ".rich_media_meta_link",
            "#js_profile_qrcode > div > strong",
            ".profile_nickname",
            "#profileBt > a > span"
        ]

        for selector in selectors:
            element = soup.select_one(selector)
            if element and element.get('href'):
                # Extract the account ID (__biz parameter) from the link
                href = element.get('href', '')
                match = re.search(r'__biz=([^&]+)', href)
                if match:
                    return match.group(1)

        return "未知ID"

    def _extract_images(self, soup: BeautifulSoup, base_url: str) -> List[ImageMetadata]:
        """Extract image information from the article."""
        images = []

        # Find all image elements
        img_elements = soup.select("img")

        for i, img in enumerate(img_elements):
            src = img.get('src', '')
            data_src = img.get('data-src', '')
            data_type = img.get('data-type', '')
            data_w = img.get('data-w', '')
            data_ratio = img.get('data-ratio', '')

            # Images in WeChat articles are usually lazy-loaded via data-src
            img_url = data_src if data_src else src
            if not img_url:
                continue

            # Normalise protocol-relative and relative URLs
            if img_url.startswith('//'):
                img_url = f'https:{img_url}'
            elif img_url.startswith('/'):
                img_url = urljoin(base_url, img_url)

            # Alt text
            alt = img.get('alt', '')

            # Dimensions: data-w is the width, data-ratio is height / width
            width = None
            height = None
            if data_w and data_ratio:
                try:
                    width = int(data_w)
                    height = int(width * float(data_ratio))
                except (ValueError, TypeError):
                    pass

            # Image MIME type
            img_type = data_type if data_type else self._guess_image_type(img_url)

            # Build the image metadata
            image_metadata = ImageMetadata(
                url=img_url,
                alt_text=alt,
                width=width,
                height=height,
                type=img_type,
                index=i
            )
            images.append(image_metadata)

        return images

    def _extract_read_count(self, soup: BeautifulSoup) -> Optional[int]:
        """Extract the article read count."""
        # The read count usually appears near the bottom of the page
        selectors = [
            "#readNum3",
            ".rich_media_extra_text",
            "#js_bottom_ad_area .rich_media_extra_text"
        ]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                text = element.get_text(strip=True)
                # Pull out the first number
                match = re.search(r'(\d+)', text)
                if match:
                    try:
                        return int(match.group(1))
                    except ValueError:
                        pass

        return None

    def _extract_like_count(self, soup: BeautifulSoup) -> Optional[int]:
        """Extract the article like count."""
        # The like count usually appears near the bottom of the page
        selectors = [
            "#likeNum3",
            ".rich_media_extra_text",
            "#js_bottom_ad_area .rich_media_extra_text"
        ]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                text = element.get_text(strip=True)
                # Pull out the first number
                match = re.search(r'(\d+)', text)
                if match:
                    try:
                        return int(match.group(1))
                    except ValueError:
                        pass

        return None

    def _extract_comment_count(self, soup: BeautifulSoup) -> Optional[int]:
        """Extract the article comment count."""
        # The comment count usually appears near the bottom of the page
        selectors = [
            "#comment_num",
            ".rich_media_extra_text",
            "#js_bottom_ad_area .rich_media_extra_text"
        ]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                text = element.get_text(strip=True)
                # Pull out the first number
                match = re.search(r'(\d+)', text)
                if match:
                    try:
                        return int(match.group(1))
                    except ValueError:
                        pass

        return None

    def _extract_content(self, soup: BeautifulSoup, base_url: str) -> Dict[str, str]:
        """
        Extract the article content in multiple formats.

        Args:
            soup: BeautifulSoup object
            base_url: Base URL

        Returns:
            Dictionary mapping format names to content
        """
        # Locate the main content container
        content_container = soup.select_one(".rich_media_content")

        if not content_container:
            # Fall back to other likely containers
            alternative_selectors = [
                "#js_content",
                ".content",
                ".article-content",
                "div[id*='content']"
            ]
            for selector in alternative_selectors:
                content_container = soup.select_one(selector)
                if content_container:
                    break

        if not content_container:
            # As a last resort, use the whole body
            content_container = soup.find('body') or soup

        # Produce the content in each supported format
        content = {
            ContentFormat.HTML.value: self._extract_html_content(content_container, base_url),
            ContentFormat.MARKDOWN.value: self._extract_markdown_content(content_container, base_url),
            ContentFormat.TEXT.value: self._extract_text_content(content_container)
        }

        return content

    def _extract_html_content(self, container: Tag, base_url: str) -> str:
        """
        Extract the content as HTML.

        Args:
            container: Content container
            base_url: Base URL

        Returns:
            HTML content
        """
        # Copy the container so the original DOM is not modified
        container_clone = copy.copy(container)

        # Rewrite image URLs
        for img in container_clone.find_all('img'):
            data_src = img.get('data-src', '')

            # Prefer data-src (lazy-loaded images) when present
            if data_src:
                img['src'] = data_src

            # Normalise protocol-relative and relative URLs
            current_src = img.get('src', '')
            if current_src.startswith('//'):
                img['src'] = f'https:{current_src}'
            elif current_src.startswith('/'):
                img['src'] = urljoin(base_url, current_src)

        # Return the HTML content
        return str(container_clone)

    def _extract_markdown_content(self, container: Tag, base_url: str) -> str:
        """
        Extract the content as Markdown.

        Args:
            container: Content container
            base_url: Base URL

        Returns:
            Markdown content
        """
        markdown_lines = []

        # Convert each child element in turn
        for element in container.children:
            if isinstance(element, Tag):
                markdown_lines.append(self._convert_element_to_markdown(element, base_url))
            elif element.strip():
                markdown_lines.append(element.strip())

        return "\n\n".join(markdown_lines)

    def _convert_element_to_markdown(self, element: Tag, base_url: str) -> str:
        """
        Convert an HTML element to Markdown.

        Args:
            element: HTML element
            base_url: Base URL

        Returns:
            Markdown string
        """
        tag_name = element.name.lower()

        if tag_name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
            level = int(tag_name[1])
            text = element.get_text(strip=True)
            return f"{'#' * level} {text}"

        elif tag_name == 'p':
            return self._process_paragraph(element, base_url)

        elif tag_name == 'strong' or tag_name == 'b':
            text = element.get_text(strip=True)
            return f"**{text}**"

        elif tag_name == 'em' or tag_name == 'i':
            text = element.get_text(strip=True)
            return f"*{text}*"

        elif tag_name == 'a':
            href = element.get('href', '')
            text = element.get_text(strip=True)
            return f"[{text}]({href})"

        elif tag_name == 'img':
            return self._process_image(element, base_url)

        elif tag_name == 'blockquote':
            text = element.get_text(strip=True)
            return f"> {text}"

        elif tag_name == 'code':
            text = element.get_text(strip=True)
            return f"`{text}`"

        elif tag_name == 'pre':
            text = element.get_text()
            return f"```\n{text}\n```"

        elif tag_name == 'ul':
            items = []
            for li in element.find_all('li', recursive=False):
                item_text = self._process_paragraph(li, base_url)
                items.append(f"- {item_text}")
            return "\n".join(items)

        elif tag_name == 'ol':
            items = []
            for i, li in enumerate(element.find_all('li', recursive=False), 1):
                item_text = self._process_paragraph(li, base_url)
                items.append(f"{i}. {item_text}")
            return "\n".join(items)

        elif tag_name == 'br':
            return "\n"

        else:
            # For any other tag, recursively process its children
            result = []
            for child in element.children:
                if isinstance(child, Tag):
                    result.append(self._convert_element_to_markdown(child, base_url))
                elif child.strip():
                    result.append(child.strip())
            return "".join(result)

    def _process_paragraph(self, element: Tag, base_url: str) -> str:
        """
        Process a paragraph element.

        Args:
            element: Paragraph element
            base_url: Base URL

        Returns:
            Processed text
        """
        result = []
        for child in element.children:
            if isinstance(child, Tag):
                result.append(self._convert_element_to_markdown(child, base_url))
            elif child.strip():
                result.append(child.strip())
        return "".join(result)

    def _process_image(self, img: Tag, base_url: str) -> str:
        """
        Process an image element.

        Args:
            img: Image element
            base_url: Base URL

        Returns:
            Markdown image syntax
        """
        src = img.get('src', '')
        data_src = img.get('data-src', '')
        alt = img.get('alt', '')

        # Prefer data-src (lazy-loaded images) when present
        img_url = data_src if data_src else src
        if not img_url:
            return ""

        # Normalise protocol-relative and relative URLs
        if img_url.startswith('//'):
            img_url = f'https:{img_url}'
        elif img_url.startswith('/'):
            img_url = urljoin(base_url, img_url)

        return f"![{alt}]({img_url})"

    def _extract_text_content(self, container: Tag) -> str:
        """
        Extract the content as plain text.

        Args:
            container: Content container

        Returns:
            Plain text content
        """
        return container.get_text(separator='\n', strip=True)

    def _guess_image_type(self, url: str) -> str:
        """
        Guess the image MIME type from its URL.

        Args:
            url: Image URL

        Returns:
            Image MIME type
        """
        # Look at the file extension in the URL path
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()

        if path.endswith('.jpg') or path.endswith('.jpeg'):
            return 'image/jpeg'
        elif path.endswith('.png'):
            return 'image/png'
        elif path.endswith('.gif'):
            return 'image/gif'
        elif path.endswith('.webp'):
            return 'image/webp'
        else:
            # Default to JPEG
            return 'image/jpeg'

    def _generate_content_hash(self, content: Dict[str, str]) -> str:
        """
        Generate a hash of the content.

        Args:
            content: Content dictionary

        Returns:
            Content hash value
        """
        import hashlib

        # Hash the plain-text representation
        text_content = content.get(ContentFormat.TEXT.value, '')
        return hashlib.md5(text_content.encode('utf-8')).hexdigest()

    def _get_current_timestamp(self) -> str:
        """
        Get the current timestamp.

        Returns:
            Current timestamp as an ISO 8601 string
        """
        from datetime import datetime
        return datetime.now().isoformat()


def parse_wechat_article(html_content: str, url: str) -> WeChatArticle:
    """
    Convenience function: parse a WeChat Official Account article.

    Args:
        html_content: Article HTML content
        url: Article URL

    Returns:
        The parsed article object
    """
    parser = WeChatArticleParser()
    return parser.parse_article(html_content, url)
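
For reference, here is a minimal usage sketch of the parse_wechat_article convenience function above. The import path, example URL, and request headers are placeholders rather than part of the repository: the module uses relative imports, so it must be imported through whatever package it actually lives in, and the "text" key assumes ContentFormat.TEXT.value == "text".

# Minimal usage sketch. The package path, URL, and headers below are placeholders;
# adjust them to the actual repository layout and target article.
import requests

from your_package.content_parser import parse_wechat_article  # hypothetical import path

url = "https://mp.weixin.qq.com/s/EXAMPLE"   # placeholder article URL
headers = {"User-Agent": "Mozilla/5.0"}      # WeChat may reject requests without a browser-like UA

html_content = requests.get(url, headers=headers, timeout=10).text
article = parse_wechat_article(html_content, url)

print(article.title)
print(article.author)
print(article.content["text"][:200])         # assumes ContentFormat.TEXT.value == "text"

Parsing failures are raised as ParseError, so callers that batch-process articles may want to wrap the call in a try/except.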

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whbfxy/MCP101Demo'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.