# mcp_server.py (8.22 kB)
#!/usr/bin/env python
"""MCP server for Xiaohongshu API."""
import asyncio
import json
import sys
from typing import Dict, Optional

# MCP Server Imports
from mcp import types as mcp_types
from mcp.server.lowlevel import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio

from shared.client_manager import ClientManager
from shared.error_handler import ErrorHandler, XhsError
from shared.types import (
    # Request types
    CreateClientRequest,
    McpNoteRequest,
    McpSearchNotesRequest,
    McpSearchUsersRequest,
    McpUserRequest,
    McpFeedRequest,
    # Response types
    ResponseType,
)
# --- MCP Server Setup ---
# Diagnostics go to stderr: this server is run over the stdio transport, where
# stdout carries the JSON-RPC message stream and must not receive stray text.
print("Creating MCP Server instance...", file=sys.stderr)
app = Server("xhs-mcp-server")

# Module-wide client manager used by the tool handlers and the server runner.
client_manager = ClientManager()
# --- Tool schema definitions ---
# Each tool is declared once at import time. Input schemas come from the
# request models' ``schema()`` output, except where written out by hand.

# 1. Client creation
create_client_tool_schema = mcp_types.Tool(
    name="create_xhs_client",
    description="创建一个新的小红书客户端实例,需要提供 Cookie 进行认证。返回客户端 ID。",
    inputSchema=CreateClientRequest.schema(),
)

# 2. Note tools
get_note_tool_schema = mcp_types.Tool(
    name="get_xhs_note",
    description="通过笔记 ID 获取小红书笔记的详细信息。",
    inputSchema=McpNoteRequest.schema(),
)
get_note_html_tool_schema = mcp_types.Tool(
    name="get_xhs_note_html",
    description="通过笔记 ID 从 HTML 页面获取小红书笔记的详细信息。",
    inputSchema=McpNoteRequest.schema(),
)

# 3. Search tools
search_notes_tool_schema = mcp_types.Tool(
    name="search_xhs_notes",
    description="根据关键词搜索小红书笔记。",
    inputSchema=McpSearchNotesRequest.schema(),
)
search_users_tool_schema = mcp_types.Tool(
    name="search_xhs_users",
    description="根据关键词搜索小红书用户。",
    inputSchema=McpSearchUsersRequest.schema(),
)

# 4. User tools
get_user_info_tool_schema = mcp_types.Tool(
    name="get_xhs_user_info",
    description="通过用户 ID 获取小红书用户的信息。",
    inputSchema=McpUserRequest.schema(),
)
get_user_notes_tool_schema = mcp_types.Tool(
    name="get_xhs_user_notes",
    description="获取指定用户的笔记列表。",
    inputSchema=McpUserRequest.schema(),
)

# 5. Feed tools
# The category listing takes only a client ID, so its schema is spelled out
# by hand instead of being generated from a request model.
feed_categories_tool_schema = mcp_types.Tool(
    name="get_xhs_feed_categories",
    description="获取小红书推荐流的分类列表。",
    inputSchema={
        "type": "object",
        "properties": {
            "client_id": {"type": "string"},
        },
        "required": ["client_id"],
    },
)
get_feed_tool_schema = mcp_types.Tool(
    name="get_xhs_feed",
    description="获取指定类型的推荐流内容。",
    inputSchema=McpFeedRequest.schema(),
)
# --- MCP Handlers ---
@app.list_tools()
async def list_tools() -> list[mcp_types.Tool]:
    """Return every tool this server advertises.

    Returns:
        The module-level tool definitions, in a fixed order.
    """
    # Log to stderr: stdout is the stdio transport's protocol channel, so any
    # text printed there would corrupt the JSON-RPC stream.
    print("MCP Server: Received list_tools request.", file=sys.stderr)
    available_tools = [
        create_client_tool_schema,
        get_note_tool_schema,
        get_note_html_tool_schema,
        search_notes_tool_schema,
        search_users_tool_schema,
        get_user_info_tool_schema,
        get_user_notes_tool_schema,
        feed_categories_tool_schema,
        get_feed_tool_schema,
    ]
    print(
        f"MCP Server: Advertising {len(available_tools)} tools.",
        file=sys.stderr,
    )
    return available_tools
