"""
MCP 伺服器主程式 - 記帳工具的 MCP 伺服器實作
基於 MCP Python SDK 實作完整的記帳 MCP 伺服器,包括:
- stdio 傳輸協定
- 工具註冊與呼叫
- 資源註冊與存取
- 錯誤處理與日誌記錄
"""
import asyncio
import json
import logging
import sys
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.models import InitializationOptions
from mcp.types import (
Resource,
Tool,
ToolAnnotations,
TextContent,
CallToolRequest,
CallToolResult,
ReadResourceRequest, # 更新的 API 名稱
ReadResourceResult, # 更新的 API 名稱
ServerCapabilities,
ToolsCapability,
ResourcesCapability,
)
from .tools import AccountingTools, get_tool_definitions
from .resources import AccountingResources, get_resource_definitions
from .storage import StorageManager
# 設定日誌
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mcp_server.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class AccountingMCPServer:
"""記帳 MCP 伺服器"""
def __init__(self, data_dir: str = "data"):
"""
初始化 MCP 伺服器
Args:
data_dir: 資料儲存目錄
"""
self.server = Server("accounting-mcp")
self.storage = StorageManager(data_dir)
self.tools = AccountingTools(self.storage)
self.resources = AccountingResources(self.storage)
# 註冊處理器
self._register_handlers()
logger.info(f"記帳 MCP 伺服器初始化完成,資料目錄: {data_dir}")
async def _dispatch_tool(self, tool_name: Optional[str], arguments: Dict[str, Any]) -> CallToolResult:
try:
# 根據工具名稱分派呼叫
if tool_name == "add_transaction":
result = await self.tools.add_transaction(
amount=arguments.get("amount"),
category=arguments.get("category"),
description=arguments.get("description", ""),
date_str=arguments.get("date")
)
elif tool_name == "get_balance":
result = await self.tools.get_balance(
detailed=arguments.get("detailed", False)
)
elif tool_name == "list_transactions":
result = await self.tools.list_transactions(
limit=arguments.get("limit", 20),
offset=arguments.get("offset", 0),
category=arguments.get("category"),
start_date=arguments.get("start_date"),
end_date=arguments.get("end_date")
)
elif tool_name == "get_monthly_summary":
result = await self.tools.get_monthly_summary(
year=arguments.get("year"),
month=arguments.get("month")
)
elif tool_name == "get_categories":
result = await self.tools.get_categories()
else:
result = {
"success": False,
"error": f"未知的工具: {tool_name}",
"error_type": "invalid_tool"
}
# 格式化回傳結果
content = [TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
logger.info(f"工具 {tool_name} 執行完成,成功: {result.get('success', False)}")
return CallToolResult(content=content)
except Exception as e:
error_result = {
"success": False,
"error": f"工具執行失敗: {str(e)}",
"error_type": "execution_error"
}
logger.error(f"工具 {tool_name} 執行失敗: {str(e)}", exc_info=True)
content = [TextContent(
type="text",
text=json.dumps(error_result, indent=2, ensure_ascii=False)
)]
return CallToolResult(content=content, isError=True)
def _register_handlers(self) -> None:
"""註冊所有 MCP 處理器"""
def _normalize_params(raw: Any) -> Dict[str, Any]:
if raw is None:
return {}
if isinstance(raw, dict):
return raw
if hasattr(raw, "model_dump"):
return raw.model_dump()
if hasattr(raw, "dict"):
return raw.dict()
if hasattr(raw, "__dict__"):
return dict(raw.__dict__)
return {}
def _select_request_arg(args_list: Any) -> Any:
for candidate in args_list:
if candidate is None:
continue
if isinstance(candidate, dict):
if "params" in candidate or "name" in candidate or "uri" in candidate:
return candidate
if hasattr(candidate, "params") or hasattr(candidate, "name") or hasattr(candidate, "uri"):
return candidate
return args_list[-1] if args_list else None
@self.server.list_tools()
async def list_tools(*_args: Any, **_kwargs: Any) -> List[Tool]:
"""列出所有可用工具"""
tools = []
tool_definitions = get_tool_definitions()
for tool_def in tool_definitions:
# 新增工具註解,提供額外的中繼資料
annotations = ToolAnnotations(
readOnlyHint=tool_def.get("readOnly", False),
destructiveHint=tool_def.get("destructive", False)
)
tools.append(Tool(
name=tool_def["name"],
description=tool_def["description"],
inputSchema=tool_def["inputSchema"],
annotations=annotations
))
logger.info(f"列出 {len(tools)} 個工具")
return tools
@self.server.call_tool()
async def call_tool(*args: Any, **_kwargs: Any) -> CallToolResult:
"""呼叫工具"""
# MCP SDK 可能直接傳入 (name, arguments)
if len(args) >= 2 and isinstance(args[0], str):
tool_name = args[0]
arguments = _normalize_params(args[1])
logger.info(f"呼叫工具: {tool_name}, 參數: {arguments}")
return await self._dispatch_tool(tool_name, arguments)
request = _select_request_arg(args)
if hasattr(request, "name") and hasattr(request, "arguments"):
params = _normalize_params(request)
elif isinstance(request, dict):
if "name" in request or "arguments" in request:
params = request
else:
params = request.get("params")
elif hasattr(request, "params"):
params = request.params
else:
params = {}
params = _normalize_params(params)
tool_name = params.get("name")
arguments = params.get("arguments") or {}
if not tool_name and isinstance(request, dict) and request.get("method") == "tools/call":
fallback_params = request.get("params") or {}
tool_name = fallback_params.get("name")
arguments = fallback_params.get("arguments") or {}
logger.info(f"呼叫工具: {tool_name}, 參數: {arguments}")
return await self._dispatch_tool(tool_name, arguments)
@self.server.list_resources()
async def list_resources(*_args: Any, **_kwargs: Any) -> List[Resource]:
"""列出所有可用資源"""
resources = []
resource_definitions = get_resource_definitions()
for res_def in resource_definitions:
resources.append(Resource(
uri=res_def["uri"],
name=res_def["name"],
description=res_def["description"],
mimeType=res_def["mimeType"]
))
logger.info(f"列出 {len(resources)} 個資源")
return resources
@self.server.read_resource()
async def get_resource(*args: Any, **_kwargs: Any) -> ReadResourceResult:
"""取得資源內容"""
request = _select_request_arg(args)
if hasattr(request, "uri"):
params = _normalize_params(request)
elif isinstance(request, dict):
params = request.get("params")
elif hasattr(request, "params"):
params = request.params
else:
params = {}
params = _normalize_params(params)
uri = params.get("uri")
logger.info(f"取得資源: {uri}")
try:
result = await self.resources.get_resource(uri)
if result.get("success"):
content = [TextContent(
type="text",
text=result["content"]
)]
logger.info(f"資源 {uri} 取得成功")
return ReadResourceResult(contents=content)
else:
# 資源取得失敗
error_content = [TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
logger.error(f"資源 {uri} 取得失敗: {result.get('error')}")
return ReadResourceResult(contents=error_content)
except Exception as e:
error_result = {
"success": False,
"error": f"資源取得失敗: {str(e)}",
"error_type": "resource_error"
}
logger.error(f"資源 {uri} 取得異常: {str(e)}", exc_info=True)
error_content = [TextContent(
type="text",
text=json.dumps(error_result, indent=2, ensure_ascii=False)
)]
return ReadResourceResult(contents=error_content)
async def run(self) -> None:
"""啟動 MCP 伺服器"""
logger.info("啟動記帳 MCP 伺服器...")
async with stdio_server() as (read_stream, write_stream):
logger.info("MCP 伺服器正在執行,等待客戶端連線...")
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="accounting-mcp-server",
server_version="1.0.0",
capabilities=ServerCapabilities(
tools=ToolsCapability(), # 宣告支援工具
resources=ResourcesCapability(), # 宣告支援資源
),
),
)
# 伺服器入口點
async def main():
"""主函式"""
import argparse
parser = argparse.ArgumentParser(description="記帳 MCP 伺服器")
parser.add_argument(
"--data-dir",
default="data",
help="資料儲存目錄 (預設: data)"
)
parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
default="INFO",
help="日誌級別 (預設: INFO)"
)
args = parser.parse_args()
# 設定日誌級別
logging.getLogger().setLevel(getattr(logging, args.log_level))
# 建立並啟動伺服器
server = AccountingMCPServer(data_dir=args.data_dir)
try:
await server.run()
except KeyboardInterrupt:
logger.info("伺服器被使用者中斷")
except Exception as e:
logger.error(f"伺服器執行異常: {str(e)}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())