"""
内容解析器模块
提供HTML解析功能,用于从微信公众号文章页面中提取结构化数据。
"""
import copy
import hashlib
import html
import mimetypes
import re
from datetime import datetime
from typing import Dict, List, Optional
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag
from ..models import (
WeChatArticle, ArticleContent, ArticleSummary, SearchResult, ResponseMetadata,
ArticleMetadata, ContentFormat, ImageMetadata, generate_request_id
)
from .errors import ParseError


class WeChatArticleParser:
    """Parser for WeChat Official Account articles."""

    def parse_article(self, html_content: str, url: str) -> WeChatArticle:
        """
        Parse the HTML of a WeChat Official Account article.

        Args:
            html_content: Raw HTML of the article page.
            url: URL of the article.

        Returns:
            The parsed article object.

        Raises:
            ParseError: If parsing fails.
        """
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            # Extract article metadata
            metadata = self._extract_metadata(soup, url)
            # Extract the article body in all supported formats
            content = self._extract_content(soup, url)
            # Assemble the article object
            article = WeChatArticle(
                url=url,
                title=metadata.title,
                author=metadata.author,
                publish_time=metadata.publish_time,
                account_name=metadata.account_name,
                account_id=metadata.account_id,
                content=content,
                images=metadata.images,
                read_count=metadata.read_count,
                like_count=metadata.like_count,
                comment_count=metadata.comment_count,
                content_hash=self._generate_content_hash(content),
                extracted_at=self._get_current_timestamp()
            )
            return article
        except Exception as e:
            raise ParseError(f"Failed to parse article: {e}") from e

    def _extract_metadata(self, soup: BeautifulSoup, url: str) -> ArticleMetadata:
        """
        Extract article metadata.

        Args:
            soup: Parsed document.
            url: URL of the article.

        Returns:
            The article metadata object.
        """
        return ArticleMetadata(
            title=self._extract_title(soup),
            author=self._extract_author(soup),
            publish_time=self._extract_publish_time(soup),
            account_name=self._extract_account_name(soup),
            account_id=self._extract_account_id(soup),
            images=self._extract_images(soup, url),
            read_count=self._extract_read_count(soup),
            like_count=self._extract_like_count(soup),
            comment_count=self._extract_comment_count(soup)
        )

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """Extract the article title."""
        # Try several selectors, from most to least specific
        selectors = [
            "h1.rich_media_title",
            "h1#activity-name",
            ".rich_media_title",
            "#activity-name",
            "h1",
            "title"
        ]
        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                title = element.get_text(strip=True)
                if title:
                    return html.unescape(title)
        return "Unknown title"

    def _extract_author(self, soup: BeautifulSoup) -> str:
        """Extract the article author."""
        selectors = [
            ".rich_media_meta_text",
            ".rich_media_meta.rich_media_meta_text",
            "#meta_content .rich_media_meta_text",
            ".rich_media_meta_list .rich_media_meta_text"
        ]
        for selector in selectors:
            elements = soup.select(selector)
            for element in elements:
                text = element.get_text(strip=True)
                # Skip entries that look like dates/times rather than an author name
                if text and not re.search(r'\d{4}-\d{2}-\d{2}|\d{2}:\d{2}', text):
                    return html.unescape(text)
        return "Unknown author"

    def _extract_publish_time(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract the article publish time."""
        # The publish time shares the same meta containers as the author
        selectors = [
            ".rich_media_meta_text",
            ".rich_media_meta.rich_media_meta_text",
            "#meta_content .rich_media_meta_text",
            ".rich_media_meta_list .rich_media_meta_text"
        ]
        for selector in selectors:
            elements = soup.select(selector)
            for element in elements:
                text = element.get_text(strip=True)
                # Keep entries that look like a date or time
                if re.search(r'\d{4}-\d{2}-\d{2}|\d{2}:\d{2}', text):
                    return html.unescape(text)
        return None

    def _extract_account_name(self, soup: BeautifulSoup) -> str:
        """Extract the official account name."""
        selectors = [
            ".rich_media_meta_link",
            "#js_profile_qrcode > div > strong",
            ".profile_nickname",
            "#profileBt > a > span"
        ]
        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                text = element.get_text(strip=True)
                if text:
                    return html.unescape(text)
        return "Unknown account"

    def _extract_account_id(self, soup: BeautifulSoup) -> str:
        """Extract the official account ID (the ``__biz`` URL parameter)."""
        # The account ID only appears in profile links, so scan every anchor
        # whose href carries a __biz parameter
        for link in soup.select('a[href*="__biz="]'):
            match = re.search(r'__biz=([^&#]+)', link.get('href', ''))
            if match:
                return match.group(1)
        return "Unknown ID"

    def _extract_images(self, soup: BeautifulSoup, base_url: str) -> List[ImageMetadata]:
        """Extract metadata for every image in the article."""
        images = []
        img_elements = soup.select("img")
        for i, img in enumerate(img_elements):
            src = img.get('src', '')
            data_src = img.get('data-src', '')
            data_type = img.get('data-type', '')
            data_w = img.get('data-w', '')
            data_ratio = img.get('data-ratio', '')
            # WeChat lazy-loads images, so the real URL usually lives in data-src
            img_url = data_src if data_src else src
            if not img_url:
                continue
            # Resolve protocol-relative and root-relative URLs
            if img_url.startswith('//'):
                img_url = f'https:{img_url}'
            elif img_url.startswith('/'):
                img_url = urljoin(base_url, img_url)
            # Alt text
            alt = img.get('alt', '')
            # Dimensions: data-ratio is height/width, so height = width * ratio
            width = None
            height = None
            if data_w and data_ratio:
                try:
                    width = int(data_w)
                    height = int(width * float(data_ratio))
                except (ValueError, TypeError):
                    pass
            # MIME type: prefer the explicit data-type attribute
            img_type = data_type if data_type else self._guess_image_type(img_url)
            image_metadata = ImageMetadata(
                url=img_url,
                alt_text=alt,
                width=width,
                height=height,
                type=img_type,
                index=i
            )
            images.append(image_metadata)
        return images

    def _extract_count(self, soup: BeautifulSoup, selectors: List[str]) -> Optional[int]:
        """Return the first integer found in any element matched by the selectors."""
        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                match = re.search(r'(\d+)', element.get_text(strip=True))
                if match:
                    return int(match.group(1))
        return None

    def _extract_read_count(self, soup: BeautifulSoup) -> Optional[int]:
        """Extract the article read count."""
        # Read/like/comment counters are usually rendered near the bottom of the page
        return self._extract_count(soup, [
            "#readNum3",
            ".rich_media_extra_text",
            "#js_bottom_ad_area .rich_media_extra_text"
        ])

    def _extract_like_count(self, soup: BeautifulSoup) -> Optional[int]:
        """Extract the article like count."""
        return self._extract_count(soup, [
            "#likeNum3",
            ".rich_media_extra_text",
            "#js_bottom_ad_area .rich_media_extra_text"
        ])

    def _extract_comment_count(self, soup: BeautifulSoup) -> Optional[int]:
        """Extract the article comment count."""
        return self._extract_count(soup, [
            "#comment_num",
            ".rich_media_extra_text",
            "#js_bottom_ad_area .rich_media_extra_text"
        ])
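
    # Example of _extract_count's behavior: an element whose text is "阅读 3456"
    # yields 3456. Note that a "10万+" style counter would yield 10, since only
    # the leading run of digits is captured.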

    def _extract_content(self, soup: BeautifulSoup, base_url: str) -> Dict[str, str]:
        """
        Extract the article body in several formats.

        Args:
            soup: Parsed document.
            base_url: Base URL for resolving relative links.

        Returns:
            A dict mapping format names to the rendered content.
        """
        # Locate the article body container
        content_container = soup.select_one(".rich_media_content")
        if not content_container:
            # Fall back to other likely containers
            alternative_selectors = [
                "#js_content",
                ".content",
                ".article-content",
                "div[id*='content']"
            ]
            for selector in alternative_selectors:
                content_container = soup.select_one(selector)
                if content_container:
                    break
        if not content_container:
            # As a last resort, use the whole body
            content_container = soup.find('body') or soup
        # Render the container in each supported format
        content = {
            ContentFormat.HTML.value: self._extract_html_content(content_container, base_url),
            ContentFormat.MARKDOWN.value: self._extract_markdown_content(content_container, base_url),
            ContentFormat.TEXT.value: self._extract_text_content(content_container)
        }
        return content
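
    # Illustrative shape of the returned dict (keys come from ContentFormat's
    # values, assumed here to be "html", "markdown" and "text"):
    #   {"html": "<div class=...>...</div>", "markdown": "## ...", "text": "..."}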

    def _extract_html_content(self, container: Tag, base_url: str) -> str:
        """
        Render the container as HTML.

        Args:
            container: Content container.
            base_url: Base URL for resolving relative links.

        Returns:
            HTML content with image URLs resolved.
        """
        # Clone the container so the original DOM tree is not mutated
        container_clone = copy.copy(container)
        # Promote lazy-load URLs to src and resolve relative ones; guard against
        # <img> tags that carry neither src nor data-src
        for img in container_clone.find_all('img'):
            img_url = img.get('data-src') or img.get('src') or ''
            if not img_url:
                continue
            if img_url.startswith('//'):
                img_url = f'https:{img_url}'
            elif img_url.startswith('/'):
                img_url = urljoin(base_url, img_url)
            img['src'] = img_url
        return str(container_clone)

    def _extract_markdown_content(self, container: Tag, base_url: str) -> str:
        """
        Render the container as Markdown.

        Args:
            container: Content container.
            base_url: Base URL for resolving relative links.

        Returns:
            Markdown content.
        """
        markdown_lines = []
        # Convert each direct child of the container
        for element in container.children:
            if isinstance(element, Tag):
                markdown_lines.append(self._convert_element_to_markdown(element, base_url))
            elif element.strip():
                markdown_lines.append(element.strip())
        return "\n\n".join(markdown_lines)

    def _convert_element_to_markdown(self, element: Tag, base_url: str) -> str:
        """
        Convert a single HTML element to Markdown.

        Args:
            element: HTML element.
            base_url: Base URL for resolving relative links.

        Returns:
            The Markdown string.
        """
        tag_name = element.name.lower()
        if tag_name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
            level = int(tag_name[1])
            text = element.get_text(strip=True)
            return f"{'#' * level} {text}"
        elif tag_name == 'p':
            return self._process_paragraph(element, base_url)
        elif tag_name in ('strong', 'b'):
            text = element.get_text(strip=True)
            return f"**{text}**"
        elif tag_name in ('em', 'i'):
            text = element.get_text(strip=True)
            return f"*{text}*"
        elif tag_name == 'a':
            href = element.get('href', '')
            text = element.get_text(strip=True)
            return f"[{text}]({href})"
        elif tag_name == 'img':
            return self._process_image(element, base_url)
        elif tag_name == 'blockquote':
            text = element.get_text(strip=True)
            return f"> {text}"
        elif tag_name == 'code':
            text = element.get_text(strip=True)
            return f"`{text}`"
        elif tag_name == 'pre':
            text = element.get_text()
            return f"```\n{text}\n```"
        elif tag_name == 'ul':
            items = []
            for li in element.find_all('li', recursive=False):
                item_text = self._process_paragraph(li, base_url)
                items.append(f"- {item_text}")
            return "\n".join(items)
        elif tag_name == 'ol':
            items = []
            for i, li in enumerate(element.find_all('li', recursive=False), 1):
                item_text = self._process_paragraph(li, base_url)
                items.append(f"{i}. {item_text}")
            return "\n".join(items)
        elif tag_name == 'br':
            return "\n"
        else:
            # For any other tag, recursively convert its children
            result = []
            for child in element.children:
                if isinstance(child, Tag):
                    result.append(self._convert_element_to_markdown(child, base_url))
                elif child.strip():
                    result.append(child.strip())
            return "".join(result)

    def _process_paragraph(self, element: Tag, base_url: str) -> str:
        """
        Flatten a paragraph-like element into Markdown text.

        Args:
            element: Paragraph element.
            base_url: Base URL for resolving relative links.

        Returns:
            The flattened text.
        """
        result = []
        for child in element.children:
            if isinstance(child, Tag):
                result.append(self._convert_element_to_markdown(child, base_url))
            elif child.strip():
                result.append(child.strip())
        return "".join(result)

    def _process_image(self, img: Tag, base_url: str) -> str:
        """
        Convert an image element to Markdown image syntax.

        Args:
            img: Image element.
            base_url: Base URL for resolving relative links.

        Returns:
            A Markdown image reference, or an empty string if no URL is found.
        """
        src = img.get('src', '')
        data_src = img.get('data-src', '')
        alt = img.get('alt', '')
        # WeChat lazy-loads images, so prefer data-src over src
        img_url = data_src if data_src else src
        if not img_url:
            return ""
        # Resolve protocol-relative and root-relative URLs
        if img_url.startswith('//'):
            img_url = f'https:{img_url}'
        elif img_url.startswith('/'):
            img_url = urljoin(base_url, img_url)
        return f"![{alt}]({img_url})"

    def _extract_text_content(self, container: Tag) -> str:
        """
        Render the container as plain text.

        Args:
            container: Content container.

        Returns:
            Plain-text content, one block per line.
        """
        return container.get_text(separator='\n', strip=True)

    def _guess_image_type(self, url: str) -> str:
        """
        Guess an image's MIME type from its URL.

        Args:
            url: Image URL.

        Returns:
            The MIME type, e.g. ``image/png``.
        """
        # Try the file extension first, e.g. ".png" -> "image/png"
        path = urlparse(url).path.lower()
        guessed, _ = mimetypes.guess_type(path)
        if guessed and guessed.startswith('image/'):
            return guessed
        # WeChat image URLs often carry the format as a query parameter,
        # e.g. ...?wx_fmt=png
        match = re.search(r'wx_fmt=(\w+)', url)
        if match:
            return f"image/{match.group(1)}"
        # Default to JPEG when nothing else matches
        return 'image/jpeg'

    def _generate_content_hash(self, content: Dict[str, str]) -> str:
        """
        Generate a hash of the article content.

        Args:
            content: Dict of rendered content formats.

        Returns:
            The hex digest of the plain-text content.
        """
        # Hash the plain-text rendering so markup changes do not affect the hash
        text_content = content.get(ContentFormat.TEXT.value, '')
        return hashlib.md5(text_content.encode('utf-8')).hexdigest()

    def _get_current_timestamp(self) -> str:
        """
        Get the current timestamp.

        Returns:
            The current time as an ISO-8601 string.
        """
        return datetime.now().isoformat()


def parse_wechat_article(html_content: str, url: str) -> WeChatArticle:
    """
    Convenience wrapper: parse a WeChat Official Account article.

    Args:
        html_content: Raw HTML of the article page.
        url: URL of the article.

    Returns:
        The parsed article object.
    """
    parser = WeChatArticleParser()
    return parser.parse_article(html_content, url)
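

if __name__ == "__main__":
    # Minimal usage sketch. The HTML fragment below is hand-written to mimic the
    # WeChat article markup this parser targets; it is illustrative only, and the
    # demo assumes WeChatArticle accepts the fields passed by parse_article.
    # Run via ``python -m <package>.content_parser`` so the relative imports resolve.
    sample_html = """
    <html><body>
      <h1 class="rich_media_title">示例标题</h1>
      <div id="js_content" class="rich_media_content">
        <p>这是一段<strong>加粗</strong>文字。</p>
        <img data-src="//mmbiz.qpic.cn/demo/0?wx_fmt=png" alt="示意图"/>
      </div>
    </body></html>
    """
    article = parse_wechat_article(sample_html, "https://mp.weixin.qq.com/s/EXAMPLE")
    print(article.title)                                  # -> 示例标题
    print(article.content[ContentFormat.MARKDOWN.value])  # Markdown rendering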