@app.call_tool()
async def call_tool(
    name: str, arguments: dict
) -> list[mcp_types.TextContent | mcp_types.ImageContent | mcp_types.EmbeddedResource]:
    """Execute the tool called ``name`` and return its result as MCP content.

    ``create_xhs_client`` builds a new client from the arguments. Every other
    tool requires a ``client_id`` argument, which is used to look up an
    existing client; the remaining arguments are validated through the tool's
    request model before the client method is invoked.

    Args:
        name: Name of the tool to run.
        arguments: Tool arguments; a missing/empty value is treated as ``{}``.

    Returns:
        On success, a one-element list holding a ``TextContent`` whose text is
        the JSON-serialized ``ResponseType``. On failure, the value produced by
        ``ErrorHandler.to_mcp_response``; exceptions are not propagated.
    """
    arguments = arguments or {}
    # Log to stderr (stdout is the stdio transport's protocol channel) and log
    # argument names only: the values can include the authentication cookie.
    print(
        f"MCP Server: Received call_tool request for '{name}' "
        f"with argument names: {list(arguments)}",
        file=sys.stderr,
    )
    try:
        if name == "create_xhs_client":
            # Create a new client and hand back its ID.
            request = CreateClientRequest(**arguments)
            client_id = await client_manager.create_client(request)
            result = {"client_id": client_id}
        else:
            # --- Every other tool operates on an existing client ---
            client_id = arguments.get("client_id")
            if not client_id:
                raise XhsError("必须提供 client_id 参数", status_code=400)
            # The client is resolved before the tool name is checked, so an
            # unknown client_id is reported ahead of an unknown tool name.
            client = await client_manager.get_client(client_id)
            payload = {k: v for k, v in arguments.items() if k != "client_id"}

            # --- Note tools ---
            if name == "get_xhs_note":
                request = McpNoteRequest(client_id=client_id, **payload)
                result = await client.get_note(request)
            elif name == "get_xhs_note_html":
                request = McpNoteRequest(client_id=client_id, **payload)
                result = await client.get_note_html(request)
            # --- Search tools ---
            elif name == "search_xhs_notes":
                request = McpSearchNotesRequest(client_id=client_id, **payload)
                result = await client.search_notes(request)
            elif name == "search_xhs_users":
                request = McpSearchUsersRequest(client_id=client_id, **payload)
                result = await client.search_users(request)
            # --- User tools ---
            elif name == "get_xhs_user_info":
                request = McpUserRequest(client_id=client_id, **payload)
                result = await client.get_user_info(request)
            elif name == "get_xhs_user_notes":
                request = McpUserRequest(client_id=client_id, **payload)
                result = await client.get_user_notes(request)
            # --- Feed tools ---
            elif name == "get_xhs_feed_categories":
                categories = await client.get_feed_categories()
                result = {"categories": categories}
            elif name == "get_xhs_feed":
                request = McpFeedRequest(client_id=client_id, **payload)
                # Only the feed type is forwarded to the client.
                result = await client.get_feed(request.feed_type)
            else:
                raise XhsError(f"未知的工具名称: {name}", status_code=400)

        response = ResponseType(result=result)
        return [
            mcp_types.TextContent(
                type="text", text=response.json(ensure_ascii=False)
            )
        ]
    except XhsError as e:
        # Already in the unified error format; convert directly.
        return ErrorHandler.to_mcp_response(e)
    except Exception as e:
        # Record the exception type on stderr (the message itself is returned
        # to the caller), then wrap it so the reply uses the same error format.
        print(
            f"MCP Server: Tool '{name}' raised {type(e).__name__}",
            file=sys.stderr,
        )
        wrapped_error = XhsError(str(e))
        return ErrorHandler.to_mcp_response(wrapped_error)
# --- MCP Server Runner ---
async def run_server():
    """Run the MCP server over standard input/output.

    Starts the client manager, serves MCP requests on the stdio streams until
    ``app.run`` returns, and stops the client manager afterwards even if
    serving raised.
    """
    # Start the client manager before accepting any requests.
    await client_manager.start()
    try:
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            # stdout is now the protocol channel, so status text goes to stderr.
            print("MCP Server starting handshake...", file=sys.stderr)
            await app.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name=app.name,
                    server_version="1.0.0",
                    capabilities=app.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
            print("MCP Server run loop finished.", file=sys.stderr)
    finally:
        # Always shut the client manager down, whatever ended the run loop.
        await client_manager.stop()
if __name__ == "__main__":
    # Status messages go to stderr; stdout is reserved for MCP protocol
    # traffic when the server runs over stdio.
    print("Launching MCP Server for Xiaohongshu API", file=sys.stderr)
    try:
        asyncio.run(run_server())
    except KeyboardInterrupt:
        print("\nMCP Server stopped by user.", file=sys.stderr)
    except Exception as e:
        print(f"MCP Server encountered an error: {e}", file=sys.stderr)
        # Signal the failure through the exit status so a supervising process
        # does not mistake a crash for a clean shutdown.
        sys.exit(1)
    finally:
        print("MCP Server process exiting.", file=sys.stderr)