"""
MCP 服务器实现
注册所有 MCP 工具并处理请求
"""
import json
from typing import Any
from mcp.server import Server
from mcp.types import Tool, TextContent
from loguru import logger
from service import XiaohongshuService
class MCPServer:
"""MCP 服务器"""
def __init__(self, service: XiaohongshuService):
self.service = service
self.server = Server("xiaohongshu-mcp")
self.tool_handlers = {} # 存储工具处理函数
self._register_tools()
def _format_feeds_response(self, feeds: list, keyword: str = None, format_type: str = "text") -> str:
"""
格式化 Feed 列表响应
Args:
feeds: Feed 列表
keyword: 搜索关键词(可选)
format_type: 返回格式,"text" 或 "json"
"""
if format_type == "json":
# 返回 JSON 格式
feeds_data = []
for feed in feeds[:10]:
feed_dict = {
"id": feed.id,
"xsec_token": feed.xsec_token,
"title": feed.note_card.display_title,
"author": feed.note_card.user.display_name,
"author_id": feed.note_card.user.user_id,
"type": feed.note_card.type,
"cover_url": feed.note_card.cover.display_url,
"interact_info": {
"liked_count": feed.note_card.interact_info.liked_count,
"comment_count": feed.note_card.interact_info.comment_count,
"collected_count": feed.note_card.interact_info.collected_count,
}
}
# 如果是视频,添加视频信息
if feed.note_card.video:
feed_dict["video"] = {
"duration": feed.note_card.video.capa.duration
}
feeds_data.append(feed_dict)
result = {
"total": len(feeds),
"displayed": len(feeds_data),
"feeds": feeds_data
}
if keyword:
result["keyword"] = keyword
return json.dumps(result, ensure_ascii=False, indent=2)
else:
# 返回文本格式
if keyword:
result = f"搜索 '{keyword}' 找到 {len(feeds)} 条结果:\n\n"
else:
result = f"获取到 {len(feeds)} 条 Feed:\n\n"
# 显示前 10 条,包含详细信息
for i, feed in enumerate(feeds[:10], 1):
result += f"{i}. 【{feed.note_card.display_title}】\n"
result += f" 作者: {feed.note_card.user.display_name}\n"
result += f" 类型: {feed.note_card.type}\n"
result += f" 点赞: {feed.note_card.interact_info.liked_count} | "
result += f"评论: {feed.note_card.interact_info.comment_count} | "
result += f"收藏: {feed.note_card.interact_info.collected_count}\n"
result += f" Feed ID: {feed.id}\n"
result += f" Token: {feed.xsec_token}\n"
result += f" 封面: {feed.note_card.cover.display_url}\n"
# 如果是视频,显示视频时长
if feed.note_card.video:
duration = feed.note_card.video.capa.duration
result += f" 视频时长: {duration}秒\n"
result += "\n"
result += f"\n💡 提示: 使用 get_feed_detail 工具查看详情,需要提供 feed_id 和 xsec_token"
return result
def _register_tools(self) -> None:
"""注册所有 MCP 工具"""
logger.info("Registering MCP tools...")
# 定义所有工具处理函数并存储到字典中
# 1. 检查登录状态
async def check_login_status(arguments: dict[str, Any]) -> list[TextContent]:
"""检查登录状态"""
try:
is_logged_in = await self.service.check_login_status()
return [TextContent(
type="text",
text=f"登录状态: {'已登录' if is_logged_in else '未登录'}"
)]
except Exception as e:
logger.error(f"Check login status failed: {e}")
return [TextContent(type="text", text=f"检查登录状态失败: {str(e)}")]
# 2. 获取登录二维码
async def get_login_qrcode(arguments: dict[str, Any]) -> list[TextContent]:
"""获取登录二维码"""
try:
result = await self.service.get_login_qrcode()
if result["logged_in"]:
return [TextContent(type="text", text="已登录,无需扫码")]
return [TextContent(
type="text",
text=f"请扫描二维码登录: {result['qr_url']}"
)]
except Exception as e:
logger.error(f"Get login qrcode failed: {e}")
return [TextContent(type="text", text=f"获取二维码失败: {str(e)}")]
# 3. 删除 cookies
async def delete_cookies(arguments: dict[str, Any]) -> list[TextContent]:
"""删除 cookies"""
try:
await self.service.delete_cookies()
return [TextContent(type="text", text="Cookies 已删除")]
except Exception as e:
logger.error(f"Delete cookies failed: {e}")
return [TextContent(type="text", text=f"删除 cookies 失败: {str(e)}")]
# 4. 获取 Feed 列表
async def list_feeds(arguments: dict[str, Any]) -> list[TextContent]:
"""获取 Feed 列表"""
try:
format_type = arguments.get("format", "text") # 支持 "text" 或 "json"
feeds = await self.service.list_feeds()
result = self._format_feeds_response(feeds, format_type=format_type)
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error(f"List feeds failed: {e}")
return [TextContent(type="text", text=f"获取 Feed 列表失败: {str(e)}")]
# 5. 搜索内容
async def search_feeds(arguments: dict[str, Any]) -> list[TextContent]:
"""搜索内容"""
try:
keyword = arguments.get("keyword", "")
filters = arguments.get("filters")
format_type = arguments.get("format", "text") # 支持 "text" 或 "json"
feeds = await self.service.search_feeds(keyword, filters)
result = self._format_feeds_response(feeds, keyword=keyword, format_type=format_type)
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error(f"Search feeds failed: {e}")
return [TextContent(type="text", text=f"搜索失败: {str(e)}")]
# 6. 获取 Feed 详情
async def get_feed_detail(arguments: dict[str, Any]) -> list[TextContent]:
"""获取 Feed 详情"""
try:
feed_id = arguments.get("feed_id", "")
xsec_token = arguments.get("xsec_token", "")
load_all_comments = arguments.get("load_all_comments", False)
detail = await self.service.get_feed_detail(
feed_id, xsec_token, load_all_comments
)
result = f"标题: {detail.note.title}\n"
result += f"内容: {detail.note.desc}\n"
result += f"点赞数: {detail.note.interact_info.liked_count}\n"
result += f"评论数: {len(detail.comments.list)}\n"
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error(f"Get feed detail failed: {e}")
return [TextContent(type="text", text=f"获取详情失败: {str(e)}")]
# 7. 发表评论
async def post_comment_to_feed(arguments: dict[str, Any]) -> list[TextContent]:
"""发表评论"""
try:
feed_id = arguments.get("feed_id", "")
xsec_token = arguments.get("xsec_token", "")
content = arguments.get("content", "")
success = await self.service.post_comment_to_feed(
feed_id, xsec_token, content
)
if success:
return [TextContent(type="text", text="评论发表成功")]
return [TextContent(type="text", text="评论发表失败")]
except Exception as e:
logger.error(f"Post comment failed: {e}")
return [TextContent(type="text", text=f"发表评论失败: {str(e)}")]
# 8. 回复评论
async def reply_comment_in_feed(arguments: dict[str, Any]) -> list[TextContent]:
"""回复评论"""
try:
feed_id = arguments.get("feed_id", "")
xsec_token = arguments.get("xsec_token", "")
comment_id = arguments.get("comment_id", "")
content = arguments.get("content", "")
success = await self.service.reply_comment_to_feed(
feed_id, xsec_token, comment_id, content
)
if success:
return [TextContent(type="text", text="回复成功")]
return [TextContent(type="text", text="回复失败")]
except Exception as e:
logger.error(f"Reply comment failed: {e}")
return [TextContent(type="text", text=f"回复评论失败: {str(e)}")]
# 9. 点赞
async def like_feed(arguments: dict[str, Any]) -> list[TextContent]:
"""点赞"""
try:
feed_id = arguments.get("feed_id", "")
xsec_token = arguments.get("xsec_token", "")
action = arguments.get("action", "like") # like or unlike
if action == "like":
success = await self.service.like_feed(feed_id, xsec_token)
else:
success = await self.service.unlike_feed(feed_id, xsec_token)
if success:
return [TextContent(type="text", text=f"{'点赞' if action == 'like' else '取消点赞'}成功")]
return [TextContent(type="text", text=f"{'点赞' if action == 'like' else '取消点赞'}失败")]
except Exception as e:
logger.error(f"Like feed failed: {e}")
return [TextContent(type="text", text=f"操作失败: {str(e)}")]
# 10. 收藏
async def favorite_feed(arguments: dict[str, Any]) -> list[TextContent]:
"""收藏"""
try:
feed_id = arguments.get("feed_id", "")
xsec_token = arguments.get("xsec_token", "")
action = arguments.get("action", "favorite") # favorite or unfavorite
if action == "favorite":
success = await self.service.favorite_feed(feed_id, xsec_token)
else:
success = await self.service.unfavorite_feed(feed_id, xsec_token)
if success:
return [TextContent(type="text", text=f"{'收藏' if action == 'favorite' else '取消收藏'}成功")]
return [TextContent(type="text", text=f"{'收藏' if action == 'favorite' else '取消收藏'}失败")]
except Exception as e:
logger.error(f"Favorite feed failed: {e}")
return [TextContent(type="text", text=f"操作失败: {str(e)}")]
# 11. 获取用户主页
async def user_profile(arguments: dict[str, Any]) -> list[TextContent]:
"""获取用户主页"""
try:
user_id = arguments.get("user_id", "")
xsec_token = arguments.get("xsec_token", "")
profile = await self.service.user_profile(user_id, xsec_token)
result = f"昵称: {profile.user_basic_info.nickname}\n"
result += f"简介: {profile.user_basic_info.desc}\n"
result += f"笔记数: {len(profile.feeds)}\n"
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error(f"Get user profile failed: {e}")
return [TextContent(type="text", text=f"获取用户主页失败: {str(e)}")]
# 12. 发布图文
async def publish_content(arguments: dict[str, Any]) -> list[TextContent]:
"""发布图文内容"""
try:
title = arguments.get("title", "")
content = arguments.get("content", "")
images = arguments.get("images", [])
tags = arguments.get("tags")
success = await self.service.publish_content(title, content, images, tags)
if success:
return [TextContent(type="text", text="发布成功")]
return [TextContent(type="text", text="发布失败")]
except Exception as e:
logger.error(f"Publish content failed: {e}")
return [TextContent(type="text", text=f"发布失败: {str(e)}")]
# 13. 发布视频
async def publish_with_video(arguments: dict[str, Any]) -> list[TextContent]:
"""发布视频内容"""
try:
title = arguments.get("title", "")
content = arguments.get("content", "")
video_path = arguments.get("video_path", "")
cover_path = arguments.get("cover_path")
tags = arguments.get("tags")
success = await self.service.publish_video(
title, content, video_path, cover_path, tags
)
if success:
return [TextContent(type="text", text="视频发布成功")]
return [TextContent(type="text", text="视频发布失败")]
except Exception as e:
logger.error(f"Publish video failed: {e}")
return [TextContent(type="text", text=f"发布视频失败: {str(e)}")]
# 将所有工具处理函数存储到字典中
self.tool_handlers = {
"check_login_status": check_login_status,
"get_login_qrcode": get_login_qrcode,
"delete_cookies": delete_cookies,
"list_feeds": list_feeds,
"search_feeds": search_feeds,
"get_feed_detail": get_feed_detail,
"post_comment_to_feed": post_comment_to_feed,
"reply_comment_in_feed": reply_comment_in_feed,
"like_feed": like_feed,
"favorite_feed": favorite_feed,
"user_profile": user_profile,
"publish_content": publish_content,
"publish_with_video": publish_with_video,
}
# 注册统一的 call_tool 处理器
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
"""统一的工具调用处理器"""
logger.info(f"Handling tool call: {name}")
if name not in self.tool_handlers:
logger.error(f"Tool not found: {name}")
return [TextContent(type="text", text=f"工具不存在: {name}")]
handler = self.tool_handlers[name]
return await handler(arguments)
logger.info("All 13 MCP tools registered successfully")
def get_server(self) -> Server:
"""获取 MCP 服务器实例"""
return self.server