# WeCom Bot MCP Server
# by loonghao
# Verified
# - wecom-bot-mcp-server
# - src
# - wecom_bot_mcp_server
"""Image handling functionality for WeCom Bot MCP Server."""
# Import built-in modules
import hashlib
import os
from pathlib import Path
import tempfile
from typing import Any

# Import third-party modules
from PIL import Image
import aiohttp
from loguru import logger
from mcp.server.fastmcp import Context
from notify_bridge import NotifyBridge

# Import local modules
from wecom_bot_mcp_server.app import mcp
from wecom_bot_mcp_server.errors import ErrorCode
from wecom_bot_mcp_server.errors import WeComError
from wecom_bot_mcp_server.utils import get_webhook_url
async def download_image(url: str, ctx: Context | None = None) -> Path:
    """Download an image from a URL into a local temporary file.

    The request is issued once; no retry is performed. The file is written to
    ``<system temp dir>/wecom_images`` under a name derived from a SHA-256
    digest of the URL, so the same URL always maps to the same file name.

    Args:
        url: URL to download image from
        ctx: FastMCP context

    Returns:
        Path: Path to downloaded image

    Raises:
        WeComError: ``NETWORK_ERROR`` if the HTTP status is not 200 or the
            request fails with an ``aiohttp.ClientError``; ``FILE_ERROR`` if
            the response Content-Type is not an image type.
    """
    if ctx:
        await ctx.report_progress(0.2)
        await ctx.info(f"Downloading image from {url}")

    try:
        temp_dir = Path(tempfile.gettempdir()) / "wecom_images"
        os.makedirs(temp_dir, exist_ok=True)

        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    error_msg = f"Failed to download image: HTTP {response.status}"
                    if ctx:
                        await ctx.error(error_msg)
                    raise WeComError(
                        error_msg,
                        ErrorCode.NETWORK_ERROR,
                    )

                content_type = response.headers.get("Content-Type", "")
                # The header may carry parameters ("image/png; charset=binary")
                # and media types are case-insensitive, so normalise first.
                media_type = content_type.split(";", 1)[0].strip().lower()
                if not media_type.startswith("image/"):
                    error_msg = f"Invalid content type: {content_type}"
                    if ctx:
                        await ctx.error(error_msg)
                    raise WeComError(error_msg, ErrorCode.FILE_ERROR)

                # Derive the extension from the subtype only, dropping any
                # structured-syntax suffix ("svg+xml" -> "svg") and any
                # character that does not belong in a file extension.
                subtype = media_type.removeprefix("image/").split("+", 1)[0]
                ext = "".join(ch for ch in subtype if ch.isalnum() or ch in "-_") or "img"

                # hash() on str is salted per process and may be negative;
                # a digest gives a stable, filename-safe identifier.
                url_digest = hashlib.sha256(url.encode("utf-8")).hexdigest()[:16]
                final_file = temp_dir / f"image_{url_digest}.{ext}"

                # Read the body before touching the file so a failed read
                # does not leave an empty file behind.
                content = await response.read()
                final_file.write_bytes(content)

        return final_file

    except aiohttp.ClientError as e:
        error_msg = f"Failed to download image: {e!s}"
        logger.error(error_msg)
        if ctx:
            await ctx.error(error_msg)
        raise WeComError(error_msg, ErrorCode.NETWORK_ERROR) from e
