"""
应用服务器
整合 FastAPI 和 MCP 服务器
"""
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
from mcp.server.sse import SseServerTransport
from starlette.types import Send
import json
import asyncio
from browser.browser import BrowserManager, create_browser
from service import XiaohongshuService
from mcp_server import MCPServer
from routes import create_router
from configs.settings import settings
class AppServer:
    """Application server wiring the browser, service, MCP server and FastAPI app.

    ``initialize()`` must be awaited before ``get_app()`` is used. ``shutdown()``
    closes the browser manager and is also awaited by the FastAPI lifespan hook
    when the application exits.
    """

    # JSON-RPC 2.0 error codes used by the /mcp endpoint.
    _PARSE_ERROR = -32700
    _INVALID_REQUEST = -32600
    _METHOD_NOT_FOUND = -32601
    _INVALID_PARAMS = -32602
    _INTERNAL_ERROR = -32603

    def __init__(self):
        """Create an empty server; all components are built by ``initialize()``."""
        self.browser_manager: BrowserManager | None = None
        self.service: XiaohongshuService | None = None
        self.mcp_server: MCPServer | None = None
        self.app: FastAPI | None = None

    async def initialize(self) -> None:
        """Build the browser manager, service, MCP server and FastAPI app, in that order."""
        logger.info("Initializing application...")

        # Make sure the directories required by the settings exist.
        settings.ensure_cookies_dir()
        settings.ensure_images_dir()

        logger.info("Creating browser manager...")
        self.browser_manager = await create_browser(
            headless=settings.headless,
            bin_path=settings.browser_bin_path,
        )

        logger.info("Creating xiaohongshu service...")
        self.service = XiaohongshuService(self.browser_manager)

        logger.info("Creating MCP server...")
        self.mcp_server = MCPServer(self.service)

        # The app is created last because its routes close over the service
        # and the MCP server created above.
        logger.info("Creating FastAPI application...")
        self.app = self._create_app()

        logger.info("Application initialized successfully")

    async def shutdown(self) -> None:
        """Close the browser manager if one was created."""
        logger.info("Shutting down application...")
        if self.browser_manager:
            await self.browser_manager.close()
        logger.info("Application shut down successfully")

    @staticmethod
    def _jsonrpc_error(request_id, code: int, message: str) -> dict:
        """Build a JSON-RPC 2.0 error response object."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        }

    @staticmethod
    def _mcp_tool_definitions() -> list[dict]:
        """Return the tool descriptors advertised by ``tools/list``.

        Every call builds fresh dictionaries, so callers may mutate the result.
        Key order inside each descriptor is fixed so the serialized JSON is stable.
        """

        def text(description: str, enum: list[str] | None = None) -> dict:
            # String property; "enum" (when given) sits between type and description.
            prop: dict = {"type": "string"}
            if enum is not None:
                prop["enum"] = enum
            prop["description"] = description
            return prop

        def text_list(description: str) -> dict:
            # Array-of-strings property.
            return {
                "type": "array",
                "items": {"type": "string"},
                "description": description,
            }

        def feed_ref() -> dict:
            # Properties shared by every tool that targets a single feed.
            return {"feed_id": text("Feed ID"), "xsec_token": text("安全令牌")}

        def tool(name: str, description: str, properties: dict | None = None, required=()) -> dict:
            return {
                "name": name,
                "description": description,
                "inputSchema": {
                    "type": "object",
                    "properties": properties or {},
                    "required": list(required),
                },
            }

        return [
            tool("check_login_status", "检查小红书登录状态"),
            tool("get_login_qrcode", "获取小红书登录二维码"),
            tool("delete_cookies", "删除小红书 cookies"),
            tool("list_feeds", "获取小红书 Feed 列表"),
            tool(
                "search_feeds",
                "搜索小红书内容",
                {"keyword": text("搜索关键词")},
                ["keyword"],
            ),
            tool(
                "get_feed_detail",
                "获取小红书 Feed 详情",
                feed_ref(),
                ["feed_id", "xsec_token"],
            ),
            tool(
                "post_comment_to_feed",
                "在小红书 Feed 下发表评论",
                {**feed_ref(), "content": text("评论内容")},
                ["feed_id", "xsec_token", "content"],
            ),
            tool(
                "reply_comment_in_feed",
                "回复小红书 Feed 中的评论",
                {
                    **feed_ref(),
                    "comment_id": text("评论 ID"),
                    "content": text("回复内容"),
                },
                ["feed_id", "xsec_token", "comment_id", "content"],
            ),
            tool(
                "like_feed",
                "点赞或取消点赞小红书 Feed",
                {
                    **feed_ref(),
                    "action": text(
                        "操作类型: like(点赞) 或 unlike(取消点赞)",
                        enum=["like", "unlike"],
                    ),
                },
                ["feed_id", "xsec_token"],
            ),
            tool(
                "favorite_feed",
                "收藏或取消收藏小红书 Feed",
                {
                    **feed_ref(),
                    "action": text(
                        "操作类型: favorite(收藏) 或 unfavorite(取消收藏)",
                        enum=["favorite", "unfavorite"],
                    ),
                },
                ["feed_id", "xsec_token"],
            ),
            tool(
                "user_profile",
                "获取小红书用户主页信息",
                {"user_id": text("用户 ID"), "xsec_token": text("安全令牌")},
                ["user_id", "xsec_token"],
            ),
            tool(
                "publish_content",
                "发布小红书图文内容",
                {
                    "title": text("标题"),
                    "content": text("正文内容"),
                    "images": text_list("图片路径列表"),
                    "tags": text_list("标签列表"),
                },
                ["title", "content", "images"],
            ),
            tool(
                "publish_with_video",
                "发布小红书视频内容",
                {
                    "title": text("标题"),
                    "content": text("正文内容"),
                    "video_path": text("视频文件路径"),
                    "cover_path": text("封面图片路径"),
                    "tags": text_list("标签列表"),
                },
                ["title", "content", "video_path"],
            ),
        ]

    async def _call_mcp_tool(self, request_id, params) -> dict:
        """Run one ``tools/call`` request and return its JSON-RPC response.

        Args:
            request_id: The ``id`` of the incoming request, echoed in the response.
            params: The request's ``params`` member; ``None`` is treated as empty.

        Returns:
            A result object holding the tool output as ``content``, or an error
            object when params are malformed, the tool is unknown, or it fails.
        """
        if params is None:
            params = {}
        if not isinstance(params, dict):
            return self._jsonrpc_error(
                request_id, self._INVALID_PARAMS, "Invalid params: expected an object"
            )

        tool_name = params.get("name")
        tool_args = params.get("arguments") or {}
        logger.info(f"Calling tool: {tool_name} with args: {tool_args}")

        try:
            handlers = self.mcp_server.tool_handlers
            if tool_name not in handlers:
                return self._jsonrpc_error(
                    request_id, self._METHOD_NOT_FOUND, f"Tool not found: {tool_name}"
                )
            result = await handlers[tool_name](tool_args)
            # Each returned item is expected to expose ``type`` and ``text``;
            # anything else raises and is reported as a tool failure below.
            content = [{"type": item.type, "text": item.text} for item in result]
        except Exception as exc:
            # logger.exception records the traceback. The exception is passed as
            # a format argument so braces in its text cannot break formatting.
            logger.exception("Tool execution failed: {}", exc)
            return self._jsonrpc_error(
                request_id, self._INTERNAL_ERROR, f"Tool execution failed: {exc}"
            )

        return {"jsonrpc": "2.0", "id": request_id, "result": {"content": content}}

    async def _dispatch_mcp_request(self, body: dict) -> dict:
        """Route one JSON-RPC request object to its handler and return the response."""
        method = body.get("method")
        request_id = body.get("id")
        logger.info(f"MCP method: {method}")

        if method == "initialize":
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "tools": {},
                        "prompts": {},
                        "resources": {},
                    },
                    "serverInfo": {
                        "name": "xiaohongshu-mcp",
                        "version": "1.0.0",
                    },
                },
            }

        if method == "tools/list":
            return {
                "jsonrpc": "2.0",
                "id": request_id,
                "result": {"tools": self._mcp_tool_definitions()},
            }

        if method == "tools/call":
            return await self._call_mcp_tool(request_id, body.get("params"))

        return self._jsonrpc_error(
            request_id, self._METHOD_NOT_FOUND, f"Method not found: {method}"
        )

    def _create_app(self) -> FastAPI:
        """Create the FastAPI app with CORS, the REST router and the MCP endpoints."""
        # Imported locally so this method does not depend on a change to the
        # module's import block; only the bodiless 202 reply needs it.
        from fastapi import Response

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            """Lifespan hook: nothing to do on startup, release resources on exit."""
            # Components are already built by initialize() before the app starts.
            yield
            await self.shutdown()

        app = FastAPI(
            title="Xiaohongshu MCP Python",
            description="小红书 MCP 服务器 - Python 实现",
            version="1.0.0",
            lifespan=lifespan,
        )

        # NOTE(review): wildcard origins are combined with allow_credentials=True;
        # confirm this is intended for the deployment environment.
        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        router = create_router(self.service)
        app.include_router(router)

        # MCP Streamable HTTP endpoint (used by Gemini CLI).
        @app.api_route("/mcp", methods=["GET", "POST"])
        async def handle_mcp_streamable_http(request: Request):
            """Answer GET with a status document and POST with a JSON-RPC response."""
            logger.info(f"MCP Streamable HTTP request received: {request.method}")

            # GET doubles as a health check.
            if request.method == "GET":
                return {
                    "status": "ok",
                    "protocol": "MCP",
                    "transport": "streamable-http",
                    "version": "1.0.0",
                }

            try:
                body = await request.json()
            except ValueError as exc:
                # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
                logger.warning("MCP request body is not valid JSON: {}", exc)
                return JSONResponse(
                    status_code=400,
                    content=self._jsonrpc_error(
                        None, self._PARSE_ERROR, f"Parse error: {exc}"
                    ),
                )
            logger.debug(f"MCP request: {body}")

            if not isinstance(body, dict):
                return JSONResponse(
                    status_code=400,
                    content=self._jsonrpc_error(
                        None,
                        self._INVALID_REQUEST,
                        "Invalid Request: expected a JSON object",
                    ),
                )

            # A message without "id" is a notification (or a response) and must
            # not be answered with a JSON-RPC response; acknowledge it instead.
            if "id" not in body:
                logger.info("MCP notification accepted: {}", body.get("method"))
                return Response(status_code=202)

            async def stream_response():
                """Yield the single newline-terminated JSON-RPC response."""
                # Dispatch runs inside the generator so headers are sent before a
                # possibly slow tool call. Errors are caught here because the
                # response has already started by the time this code runs.
                try:
                    response = await self._dispatch_mcp_request(body)
                except Exception as exc:
                    logger.exception("MCP Streamable HTTP error: {}", exc)
                    response = self._jsonrpc_error(
                        body.get("id"), self._INTERNAL_ERROR, str(exc)
                    )
                yield (json.dumps(response) + "\n").encode()

            return StreamingResponse(
                stream_response(),
                media_type="application/json",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                },
            )

        # MCP SSE endpoint (used by Claude Desktop).
        @app.get("/sse")
        async def handle_sse(request: Request):
            """Open an SSE stream backed by the MCP server."""
            logger.info("MCP SSE connection established")

            async def sse_stream():
                """Yield messages produced by the SSE transport."""
                # NOTE(review): this relies on SseServerTransport being an async
                # context manager and async iterable, and on the object returned
                # by get_server() exposing connect(). Confirm against the
                # installed MCP SDK version.
                async with SseServerTransport("/messages") as transport:
                    await self.mcp_server.get_server().connect(transport)
                    async for message in transport:
                        yield message

            return StreamingResponse(
                sse_stream(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "X-Accel-Buffering": "no",
                },
            )

        @app.post("/messages")
        async def handle_messages(request: Request):
            """Accept a message posted by an SSE client (placeholder)."""
            logger.info("MCP message received")
            # The body is parsed (so malformed JSON still fails) but is not yet
            # forwarded anywhere. TODO: route it through the MCP SDK transport
            # once the SSE wiring above is confirmed.
            await request.json()
            return {"status": "ok"}

        return app

    def get_app(self) -> FastAPI:
        """Return the FastAPI application.

        Raises:
            RuntimeError: If ``initialize()`` has not created the app yet.
        """
        if self.app is None:
            raise RuntimeError("Application not initialized. Call initialize() first.")
        return self.app
# Process-wide application server, created lazily by get_app_server().
_app_server: AppServer | None = None
# Serializes first-time initialization so concurrent callers share one instance.
_app_server_lock = asyncio.Lock()


async def get_app_server() -> AppServer:
    """Return the shared, fully initialized ``AppServer``.

    The instance is stored in the module global only after ``initialize()``
    succeeds. A failed initialization therefore propagates its exception and
    leaves the global unset, so the next call retries.
    """
    global _app_server
    if _app_server is not None:
        return _app_server

    async with _app_server_lock:
        # Re-check under the lock: another task may have finished initializing
        # while this one was waiting.
        if _app_server is None:
            server = AppServer()
            await server.initialize()
            _app_server = server
    return _app_server