"""
MCP 服务器主程序 - 记账工具的 MCP 服务器实现
基于 MCP Python SDK 实现完整的记账 MCP 服务器,包括:
- stdio 传输协议
- 工具注册和调用
- 资源注册和访问
- 错误处理和日志记录
"""
import asyncio
import json
import logging
import sys
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.models import InitializationOptions
from mcp.types import (
Resource,
Tool,
ToolAnnotations,
TextContent,
CallToolRequest,
CallToolResult,
ReadResourceRequest, # 更新的API名称
ReadResourceResult, # 更新的API名称
ServerCapabilities,
ToolsCapability,
ResourcesCapability,
)
from .tools import AccountingTools, get_tool_definitions
from .resources import AccountingResources, get_resource_definitions
from .storage import StorageManager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mcp_server.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class AccountingMCPServer:
"""记账 MCP 服务器"""
def __init__(self, data_dir: str = "data"):
"""
初始化 MCP 服务器
Args:
data_dir: 数据存储目录
"""
self.server = Server("accounting-mcp")
self.storage = StorageManager(data_dir)
self.tools = AccountingTools(self.storage)
self.resources = AccountingResources(self.storage)
# 注册处理器
self._register_handlers()
logger.info(f"记账 MCP 服务器初始化完成,数据目录: {data_dir}")
def _register_handlers(self) -> None:
"""注册所有 MCP 处理器"""
@self.server.list_tools()
async def list_tools() -> List[Tool]:
"""列出所有可用工具"""
tools = []
tool_definitions = get_tool_definitions()
for tool_def in tool_definitions:
# 添加工具注解,提供额外的元数据
annotations = ToolAnnotations(
readOnlyHint=tool_def.get("readOnly", False),
destructiveHint=tool_def.get("destructive", False)
)
tools.append(Tool(
name=tool_def["name"],
description=tool_def["description"],
inputSchema=tool_def["inputSchema"],
annotations=annotations
))
logger.info(f"列出 {len(tools)} 个工具")
return tools
@self.server.call_tool()
async def call_tool(request: CallToolRequest) -> CallToolResult:
"""调用工具"""
tool_name = request.params.name
arguments = request.params.arguments or {}
logger.info(f"调用工具: {tool_name}, 参数: {arguments}")
try:
# 根据工具名称分发调用
if tool_name == "add_transaction":
result = await self.tools.add_transaction(
amount=arguments.get("amount"),
category=arguments.get("category"),
description=arguments.get("description", ""),
date_str=arguments.get("date")
)
elif tool_name == "get_balance":
result = await self.tools.get_balance(
detailed=arguments.get("detailed", False)
)
elif tool_name == "list_transactions":
result = await self.tools.list_transactions(
limit=arguments.get("limit", 20),
offset=arguments.get("offset", 0),
category=arguments.get("category"),
start_date=arguments.get("start_date"),
end_date=arguments.get("end_date")
)
elif tool_name == "get_monthly_summary":
result = await self.tools.get_monthly_summary(
year=arguments.get("year"),
month=arguments.get("month")
)
elif tool_name == "get_categories":
result = await self.tools.get_categories()
else:
result = {
"success": False,
"error": f"未知的工具: {tool_name}",
"error_type": "invalid_tool"
}
# 格式化返回结果
content = [TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
logger.info(f"工具 {tool_name} 执行完成,成功: {result.get('success', False)}")
return CallToolResult(content=content)
except Exception as e:
error_result = {
"success": False,
"error": f"工具执行失败: {str(e)}",
"error_type": "execution_error"
}
logger.error(f"工具 {tool_name} 执行失败: {str(e)}", exc_info=True)
content = [TextContent(
type="text",
text=json.dumps(error_result, indent=2, ensure_ascii=False)
)]
return CallToolResult(content=content, isError=True)
@self.server.list_resources()
async def list_resources() -> List[Resource]:
"""列出所有可用资源"""
resources = []
resource_definitions = get_resource_definitions()
for res_def in resource_definitions:
resources.append(Resource(
uri=res_def["uri"],
name=res_def["name"],
description=res_def["description"],
mimeType=res_def["mimeType"]
))
logger.info(f"列出 {len(resources)} 个资源")
return resources
@self.server.read_resource()
async def get_resource(request: ReadResourceRequest) -> ReadResourceResult:
"""获取资源内容"""
uri = request.params.uri
logger.info(f"获取资源: {uri}")
try:
result = await self.resources.get_resource(uri)
if result.get("success"):
content = [TextContent(
type="text",
text=result["content"]
)]
logger.info(f"资源 {uri} 获取成功")
return ReadResourceResult(contents=content)
else:
# 资源获取失败
error_content = [TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
logger.error(f"资源 {uri} 获取失败: {result.get('error')}")
return ReadResourceResult(contents=error_content)
except Exception as e:
error_result = {
"success": False,
"error": f"资源获取失败: {str(e)}",
"error_type": "resource_error"
}
logger.error(f"资源 {uri} 获取异常: {str(e)}", exc_info=True)
error_content = [TextContent(
type="text",
text=json.dumps(error_result, indent=2, ensure_ascii=False)
)]
return ReadResourceResult(contents=error_content)
async def run(self) -> None:
"""启动 MCP 服务器"""
logger.info("启动记账 MCP 服务器...")
async with stdio_server() as (read_stream, write_stream):
logger.info("MCP 服务器正在运行,等待客户端连接...")
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="accounting-mcp-server",
server_version="1.0.0",
capabilities=ServerCapabilities(
tools=ToolsCapability(), # 声明支持工具
resources=ResourcesCapability(), # 声明支持资源
),
),
)
# 服务器入口点
async def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description="记账 MCP 服务器")
parser.add_argument(
"--data-dir",
default="data",
help="数据存储目录 (默认: data)"
)
parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
default="INFO",
help="日志级别 (默认: INFO)"
)
args = parser.parse_args()
# 设置日志级别
logging.getLogger().setLevel(getattr(logging, args.log_level))
# 创建并启动服务器
server = AccountingMCPServer(data_dir=args.data_dir)
try:
await server.run()
except KeyboardInterrupt:
logger.info("服务器被用户中断")
except Exception as e:
logger.error(f"服务器运行异常: {str(e)}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())