#!/usr/bin/env python3
"""
ID Factory MCP Server
提供ID解析和查询功能
"""
import asyncio
import json
import logging
import os
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, List, Optional

from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Resource, Tool, TextContent, ImageContent, EmbeddedResource
# Configure logging: INFO level on the root logger, plus the named logger
# used by every function in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("idfactory-mcp-server")
# API configuration for the ID Factory resolution service.
IDFACTORY_CONFIG = {
    # Root URL; the handle to resolve is appended as the request path.
    "base_url": "http://resolve.idfactory.cn:8081",
    # Optional; reserved for future extension.
    "api_key_env": "IDFACTORY_API_KEY",
    # Headers attached to every outgoing request.
    "default_headers": {
        "User-Agent": "IDFactory-MCP-Server/1.0",
        "Accept": "application/json",
    },
}
# Tool definitions. Each entry supplies the tool name, its description and a
# parameter table that is converted into the JSON input schema advertised to
# clients. All three tools resolve handles against the same "/{handle}" GET
# endpoint.
IDFACTORY_TOOLS = [
    {
        "name": "resolve_id",
        "description": "解析ID获取详细信息",
        "endpoint": "/{handle}",
        "method": "GET",
        "params": {
            "handle": {
                "type": "string",
                "description": "要解析的ID句柄(如:700.100)",
                "required": True,
            },
        },
    },
    {
        "name": "batch_resolve_ids",
        "description": "批量解析多个ID句柄",
        "endpoint": "/{handle}",
        "method": "GET",
        "params": {
            "handles": {
                "type": "array",
                "description": "要解析的ID句柄列表",
                "required": True,
                "items": {"type": "string"},
            },
            # The batch branch of the tool handler already reads this
            # argument, so it is advertised here to make it discoverable.
            "format_output": {
                "type": "boolean",
                "description": "是否格式化输出",
                "default": True,
                "required": False,
            },
        },
    },
    {
        "name": "parse_id_components",
        "description": "解析并格式化ID组件信息",
        "endpoint": "/{handle}",
        "method": "GET",
        "params": {
            "handle": {
                "type": "string",
                "description": "要解析的ID句柄",
                "required": True,
            },
            "format_output": {
                "type": "boolean",
                "description": "是否格式化输出",
                "default": True,
                "required": False,
            },
        },
    },
]
# Create the MCP server instance; the handler decorators below register on it.
server = Server("idfactory-mcp-server")
async def call_idfactory_api(handle: str, timeout: float = 30.0) -> dict:
    """Resolve one handle through the ID Factory HTTP API.

    The request is made with blocking ``urllib`` calls, so the whole exchange
    (including reading an error body) runs in the event loop's default
    executor instead of stalling the loop.

    Args:
        handle: Handle to resolve, for example ``"700.100"``.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded JSON response on success. On failure, a dict holding an
        ``"error"`` message and the ``"handle"``; for HTTP errors whose body
        is valid JSON that body is included under ``"details"``. Ordinary
        failures are reported through the return value rather than raised.
    """

    def _resolve_blocking() -> dict:
        try:
            # quote() keeps "/" unescaped by default, so any slash inside the
            # handle stays part of the request path.
            url = f"{IDFACTORY_CONFIG['base_url']}/{urllib.parse.quote(handle)}"
            logger.info(f"解析ID: {handle}")
            logger.info(f"请求URL: {url}")
            request = urllib.request.Request(
                url, headers=IDFACTORY_CONFIG["default_headers"]
            )
            # Without a timeout an unresponsive server would hang this call
            # indefinitely.
            with urllib.request.urlopen(request, timeout=timeout) as response:
                return json.loads(response.read().decode('utf-8'))
        except urllib.error.HTTPError as e:
            error_msg = f"HTTP错误 {e.code}: {e.reason}"
            logger.error(f"解析ID {handle} 失败: {error_msg}")
            try:
                error_data = json.loads(e.read().decode())
            except Exception:
                # Best effort only: the body may be absent or not JSON.
                return {"error": error_msg, "handle": handle}
            return {"error": error_msg, "details": error_data, "handle": handle}
        except Exception as e:
            error_msg = f"API调用失败: {str(e)}"
            logger.error(f"解析ID {handle} 失败: {error_msg}")
            return {"error": error_msg, "handle": handle}

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _resolve_blocking)
def build_input_schema(params_config: Dict[str, Dict]) -> Dict[str, Any]:
    """Convert a tool's parameter table into a JSON-schema style object.

    Args:
        params_config: Mapping of parameter name to its spec. Each spec must
            carry ``"type"`` and ``"description"``; ``"default"``,
            ``"required"`` and ``"items"`` are optional.

    Returns:
        A dict with ``"type": "object"``, a ``"properties"`` mapping and the
        ``"required"`` list of parameter names whose spec marks them required.

    Raises:
        KeyError: If a spec lacks ``"type"`` or ``"description"``.
    """

    def _property_for(spec: Dict) -> Dict[str, Any]:
        entry = {"type": spec["type"], "description": spec["description"]}
        # A default is copied whenever the key exists, even if it is falsy.
        if "default" in spec:
            entry["default"] = spec["default"]
        # "items" is copied only when it is present and non-empty.
        item_schema = spec.get("items")
        if item_schema:
            entry["items"] = item_schema
        return entry

    return {
        "type": "object",
        "properties": {
            param: _property_for(spec) for param, spec in params_config.items()
        },
        "required": [
            param
            for param, spec in params_config.items()
            if spec.get("required", False)
        ],
    }
