We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/herofox2024/xiayan-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""WeChat Official Account publisher."""
import os
import asyncio
import logging
from typing import Dict, Optional, List, Union, Tuple
import aiohttp
import json
import mimetypes
import tempfile
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger = logging.getLogger(__name__)
# 微信API错误码映射表
WECHAT_ERROR_CODES = {
40001: "无效的凭证(access_token)",
40002: "不合法的凭证类型",
40003: "不合法的OpenID",
40004: "不合法的媒体文件类型",
40005: "不合法的文件类型",
40006: "不合法的文件大小",
40007: "不合法的媒体文件id",
40008: "不合法的消息类型",
40009: "不合法的图片文件大小",
40010: "不合法的语音文件大小",
40011: "不合法的视频文件大小",
40012: "不合法的缩略图文件大小",
40013: "不合法的AppID",
40014: "不合法的access_token",
40015: "不合法的菜单类型",
40016: "不合法的按钮个数",
40017: "不合法的按钮个数",
40018: "不合法的按钮名字长度",
40019: "不合法的按钮KEY长度",
40020: "不合法的按钮URL长度",
40021: "不合法的菜单版本号",
40022: "不合法的子菜单级数",
40023: "不合法的子菜单按钮个数",
40024: "不合法的子菜单按钮类型",
40025: "不合法的子菜单按钮名字长度",
40026: "不合法的子菜单按钮KEY长度",
40027: "不合法的子菜单按钮URL长度",
40028: "不合法的自定义菜单使用用户",
40029: "不合法的oauth_code",
40030: "不合法的refresh_token",
40031: "不合法的openid列表",
40032: "不合法的openid列表长度",
40033: "不合法的请求字符",
40035: "不合法的参数",
40038: "不合法的请求格式",
40039: "不合法的URL长度",
40050: "不合法的分组id",
40051: "分组名字不合法",
40060: "删除单篇图文时,指定的article_idx不合法",
40117: "分组名字不合法",
40118: "media_id大小不合法",
40119: "button类型错误",
40120: "button名字错误",
40121: "button KEY错误",
40122: "button URL错误",
40132: "不合法的媒体文件类型",
40137: "不合法的APPID",
40155: "不合法的素材类型",
41001: "缺少access_token参数",
41002: "缺少appid参数",
41003: "缺少refresh_token参数",
41004: "缺少secret参数",
41005: "缺少多媒体文件数据",
41006: "缺少media_id参数",
41007: "缺少子菜单数据",
41008: "缺少oauth code",
41009: "缺少openid",
42001: "access_token超时",
42002: "refresh_token超时",
42003: "oauth_code超时",
42007: "用户修改微信密码,accesstoken和refreshtoken失效",
43002: "需要GET请求",
43003: "需要POST请求",
43004: "需要HTTPS请求",
43005: "需要接收者关注",
43006: "需要好友关系",
44001: "多媒体文件为空",
44002: "POST的数据包为空",
44003: "图文消息内容为空",
44004: "文本消息内容为空",
45001: "多媒体文件大小超过限制",
45002: "消息内容超过限制",
45003: "标题字段超过限制",
45004: "描述字段超过限制",
45005: "链接字段超过限制",
45006: "图片链接字段超过限制",
45007: "语音播放时间超过限制",
45008: "图文消息超过限制",
45009: "接口调用超过限制",
45010: "创建菜单个数超过限制",
45011: "API调用太频繁,请稍候再试",
45015: "回复时间超过限制",
45016: "系统繁忙",
45017: "媒体文件格式不正确",
45018: "上传的文件不是合法的媒体文件",
45019: "上传的缩略图不是合法的图片文件",
45022: "图片大小超过限制",
45026: "文字消息超过限制",
45038: "图文消息title字段超过限制",
47001: "解析JSON/XML内容错误",
48001: "api功能未授权",
50001: "用户未授权该api",
50002: "用户受限,可能是违规后接口被封禁",
61451: "参数错误(invalid parameter)",
61452: "无效客服账号(invalid kf_account)",
61453: "客服帐号已存在(kf_account exsited)",
61454: "客服帐号名长度超过限制(仅允许10个英文字符,不包括@及@后的公众号的微信号)",
61455: "客服帐号名包含非法字符(仅允许英文+数字)",
61456: "客服帐号个数超过限制(10个客服账号)",
61457: "无效头像文件类型",
61450: "系统错误(sys error)",
61500: "日期格式错误",
61501: "日期范围错误",
9001001: "POST数据参数不合法",
9001002: "远端服务不可用",
9001003: "Ticket不合法",
9001004: "获取摇周边ticket失败",
9001005: "获取设备ID失败",
9001006: "获取openid失败",
9001007: "上传文件缺失",
9001008: "上传素材的文件类型不合法",
9001009: "上传素材的文件尺寸不合法",
9001010: "上传失败",
9001020: "帐号不合法",
9001021: "已有设备激活率低于50%,不能新增设备",
9001022: "设备申请数不合法,必须为大于0的数字",
9001023: "已存在审核中的设备ID申请",
9001024: "一次查询设备ID数量不能超过50",
9001025: "设备ID不合法",
9001026: "页面ID不合法",
9001027: "页面参数不合法",
9001028: "一次删除页面ID数量不能超过10",
9001029: "页面已应用在设备中,请先解除应用关系",
9001030: "一次查询页面ID数量不能超过50",
9001031: "时间区间不合法",
9001032: "保存设备与页面的绑定关系参数错误",
9001033: "门店ID不合法",
9001034: "设备备注信息过长",
9001035: "设备申请参数不合法",
9001036: "查询起始值begin不合法"
}
class WeChatPublisher:
"""Publisher for WeChat Official Account."""
def __init__(self):
"""Initialize the publisher with WeChat API credentials."""
self.app_id = os.getenv('WECHAT_APP_ID', '')
self.app_secret = os.getenv('WECHAT_APP_SECRET', '')
self.base_url = 'https://api.weixin.qq.com/cgi-bin'
self.access_token: Optional[str] = None
self.token_expires_at: Optional[int] = None
def _handle_wechat_api_error(self, data: Dict, context: str = "操作") -> None:
"""Handle WeChat API error responses.
Args:
data: API response data
context: Operation context for error message
Raises:
Exception: With friendly error message
"""
if 'access_token' in data or 'media_id' in data:
return # No error, upload successful
errcode = data.get('errcode')
errmsg = data.get('errmsg', 'Unknown error')
# Get friendly error message from mapping table
friendly_msg = WECHAT_ERROR_CODES.get(errcode, errmsg)
raise Exception(f"{context}失败: {friendly_msg} (错误码: {errcode})")
async def _get_access_token(self) -> str:
"""Get WeChat API access token using the new stable access token API."""
import time
# Check if token is still valid
if self.access_token and self.token_expires_at and time.time() < self.token_expires_at:
return self.access_token
if not self.app_id or not self.app_secret:
raise ValueError("WECHAT_APP_ID and WECHAT_APP_SECRET environment variables are required")
# Use the new stable access token API
url = f"{self.base_url}/token"
params = {
'grant_type': 'client_credential',
'appid': self.app_id,
'secret': self.app_secret
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
# First check response status
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
# Get response text and try to parse as JSON regardless of content type
response_text = await response.text()
# Debug: Log response for troubleshooting
logger.debug(f"Token response: {response_text[:200]}")
try:
data = json.loads(response_text)
except json.JSONDecodeError as e:
content_type = response.headers.get('content-type', '')
raise Exception(f"Failed to parse JSON response from {content_type}: {e}\nResponse content: {response_text[:500]}")
# Handle API errors
self._handle_wechat_api_error(data, "获取访问令牌")
self.access_token = data['access_token']
expires_in = data.get('expires_in', 7200)
self.token_expires_at = time.time() + expires_in - 300 # Refresh 5 minutes before expiry
return self.access_token
except Exception as e:
# If the old API fails, try the new stable access token API
logger.info("Falling back to stable access token API...")
return await self._get_stable_access_token()
async def _get_stable_access_token(self) -> str:
"""Get stable access token using the new API."""
import time
# New stable access token API
url = f"{self.base_url}/stable_token"
data = {
"grant_type": "client_credential",
"appid": self.app_id,
"secret": self.app_secret,
"force_refresh": False
}
async with aiohttp.ClientSession() as session:
headers = {'Content-Type': 'application/json; charset=utf-8'}
# 手动序列化JSON,确保中文字符不被转义
json_data = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
async with session.post(url, data=json_data, headers=headers) as response:
# First check response status
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
# Get response text and try to parse as JSON regardless of content type
response_text = await response.text()
# Debug: Log response for troubleshooting
logger.debug(f"Stable token response: {response_text[:200]}")
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
content_type = response.headers.get('content-type', '')
raise Exception(f"Failed to parse JSON response from {content_type}: {e}\nResponse content: {response_text[:500]}")
# Handle API errors
self._handle_wechat_api_error(result, "获取稳定访问令牌")
if 'access_token' not in result or 'expires_in' not in result:
raise Exception(f"Invalid response from stable token API: 缺少必要字段")
self.access_token = result['access_token']
expires_in = result['expires_in']
self.token_expires_at = time.time() + expires_in - 300 # Refresh 5 minutes before expiry
return self.access_token
async def _resize_image_for_thumb(self, image_path: str, max_size_kb: int = 64) -> str:
"""
Resize image to meet WeChat thumb requirements (max 64KB).
Args:
image_path: Path to original image
max_size_kb: Maximum size in KB (default: 64)
Returns:
Path to resized temporary image file
"""
try:
from PIL import Image
import tempfile
# Open image
img = Image.open(image_path)
# Convert to RGB if necessary
if img.mode != 'RGB':
img = img.convert('RGB')
# Calculate target size (keeping aspect ratio)
width, height = img.size
max_dimension = 400 # Recommended size for thumb
if width > max_dimension or height > max_dimension:
# Calculate new size maintaining aspect ratio
ratio = min(max_dimension / width, max_dimension / height)
new_width = int(width * ratio)
new_height = int(height * ratio)
img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# Save to temporary file with compression
temp_file = tempfile.NamedTemporaryFile(suffix='.jpg', delete=False)
temp_path = temp_file.name
temp_file.close()
# Try different quality levels to meet size requirement
quality = 85
while quality > 10:
img.save(temp_path, 'JPEG', quality=quality, optimize=True)
# Check file size
file_size_kb = os.path.getsize(temp_path) / 1024
if file_size_kb <= max_size_kb:
return temp_path
quality -= 10
# If still too large, try smaller dimensions
max_dimension = 300
if width > max_dimension or height > max_dimension:
ratio = min(max_dimension / width, max_dimension / height)
new_width = int(width * ratio)
new_height = int(height * ratio)
img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
quality = 85
while quality > 10:
img.save(temp_path, 'JPEG', quality=quality, optimize=True)
file_size_kb = os.path.getsize(temp_path) / 1024
if file_size_kb <= max_size_kb:
return temp_path
quality -= 10
# Return the resized image even if slightly over limit
return temp_path
except ImportError:
# If PIL is not available, return original path
logger.warning("PIL not available, cannot resize image. Install Pillow for image resizing.")
return image_path
except Exception as e:
logger.error(f"Error resizing image: {e}")
return image_path
async def _upload_media(self, media_path: str, media_type: str = 'image', permanent: bool = False,
description: Optional[Dict] = None) -> str:
"""
Upload media file to WeChat server.
Args:
media_path: Path to media file (local or remote)
media_type: Type of media ('image', 'voice', 'video', 'thumb')
permanent: Whether to upload as permanent material (default: False for temporary)
description: Additional description for video material (required for permanent video)
Returns:
Media ID from WeChat server
"""
access_token = await self._get_access_token()
# Choose API endpoint based on permanent or temporary
if permanent:
url = f"{self.base_url}/material/add_material?access_token={access_token}&type={media_type}"
else:
url = f"{self.base_url}/media/upload?access_token={access_token}&type={media_type}"
# Handle different media sources
original_media_path = media_path
# Special handling for thumb type to ensure size requirements
if media_type == 'thumb':
# For thumb images, we need to resize to meet WeChat requirements (64KB max)
if media_path.startswith(('http://', 'https://')):
# Download remote image first
media_data, filename = await self._download_media(media_path)
# Save to temporary file
import tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
temp_file.write(media_data)
temp_path = temp_file.name
media_path = temp_path
else:
filename = os.path.basename(media_path)
temp_path = None
# Resize image to meet thumb requirements
resized_path = await self._resize_image_for_thumb(media_path)
media_path = resized_path
# Read the resized file
with open(media_path, 'rb') as f:
media_data = f.read()
# Clean up temporary files
if temp_path and os.path.exists(temp_path):
os.unlink(temp_path)
if resized_path != original_media_path and os.path.exists(resized_path):
# We'll clean up after upload
pass
else:
# Handle other media types
if media_path.startswith(('http://', 'https://')):
# Download remote image first
media_data, filename = await self._download_media(media_path)
else:
# Read local file
if not os.path.exists(media_path):
raise FileNotFoundError(f"Media file not found: {media_path}")
filename = os.path.basename(media_path)
with open(media_path, 'rb') as f:
media_data = f.read()
# Determine content type
content_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
# Upload to WeChat
try:
data = aiohttp.FormData()
if permanent and media_type == 'video' and description:
# For permanent video, add description
data.add_field('description', json.dumps(description),
content_type='application/json')
data.add_field('media', media_data, filename=filename, content_type=content_type)
async with aiohttp.ClientSession() as session:
data = aiohttp.FormData()
if permanent and media_type == 'video' and description:
# For permanent video, add description
data.add_field('description', json.dumps(description),
content_type='application/json')
data.add_field('media', media_data, filename=filename, content_type=content_type)
async with session.post(url, data=data) as response:
# First check response status
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
# Get response text and try to parse as JSON regardless of content type
response_text = await response.text()
# Debug: Log first 200 chars of response for troubleshooting
logger.debug(f"Upload response (first 200 chars): {response_text[:200]}")
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
content_type = response.headers.get('content-type', '')
raise Exception(f"Failed to parse JSON response from {content_type}: {e}\nResponse content: {response_text[:500]}")
# Handle API errors
self._handle_wechat_api_error(result, "上传媒体文件")
if 'media_id' not in result:
raise Exception(f"Failed to upload media: 缺少media_id字段")
return result['media_id']
except Exception as e:
# Clean up temporary files on error
if media_type == 'thumb' and 'original_media_path' in locals() and media_path != original_media_path and os.path.exists(media_path):
os.unlink(media_path)
raise e
finally:
# Clean up temporary files
if media_type == 'thumb' and 'original_media_path' in locals() and media_path != original_media_path and os.path.exists(media_path):
os.unlink(media_path)
async def _download_media(self, url: str) -> Tuple[bytes, str]:
"""Download media from remote URL and return data with filename."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status != 200:
raise Exception(f"Failed to download media: HTTP {response.status}")
# Try to get filename from URL or Content-Disposition header
filename = "media"
if 'Content-Disposition' in response.headers:
disposition = response.headers['Content-Disposition']
if 'filename=' in disposition:
filename = disposition.split('filename=')[-1].strip('"')
else:
# Extract filename from URL
from urllib.parse import unquote, urlparse
parsed_url = urlparse(url)
path_filename = unquote(os.path.basename(parsed_url.path))
if path_filename:
filename = path_filename
# Ensure filename has extension
if not mimetypes.guess_type(filename)[0]:
content_type = response.headers.get('content-type', '')
extension = mimetypes.guess_extension(content_type)
if extension:
filename += extension
return await response.read(), filename
async def _add_draft(self, title: str, content: str, cover_media_id: str = '') -> str:
"""
Add article to WeChat draft using the new API.
Args:
title: Article title
content: HTML content
cover_media_id: Media ID for cover image
Returns:
Media ID of the created draft
"""
access_token = await self._get_access_token()
url = f"{self.base_url}/draft/add?access_token={access_token}"
# Build article object according to WeChat API requirements
article = {
"title": title,
"content": content,
"digest": "", # Auto-generated by WeChat
"author": "Xiayan MCP",
"content_source_url": "",
"show_cover_pic": 0,
"need_open_comment": 0,
"only_fans_can_comment": 0
}
# Only add cover media if provided and valid
if cover_media_id:
article["show_cover_pic"] = 1
article["thumb_media_id"] = cover_media_id
articles = [article]
data = {"articles": articles}
async with aiohttp.ClientSession() as session:
headers = {'Content-Type': 'application/json; charset=utf-8'}
# 手动序列化JSON,确保中文字符不被转义
json_data = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
async with session.post(url, data=json_data, headers=headers) as response:
# First check response status
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
# Get response text and try to parse as JSON regardless of content type
response_text = await response.text()
# Debug: Log first 200 chars of response for troubleshooting
logger.debug(f"Response (first 200 chars): {response_text[:200]}")
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
content_type = response.headers.get('content-type', '')
raise Exception(f"Failed to parse JSON response from {content_type}: {e}\nResponse content: {response_text[:500]}")
# Handle API errors
self._handle_wechat_api_error(result, "添加草稿")
if 'media_id' not in result:
raise Exception(f"Failed to add draft: 缺少media_id字段")
return result['media_id']
async def upload_cover_image(self, image_path: str) -> str:
"""
Upload cover image specifically for WeChat articles.
This method ensures the image meets WeChat's requirements for article covers.
Args:
image_path: Path to image file (local or remote URL)
Returns:
Media ID of uploaded thumbnail
"""
try:
# Always upload as permanent thumb material for article covers
# This ensures the cover image is permanently available
media_id = await self.upload_permanent_material(image_path, 'thumb')
return media_id
except Exception as e:
raise Exception(f"Failed to upload cover image: {str(e)}")
async def _add_draft_with_options(self, title: str, content: str, cover_media_id: str = '',
author: str = "Xiayan MCP", need_open_comment: int = 0,
only_fans_can_comment: int = 0) -> str:
"""
Add article to WeChat draft using the new API with extended options.
Args:
title: Article title
content: HTML content
cover_media_id: Media ID for cover image
author: Article author
need_open_comment: Enable open comments (0/1)
only_fans_can_comment: Only fans can comment (0/1)
Returns:
Media ID of the created draft
"""
access_token = await self._get_access_token()
url = f"{self.base_url}/draft/add?access_token={access_token}"
# Build article object according to WeChat API requirements
article = {
"title": title,
"content": content,
"digest": "", # Auto-generated by WeChat
"author": author,
"content_source_url": "",
"show_cover_pic": 0,
"need_open_comment": need_open_comment,
"only_fans_can_comment": only_fans_can_comment
}
# Only add cover media if provided and valid
if cover_media_id:
article["show_cover_pic"] = 1
article["thumb_media_id"] = cover_media_id
articles = [article]
data = {"articles": articles}
async with aiohttp.ClientSession() as session:
headers = {'Content-Type': 'application/json; charset=utf-8'}
# 手动序列化JSON,确保中文字符不被转义
json_data = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
async with session.post(url, data=json_data, headers=headers) as response:
# First check response status
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
# Get response text and try to parse as JSON regardless of content type
response_text = await response.text()
# Debug: Log first 200 chars of response for troubleshooting
logger.debug(f"Response (first 200 chars): {response_text[:200]}")
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
content_type = response.headers.get('content-type', '')
raise Exception(f"Failed to parse JSON response from {content_type}: {e}\nResponse content: {response_text[:500]}")
# Handle API errors
self._handle_wechat_api_error(result, "添加草稿")
if 'media_id' not in result:
raise Exception(f"Failed to add draft: 缺少media_id字段")
return result['media_id']
async def publish_to_draft(self, title: str, content: str, cover: str = '',
permanent_cover: bool = False, author: str = "Xiayan MCP",
need_open_comment: int = 0, only_fans_can_comment: int = 0) -> Dict[str, str]:
"""
Publish article to WeChat draft box.
Args:
title: Article title
content: Formatted HTML content
cover: Cover image URL or path
permanent_cover: Whether to upload cover as permanent material
author: Article author
need_open_comment: Enable open comments (0/1)
only_fans_can_comment: Only fans can comment (0/1)
Returns:
Dictionary with media_id and other response data
"""
try:
logger.info(f"开始发布文章到草稿箱")
logger.info(f"文章标题: {title}")
logger.info(f"是否有封面: {'是' if cover else '否'}")
logger.info(f"作者: {author}")
logger.info(f"内容长度: {len(content)} 字符")
# Upload cover image if provided
logger.info(f"开始处理封面图片...")
cover_media_id = await self._get_or_create_cover(cover, content)
logger.info(f"封面处理完成,media_id: {cover_media_id}")
# Add as draft using new API
logger.info(f"开始添加到草稿箱...")
media_id = await self._add_draft_with_options(
title, content, cover_media_id, author,
need_open_comment, only_fans_can_comment
)
logger.info(f"草稿添加成功,media_id: {media_id}")
result = self._build_publish_result(media_id, title, cover_media_id)
logger.info(f"发布结果: {result}")
return result
except Exception as e:
logger.error(f"发布到草稿箱失败: {str(e)}")
import traceback
logger.error(f"错误堆栈: {traceback.format_exc()}")
raise
async def _get_or_create_cover(self, cover: str, content: str) -> str:
"""Get cover media ID from provided cover or create one."""
if cover:
# Always upload cover as permanent thumb material
# This ensures it's available for future use and meets WeChat requirements
return await self.upload_permanent_material(cover, 'thumb')
elif self._extract_first_image(content):
# Use first image in content as cover if no cover specified
first_image = self._extract_first_image(content)
return await self.upload_permanent_material(first_image, 'thumb')
else:
# Create a default cover if no image provided (WeChat API requires thumb_media_id)
return await self._create_default_cover()
def _build_publish_result(self, media_id: str, title: str, cover_media_id: str) -> Dict[str, str]:
"""Build publish result dictionary."""
return {
'media_id': media_id,
'title': title,
'status': 'success',
'cover_media_id': cover_media_id
}
def _extract_first_image(self, html_content: str) -> Optional[str]:
"""Extract first image URL from HTML content."""
import re
img_pattern = r'<img[^>]+src="([^"]+)"'
match = re.search(img_pattern, html_content)
return match.group(1) if match else None
async def _create_default_cover(self) -> str:
"""
Create a default cover image for articles without images.
Returns:
Media ID of the created default cover
"""
try:
# Create a simple solid color image as default cover
from PIL import Image, ImageDraw, ImageFont
import tempfile
import os
# Create a 400x400 image with a gradient background
width, height = 400, 400
img = Image.new('RGB', (width, height), color=(240, 240, 240))
draw = ImageDraw.Draw(img)
# Create gradient effect
for y in range(height):
color_value = int(240 - (y / height) * 50) # Light gradient
draw.line([(0, y), (width, y)], fill=(color_value, color_value, color_value + 20))
# Add text to the image
try:
# Try to use a font, fall back to default if not available
font = ImageFont.truetype("arial.ttf", 40)
except:
font = ImageFont.load_default()
# Add text
text = "文颜书评"
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# Center the text
x = (width - text_width) // 2
y = (height - text_height) // 2
draw.text((x, y), text, fill=(51, 51, 51), font=font)
# Save to temporary file
temp_file = tempfile.NamedTemporaryFile(suffix='.jpg', delete=False)
temp_path = temp_file.name
temp_file.close()
img.save(temp_path, 'JPEG', quality=85, optimize=True)
# Upload as permanent thumb material
media_id = await self.upload_permanent_material(temp_path, 'thumb')
# Clean up temporary file
os.unlink(temp_path)
return media_id
except ImportError:
# If PIL is not available, use a simple 1x1 pixel image
import tempfile
import os
# Create minimal JPEG file (1x1 pixel gray image)
temp_file = tempfile.NamedTemporaryFile(suffix='.jpg', delete=False)
temp_path = temp_file.name
# Minimal JPEG header for 1x1 gray pixel
jpeg_data = bytes.fromhex(
'ffd8ffe000104a4649460001010100h00h0000ffdb00c308090b0c090c0b0a0c0d10'
'0e0c0d1114151413131418191e1d1a1a1d1e1d1a1c1c2024262e2720222c231c1c28'
'37292c3031343434343434343434343434343434343434343434343434343434343434'
'3434343434343434343434343434343434343434343434343434343434343434343434'
'3434343434343434343434343434343434343434343434343434343434343434343434'
'34ffc00011080001000103012200021101031101ffc40101000101010101000000000000'
'0000000000000000000affc4001b0100010101010101010100000000000000000000000000'
'00000000000003ffd9'
)
with open(temp_path, 'wb') as f:
f.write(jpeg_data)
# Upload as permanent thumb material
media_id = await self.upload_permanent_material(temp_path, 'thumb')
# Clean up temporary file
os.unlink(temp_path)
return media_id
except Exception as e:
# If all else fails, raise an exception
raise Exception(f"Failed to create default cover: {str(e)}")
async def get_draft_list(self) -> Dict:
"""Get list of drafts from WeChat."""
import html
import logging
# 设置日志
logger = logging.getLogger(__name__)
access_token = await self._get_access_token()
url = f"{self.base_url}/draft/batchget?access_token={access_token}"
data = {
"offset": 0,
"count": 20,
"no_content": 0
}
async with aiohttp.ClientSession() as session:
headers = {'Content-Type': 'application/json; charset=utf-8'}
# 手动序列化JSON,确保中文字符不被转义
json_data = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
async with session.post(url, data=json_data, headers=headers) as response:
# First check response status
if response.status != 200:
error_text = await response.text()
logger.error(f"HTTP error {response.status}: {error_text}")
raise Exception(f"HTTP error {response.status}: {error_text}")
# Get response text and try to parse as JSON regardless of content type
response_text = await response.text()
# Debug: Print first 200 chars of response for troubleshooting
logger.debug(f"Draft list response (first 200 chars): {response_text[:200]}")
try:
result = json.loads(response_text)
# Fix content encoding issues - 更谨慎的策略
if 'item' in result:
logger.info(f"Found {len(result['item'])} drafts")
for i, item in enumerate(result['item']):
logger.debug(f"Processing draft {i+1}: {item.get('media_id', 'unknown')}")
if 'content' in item and 'news_item' in item['content']:
for news_item in item['content']['news_item']:
if 'content' in news_item:
# 检查是否真的需要修复
original_content = news_item['content']
if self._needs_encoding_fix(original_content):
try:
# 谨慎修复编码问题
fixed_content = self._fix_encoding_carefully(original_content)
if fixed_content != original_content:
news_item['content'] = fixed_content
logger.debug(f"已修复草稿 {i+1} 的编码问题")
else:
logger.debug(f"草稿 {i+1} 编码正常,无需修复")
except Exception as fix_error:
logger.warning(f"修复草稿 {i+1} 编码时出错: {fix_error}")
# 保持原始内容如果修复失败
pass
logger.info("Content encoding fix completed")
return result
except json.JSONDecodeError as e:
content_type = response.headers.get('content-type', '')
logger.error(f"Failed to parse JSON response from {content_type}: {e}")
logger.error(f"Response content: {response_text[:500]}")
raise Exception(f"Failed to parse JSON response from {content_type}: {e}\nResponse content: {response_text[:500]}")
def _fix_common_encoding_issues(self, content):
"""修复常见编码问题"""
# 修复常见的编码错误
issues = [
('\\x3c', '<'), # < 被错误编码
('\\x3e', '>'), # > 被错误编码
('\\x22', '"'), # " 被错误编码
('\\x27', "'"), # ' 被错误编码
('\\x5c', '\\'), # \ 被错误编码
]
for wrong, right in issues:
content = content.replace(wrong, right)
return content
def _needs_encoding_fix(self, content):
"""检测是否需要编码修复"""
from ..utils.encoding import enconding_utils
return enconding_utils.needs_encoding_fix(content)
def _fix_encoding_carefully(self, content):
"""谨慎修复编码问题"""
from ..utils.encoding import enconding_utils
return enconding_utils.fix_encoding(content)
# ========== Extended Media Upload Methods ==========
async def upload_temp_media(self, media_path: str, media_type: str = 'image') -> str:
"""
Upload temporary media file to WeChat server (valid for 3 days).
Args:
media_path: Path to media file (local or remote)
media_type: Type of media ('image', 'voice', 'video', 'thumb')
Returns:
Media ID from WeChat server
"""
return await self._upload_media(media_path, media_type, permanent=False)
async def upload_permanent_material(self, media_path: str, media_type: str = 'image',
description: Optional[Dict] = None) -> str:
"""
Upload permanent material to WeChat server.
Args:
media_path: Path to media file (local or remote)
media_type: Type of media ('image', 'voice', 'video', 'thumb')
description: Description for video material (required for video)
Format: {"title": "Video Title", "introduction": "Video Intro"}
Returns:
Media ID from WeChat server
"""
if media_type == 'video' and not description:
raise ValueError("Description is required for permanent video material")
return await self._upload_media(media_path, media_type, permanent=True, description=description)
async def upload_news_material(self, articles: List[Dict]) -> str:
"""
Upload permanent news material (articles) to WeChat server.
Args:
articles: List of article dictionaries with keys:
- title: Article title
- content: HTML content
- digest: Article summary (optional)
- author: Article author (optional)
- source_url: Source URL (optional)
- thumb_media_id: Cover image media ID
- show_cover_pic: Show cover pic (0/1)
- need_open_comment: Enable comments (0/1)
- only_fans_can_comment: Only fans can comment (0/1)
Returns:
Media ID from WeChat server
"""
access_token = await self._get_access_token()
url = f"{self.base_url}/material/add_news?access_token={access_token}"
data = {"articles": articles}
async with aiohttp.ClientSession() as session:
headers = {'Content-Type': 'application/json; charset=utf-8'}
# 手动序列化JSON,确保中文字符不被转义
json_data = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
async with session.post(url, data=json_data, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
response_text = await response.text()
# Debug: Log first 200 chars of response for troubleshooting
logger.debug(f"News material upload response (first 200 chars): {response_text[:200]}")
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
raise Exception(f"Failed to parse JSON response: {e}\nResponse: {response_text[:500]}")
# Handle API errors
self._handle_wechat_api_error(result, "上传图文素材")
if 'media_id' not in result:
raise Exception(f"Failed to upload news material: 缺少media_id字段")
return result['media_id']
async def upload_image_for_news(self, image_path: str) -> str:
"""
Upload image specifically for use in news articles.
This API returns a URL that can be used directly in news content.
Args:
image_path: Path to image file (local or remote)
Returns:
Image URL that can be used in news content
"""
access_token = await self._get_access_token()
url = f"{self.base_url}/media/uploadimg?access_token={access_token}"
# Handle different media sources
if image_path.startswith(('http://', 'https://')):
# Download remote image first
image_data, filename = await self._download_media(image_path)
else:
# Read local file
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image file not found: {image_path}")
filename = os.path.basename(image_path)
with open(image_path, 'rb') as f:
image_data = f.read()
# Determine content type
content_type = mimetypes.guess_type(filename)[0] or 'image/jpeg'
# Upload to WeChat
async with aiohttp.ClientSession() as session:
data = aiohttp.FormData()
data.add_field('media', image_data, filename=filename, content_type=content_type)
async with session.post(url, data=data) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
response_text = await response.text()
# Debug: Log first 200 chars of response for troubleshooting
logger.debug(f"Image upload response (first 200 chars): {response_text[:200]}")
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
raise Exception(f"Failed to parse JSON response: {e}\nResponse: {response_text[:500]}")
# Handle API errors
self._handle_wechat_api_error(result, "上传新闻图片")
if 'url' not in result:
raise Exception(f"Failed to upload image for news: 缺少url字段")
return result['url']
async def get_media_list(self, media_type: str, permanent: bool = False,
offset: int = 0, count: int = 20) -> Dict:
"""
Get list of media materials from WeChat server.
Args:
media_type: Type of media ('image', 'voice', 'video', 'news')
permanent: Whether to get permanent materials (True) or temporary (False)
offset: Starting offset for pagination
count: Number of items to retrieve
Returns:
Dictionary with material list and total count
"""
access_token = await self._get_access_token()
if permanent:
url = f"{self.base_url}/material/batchget_material?access_token={access_token}"
data = {
"type": media_type,
"offset": offset,
"count": count
}
else:
url = f"{self.base_url}/material/get_materialcount?access_token={access_token}"
# For temporary materials, we can only get counts, not list
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
return await response.json()
# For permanent materials, get the actual list
async with aiohttp.ClientSession() as session:
headers = {'Content-Type': 'application/json; charset=utf-8'}
# 手动序列化JSON,确保中文字符不被转义
json_data = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
async with session.post(url, data=json_data, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
return await response.json()
async def delete_permanent_material(self, media_id: str) -> bool:
"""
Delete permanent material from WeChat server.
Args:
media_id: Media ID of the material to delete
Returns:
True if deletion was successful
"""
access_token = await self._get_access_token()
url = f"{self.base_url}/material/del_material?access_token={access_token}"
data = {"media_id": media_id}
async with aiohttp.ClientSession() as session:
headers = {'Content-Type': 'application/json; charset=utf-8'}
# 手动序列化JSON,确保中文字符不被转义
json_data = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
async with session.post(url, data=json_data, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"HTTP error {response.status}: {error_text}")
response_text = await response.text()
# Debug: Log first 200 chars of response for troubleshooting
logger.debug(f"Delete response (first 200 chars): {response_text[:200]}")
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
raise Exception(f"Failed to parse JSON response: {e}\nResponse: {response_text[:500]}")
# Handle API errors
self._handle_wechat_api_error(result, "删除永久素材")
return result.get('errcode') == 0