@mcp.tool()
async def send_wecom_image(
    image_path: str | Path,
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Send image to WeCom.

    Resolves and validates the image, looks up the webhook URL, sends the
    image through notify-bridge and checks the response.

    Args:
        image_path: Path to image file or URL
        ctx: FastMCP context

    Returns:
        dict: Response containing status and message

    Raises:
        WeComError: If any step fails. A ``WeComError`` raised by one of the
            helper steps propagates unchanged, keeping its own error code;
            any other exception is wrapped in a ``WeComError`` with
            ``ErrorCode.NETWORK_ERROR``.
    """
    if ctx:
        await ctx.report_progress(0.1)
        await ctx.info(f"Processing image: {image_path}")

    try:
        # Process and validate image (downloads it first when given a URL)
        resolved_path = await _process_image_path(image_path, ctx)

        # Get webhook URL
        base_url = await _get_webhook_url(ctx)

        # Send image to WeCom
        if ctx:
            await ctx.report_progress(0.5)
            await ctx.info("Sending image via notify-bridge...")
        response = await _send_image_to_wecom(resolved_path, base_url)

        # Process response
        return await _process_image_response(response, resolved_path, ctx)
    except WeComError:
        # The helper that raised this has already reported it to ctx.
        # Re-wrapping here would replace its specific error code with
        # NETWORK_ERROR and report the same failure a second time.
        raise
    except Exception as e:
        error_msg = f"Error sending image: {e!s}"
        logger.error(error_msg)
        if ctx:
            await ctx.error(error_msg)
        raise WeComError(error_msg, ErrorCode.NETWORK_ERROR) from e
async def _process_image_path(image_path: str | Path, ctx: Context | None = None) -> Path:
    """Process and validate image path.

    A string starting with ``http://`` or ``https://`` is downloaded first via
    ``download_image``; any other string is converted to a ``Path``. The
    resulting path must exist and be openable by Pillow.

    Args:
        image_path: Path to image file or URL
        ctx: FastMCP context

    Returns:
        Path: Validated image path

    Raises:
        WeComError: If the download fails, the file does not exist, or Pillow
            cannot open it as an image
    """
    # Handle URL. download_image reports its own failures to ctx before
    # raising, so the error is not reported a second time here.
    if isinstance(image_path, str) and image_path.startswith(("http://", "https://")):
        image_path = await download_image(image_path, ctx)

    # Convert to Path object if string
    if isinstance(image_path, str):
        image_path = Path(image_path)

    # Check if file exists
    if not image_path.exists():
        error_msg = f"Image file not found: {image_path}"
        logger.error(error_msg)
        if ctx:
            await ctx.error(error_msg)
        raise WeComError(error_msg, ErrorCode.FILE_ERROR)

    # Validate image format. Image.open keeps the underlying file open until
    # the image is closed, so use it as a context manager to release the
    # handle as soon as the format has been identified.
    try:
        with Image.open(image_path):
            pass
    except Exception as e:
        error_msg = f"Invalid image format: {e!s}"
        logger.error(error_msg)
        if ctx:
            await ctx.error(error_msg)
        raise WeComError(error_msg, ErrorCode.FILE_ERROR) from e

    return image_path
async def _get_webhook_url(ctx: Context | None = None) -> str:
    """Look up the webhook URL, reporting a lookup failure to the context.

    Args:
        ctx: FastMCP context

    Returns:
        str: The value returned by ``get_webhook_url()``

    Raises:
        WeComError: Re-raised unchanged when ``get_webhook_url()`` raises it
    """
    try:
        webhook_url = get_webhook_url()
    except WeComError as lookup_error:
        # Tell the client what went wrong, then let the original error
        # propagate untouched.
        if ctx:
            await ctx.error(str(lookup_error))
        raise
    else:
        return webhook_url
async def _send_image_to_wecom(image_path: Path, base_url: str) -> Any:
    """Hand an image file to NotifyBridge for delivery on the "wecom" channel.

    Args:
        image_path: Path to image; sent as an absolute path string
        base_url: Webhook URL

    Returns:
        Any: Whatever ``NotifyBridge.send_async`` returns
    """
    logger.info(f"Processing image: {image_path}")

    async with NotifyBridge() as bridge:
        # notify-bridge receives the file location, not the image bytes.
        payload = {
            "base_url": base_url,
            "msg_type": "image",
            "image": str(image_path.absolute()),
        }
        return await bridge.send_async("wecom", payload)
async def _process_image_response(response: Any, image_path: Path, ctx: Context | None = None) -> dict[str, Any]:
    """Turn a NotifyBridge response into a result dict, or raise on failure.

    Args:
        response: Response from NotifyBridge
        image_path: Path to image
        ctx: FastMCP context

    Returns:
        dict: ``status``, ``message`` and ``image_path`` entries describing
            the successful send

    Raises:
        WeComError: With ``ErrorCode.API_FAILURE`` when ``response.success``
            is missing or falsy, or when ``response.data`` is a dict whose
            ``errcode`` is missing or non-zero
    """
    failure: str | None = None

    if not getattr(response, "success", False):
        # Transport-level failure reported by NotifyBridge itself.
        failure = f"Failed to send image: {response}"
    else:
        # API-level result: only inspected when the payload is a dict.
        # A dict without "errcode" counts as a failure (default -1).
        payload = getattr(response, "data", {})
        if isinstance(payload, dict) and payload.get("errcode", -1) != 0:
            failure = f"WeChat API error: {payload.get('errmsg', 'Unknown error')}"

    if failure is not None:
        logger.error(failure)
        if ctx:
            await ctx.error(failure)
        raise WeComError(failure, ErrorCode.API_FAILURE)

    success_msg = "Image sent successfully"
    logger.info(success_msg)
    if ctx:
        await ctx.report_progress(1.0)
        await ctx.info(success_msg)

    result: dict[str, Any] = {
        "status": "success",
        "message": success_msg,
        "image_path": str(image_path),
    }
    return result