@server.list_tools()
async def handle_list_tools() -> List[Tool]:
    """Return one Tool descriptor per entry in IDFACTORY_TOOLS, in order."""
    return [
        Tool(
            name=spec["name"],
            description=spec["description"],
            inputSchema=build_input_schema(spec["params"]),
        )
        for spec in IDFACTORY_TOOLS
    ]
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
    """Dispatch a tool call and return its result as one JSON text item.

    ``batch_resolve_ids`` resolves every entry of ``arguments["handles"]`` in
    order and reports per-handle results with aggregate counts. Every other
    known tool resolves the single ``arguments["handle"]``. The optional
    ``format_output`` argument (default ``True``) is forwarded to
    ``format_id_response`` in both cases.

    Args:
        name: Tool name; must match an entry in ``IDFACTORY_TOOLS``.
        arguments: Tool arguments supplied by the client.

    Returns:
        A single-element list whose ``TextContent`` holds the JSON-encoded
        result. Failures (unknown tool, missing handle, unexpected errors)
        are returned as a JSON object with ``"success": false`` rather than
        raised.
    """
    # Tolerate a missing arguments object so validation below can report it.
    arguments = arguments or {}
    try:
        if not any(config["name"] == name for config in IDFACTORY_TOOLS):
            raise ValueError(f"未知工具: {name}")

        format_output = arguments.get("format_output", True)

        if name == "batch_resolve_ids":
            handles = arguments.get("handles", [])
            # A bare string would otherwise be iterated character by character.
            if isinstance(handles, str):
                handles = [handles]
            if not isinstance(handles, (list, tuple)) or not handles:
                raise ValueError("请提供至少一个ID句柄")

            results = []
            for handle in handles:
                raw = await call_idfactory_api(handle)
                results.append(format_id_response(raw, handle, format_output))

            # Count by each result's own "success" flag so the totals agree
            # with the per-handle entries (a resolved-but-unsuccessful
            # response code carries no "error" key).
            succeeded = sum(1 for item in results if item.get("success"))
            formatted_result = {
                "success": True,
                "tool": name,
                "total_requested": len(handles),
                "successful_resolves": succeeded,
                "failed_resolves": len(results) - succeeded,
                "results": results,
            }
        else:
            handle = arguments.get("handle", "")
            if not handle:
                raise ValueError("请提供ID句柄")

            raw = await call_idfactory_api(handle)
            formatted_result = format_id_response(raw, handle, format_output)
            formatted_result["tool"] = name
            formatted_result["arguments"] = arguments

        return [TextContent(
            type="text",
            text=json.dumps(formatted_result, indent=2, ensure_ascii=False)
        )]
    except Exception as e:
        error_result = {
            "success": False,
            "error": f"工具调用失败: {str(e)}",
            "tool": name,
            "arguments": arguments
        }
        logger.error(f"Error calling {name}: {e}")
        return [TextContent(
            type="text",
            text=json.dumps(error_result, indent=2, ensure_ascii=False)
        )]
def format_id_response(data: dict, handle: str, format_output: bool = True) -> dict:
    """Turn a raw resolution response into the structure returned to clients.

    Args:
        data: Decoded API response, or an error dict containing ``"error"``.
        handle: The handle that was requested.
        format_output: When false, the response is passed through untouched
            under ``"raw_data"``.

    Returns:
        * error input -> ``{"success": False, "error", "handle", "details"}``
        * ``format_output`` false -> ``{"success": True, "handle", "raw_data"}``
        * otherwise a dict with ``success`` (true only when ``responseCode``
          is 1), ``handle``, ``response_code``, ``timestamp``, ``ttl``, the
          parsed ``components`` and a ``summary`` of them.
        A response that is not a JSON object cannot be formatted and is
        reported as an error result instead of raising.
    """
    if isinstance(data, dict) and "error" in data:
        return {
            "success": False,
            "error": data["error"],
            "handle": handle,
            "details": data.get("details", {}),
        }
    if not format_output:
        return {
            "success": True,
            "handle": handle,
            "raw_data": data,
        }
    if not isinstance(data, dict):
        return {
            "success": False,
            "error": f"响应格式无效: {type(data).__name__}",
            "handle": handle,
            "details": {},
        }

    response_code = data.get("responseCode", 0)
    result = {
        "success": response_code == 1,
        "handle": data.get("handle", handle),
        "response_code": response_code,
        "timestamp": data.get("timestamp", ""),
        "ttl": data.get("ttl", 0),
        "components": [],
        "summary": {},
    }

    # Each dict entry of the "value" array becomes one component; anything
    # that is not a dict is skipped rather than crashing the whole response.
    values = data.get("value", [])
    if isinstance(values, list):
        result["components"] = [
            _format_component(item) for item in values if isinstance(item, dict)
        ]

    result["summary"] = generate_summary(result["components"])
    return result


def _as_dict(value) -> dict:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _format_component(value_item: dict) -> dict:
    """Build the component entry for one element of the response's value array."""
    component_type = value_item.get("type")
    payload = value_item.get("data", {})
    component = {
        "index": value_item.get("index"),
        "type": component_type,
        "data": payload,
        "ttl": value_item.get("ttl", 0),
        "timestamp": value_item.get("timestamp", ""),
        "permissions": {
            "admin_read": value_item.get("adminRead", 0) == 1,
            "admin_write": value_item.get("adminWrite", 0) == 1,
            "public_read": value_item.get("publicRead", 0) == 1,
            "public_write": value_item.get("publicWrite", 0) == 1,
        },
    }

    if component_type == "HS_SITE.PREFIX":
        site_data = _as_dict(_as_dict(payload).get("value"))
        component["parsed_data"] = {
            "site_info": {
                "version": site_data.get("version"),
                "protocol_version": site_data.get("protocolVersion"),
                "serial_number": site_data.get("serialNumber"),
                "primary_site": site_data.get("primarySite"),
                "multi_primary": site_data.get("multiPrimary"),
                "hash_option": site_data.get("hashOption"),
                # extract_attribute expects a list of {"name", "value"}
                # dicts; handing it the site dict itself always yielded "".
                # NOTE(review): assumes the list lives under "attributes"
                # -- confirm against a real API response.
                "description": extract_attribute(
                    site_data.get("attributes") or [], "desc"
                ),
                "servers": format_servers(site_data.get("servers") or []),
            }
        }
    elif component_type == "AFFILIATION":
        component["parsed_data"] = {
            "affiliation": _as_dict(payload).get("value", "")
        }
    else:
        component["parsed_data"] = payload
    return component
def extract_attribute(attributes: list, name: str) -> str:
    """Look up a named entry in a list of ``{"name": ..., "value": ...}`` dicts.

    Args:
        attributes: Attribute entries; non-dict entries are ignored. An empty
            or ``None`` list yields ``""``.
        name: Attribute name to search for.

    Returns:
        The ``"value"`` of the first entry whose ``"name"`` equals *name*
        (``""`` if that entry has no ``"value"``), or ``""`` when no entry
        matches.
    """
    matching_values = (
        entry.get("value", "")
        for entry in attributes or ()
        if isinstance(entry, dict) and entry.get("name") == name
    )
    return next(matching_values, "")
def format_servers(servers: list) -> list:
    """Normalise the server entries of a site record.

    Args:
        servers: Server dicts read via the keys ``serverid``, ``address``,
            ``pubkey`` and ``interfaces``. ``None`` is treated as empty and
            entries that are not dicts are skipped.

    Returns:
        One dict per server with the keys ``server_id``, ``address``,
        ``public_key`` and ``interfaces``.
    """
    formatted_servers = []
    for entry in servers or []:
        if not isinstance(entry, dict):
            continue
        public_key = entry.get("pubkey", "")
        # Replace each real newline with the two-character sequence "\n" so
        # the key stays on one line. Only string keys are rewritten; any
        # other representation (e.g. a nested object) is passed through.
        if isinstance(public_key, str):
            public_key = public_key.replace('\n', '\\n')
        formatted_servers.append({
            "server_id": entry.get("serverid"),
            "address": entry.get("address"),
            "public_key": public_key,
            "interfaces": entry.get("interfaces", []),
        })
    return formatted_servers
def generate_summary(components: list) -> dict:
    """Summarise a list of parsed components.

    Args:
        components: Component dicts carrying ``"type"`` and, for the site
            prefix and affiliation types, ``"parsed_data"``.

    Returns:
        A dict with ``total_components``, the distinct ``component_types`` in
        first-seen order, the flags ``has_site_prefix`` / ``has_affiliation``
        and ``servers``. ``site_description`` and ``affiliation`` are added
        only when a component of the corresponding type is present; if
        several are present the last one wins.
    """
    summary = {
        "total_components": len(components),
        "component_types": [],
        "has_site_prefix": False,
        "has_affiliation": False,
        "servers": [],
    }
    for component in components:
        # A "type" key that is present but None/empty must also fall back to
        # "unknown"; a plain .get() default only covers a missing key.
        comp_type = component.get("type") or "unknown"
        if comp_type not in summary["component_types"]:
            summary["component_types"].append(comp_type)

        if comp_type == "HS_SITE.PREFIX":
            summary["has_site_prefix"] = True
            site_info = component.get("parsed_data", {}).get("site_info", {})
            summary["servers"] = site_info.get("servers", [])
            summary["site_description"] = site_info.get("description", "")
        elif comp_type == "AFFILIATION":
            summary["has_affiliation"] = True
            summary["affiliation"] = component.get("parsed_data", {}).get("affiliation", "")
    return summary
@server.list_resources()
async def handle_list_resources() -> List[Resource]:
    """Return the two static JSON resources this server exposes."""
    # (uri, name, description) for each resource; all are served as JSON.
    catalog = (
        ("idfactory://info", "ID Factory服务信息", "ID Factory解析服务的基本信息"),
        ("idfactory://examples", "示例ID句柄", "一些可以测试的ID句柄示例"),
    )
    return [
        Resource(
            uri=resource_uri,
            name=title,
            description=summary,
            mimeType="application/json",
        )
        for resource_uri, title, summary in catalog
    ]
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
    """Return the JSON text of one of the server's static resources.

    Args:
        uri: Resource identifier, either ``idfactory://info`` or
            ``idfactory://examples``. Anything convertible with ``str()`` is
            accepted.

    Returns:
        The resource content as an indented JSON string.

    Raises:
        ValueError: If the URI names no known resource.
    """
    # Compare on the string form: a URL object does not compare equal to a
    # plain str, which would make every lookup fall through to the error.
    # NOTE(review): the MCP SDK is expected to pass a pydantic AnyUrl here
    # -- confirm for the SDK version in use. A trailing "/" added by URL
    # normalisation is ignored.
    resource_uri = str(uri).rstrip("/")

    if resource_uri == "idfactory://info":
        info = {
            "service": "ID Factory MCP Server",
            "version": "1.0.0",
            "base_url": IDFACTORY_CONFIG["base_url"],
            "description": "提供ID句柄解析和查询功能",
            "capabilities": [
                "resolve_id",
                "batch_resolve_ids",
                "parse_id_components",
            ],
        }
        return json.dumps(info, indent=2, ensure_ascii=False)

    if resource_uri == "idfactory://examples":
        examples = {
            "handles": [
                "700.100",
                "700.1",
                "700.2",
            ],
            "usage_examples": [
                "解析单个ID: resolve_id(handle='700.100')",
                "批量解析: batch_resolve_ids(handles=['700.100', '700.1'])",
                "格式化解析: parse_id_components(handle='700.100', format_output=True)",
            ],
        }
        return json.dumps(examples, indent=2, ensure_ascii=False)

    raise ValueError(f"未知资源: {uri}")
async def main():
    """Log startup details, then serve MCP requests over stdio until the run ends."""
    logger.info("启动ID Factory MCP服务器")
    logger.info(f"服务地址: {IDFACTORY_CONFIG['base_url']}")

    async with stdio_server() as (read_stream, write_stream):
        # Capabilities are derived from the handlers registered on `server`.
        capabilities = server.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="idfactory-mcp-server",
            server_version="1.0.0",
            capabilities=capabilities,
        )
        await server.run(read_stream, write_stream, init_options)


if __name__ == "__main__":
    asyncio.run(main())