"""GrACE-MCP Server 主程序 - 支持通过对话配置."""
import asyncio
import os
import sys
from pathlib import Path
from typing import Any, Sequence
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
ListResourcesRequest,
ListResourcesResult,
ReadResourceRequest,
ListPromptsRequest,
ListPromptsResult,
GetPromptRequest,
GetPromptResult,
TextContent,
TextResourceContents,
Resource,
Prompt,
PromptMessage,
)
from .config_manager import ConfigManager
from .graphiti_client import GraphitiClient
from .tools import get_tools, handle_tool_call
from .logger import default_logger as logger
# Re-export ACEManager so tests can import it from this module.
# If ace-framework is not installed, fall back to None and let
# create_server() degrade gracefully.
try:
    from .ace_manager import ACEManager
except ImportError:
    ACEManager = None  # type: ignore
class TextResourceContentsWithContent(TextResourceContents):
    """Wrapper around TextResourceContents adding attribute aliases for MCP clients.

    Some MCP clients expect a ``content`` attribute (alias of ``text``) and a
    snake_case ``mime_type`` attribute (alias of ``mimeType``); this subclass
    resolves both dynamically without duplicating the underlying data.
    """

    def __getattr__(self, name: str):
        """Resolve the ``content``/``mime_type`` aliases dynamically.

        Unknown names are delegated to the base class's ``__getattr__`` when it
        defines one (pydantic's BaseModel uses ``__getattr__`` for private and
        extra attributes), so base-model behavior is preserved; only if no
        parent handler exists do we raise AttributeError ourselves.
        """
        if name == 'content':
            return self.text
        if name == 'mime_type':
            # Falls back to None when the model has no mimeType value.
            return getattr(self, 'mimeType', None)
        # Fix: delegate to the base class instead of raising unconditionally,
        # so pydantic's own __getattr__ handling is not shadowed.
        parent_getattr = getattr(super(), '__getattr__', None)
        if parent_getattr is not None:
            return parent_getattr(name)
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
def create_server() -> Server:
    """Create and configure the MCP Server instance.

    Wires up a ConfigManager, a GraphitiClient and (optionally) an ACEManager,
    then registers the tool / resource / prompt handlers on the server.
    """
    # Allow overriding the config file path via an environment variable
    # (used for test isolation).
    config_path = os.environ.get('GRAPHITIACE_CONFIG_PATH')
    if config_path:
        config_manager = ConfigManager(config_path=Path(config_path))
    else:
        config_manager = ConfigManager()
    graphiti_client = GraphitiClient(config_manager)
    # Initialize the ACE Manager (optional; degrades gracefully when
    # ace-framework is not installed or initialization fails).
    ace_manager = None
    if ACEManager is not None:
        try:
            ace_manager = ACEManager(config_manager, graphiti_client)
            if ace_manager.is_enabled():
                logger.info("ACE Manager 已启用")
            else:
                logger.info("ACE Manager 已初始化但未启用")
        except Exception as e:
            logger.warning(f"初始化 ACE Manager 失败: {e}")
    else:
        logger.info("ACE Manager 未安装,跳过初始化")
    server = Server("graphitiace")
    # Capture ace_manager in the closure so call_tool can reach it.
    _ace_manager = ace_manager

    @server.list_tools()
    async def list_tools(request: ListToolsRequest) -> ListToolsResult:
        """
        List all available tools.

        Each of these tools can be invoked by the MCP client AI through conversation.
        """
        tools = get_tools(config_manager)
        return ListToolsResult(tools=tools)

    @server.list_resources()
    async def list_resources(request: ListResourcesRequest) -> ListResourcesResult:
        """
        List all available resources.

        Resources are read-only data-retrieval endpoints with no side effects.
        """
        resources = [
            Resource(
                uri="graphitiace://recent-episodes",
                name="最近添加的 Episodes",
                description="获取最近添加的交互记录",
                mimeType="application/json"
            ),
            Resource(
                uri="graphitiace://entity-counts",
                name="实体统计",
                description="获取知识图谱中各种实体的数量统计",
                mimeType="application/json"
            ),
            Resource(
                uri="graphitiace://configuration",
                name="配置信息",
                description="获取当前服务器配置信息(只读)",
                mimeType="application/json"
            ),
            Resource(
                uri="graphitiace://relationship-stats",
                name="关系统计",
                description="获取知识图谱中各种关系的数量统计",
                mimeType="application/json"
            ),
            Resource(
                uri="graphitiace://top-entities",
                name="热门实体",
                description="获取连接最多的实体(Top N)",
                mimeType="application/json"
            ),
            Resource(
                uri="graphitiace://statistics",
                name="完整统计信息",
                description="获取知识图谱的完整统计信息(节点、关系、Episode 等)",
                mimeType="application/json"
            ),
            Resource(
                uri="graphitiace://strategy-heatmap",
                name="ACE 策略热力图",
                description="聚合展示每个工具在不同成功率区间的策略表现(只读)",
                mimeType="application/json"
            ),
        ]
        return ListResourcesResult(resources=resources)

    @server.read_resource()
    async def read_resource(uri: str):
        """
        Read a resource's content.

        The MCP framework unpacks ReadResourceRequest automatically and passes
        ``uri`` as a plain argument. Returning a list of TextResourceContents
        lets the framework handle the response wrapping.
        """
        import json
        # NOTE(review): datetime/timedelta appear unused in this handler — confirm before removing.
        from datetime import datetime, timedelta
        # MCP hands us an AnyUrl; normalize to str before comparisons.
        uri = str(uri)
        # Connect lazily to avoid blocking server startup: only attempt a
        # (re)connection when a resource is actually being read.
        try:
            graphiti_client.check_reconnect()
        except Exception as e:
            logger.warning(f"资源访问时连接/重连失败: {e}")
        try:
            if uri == "graphitiace://recent-episodes":
                # Fetch the 10 most recent episodes (last 30 days).
                result = graphiti_client.query_by_time_range(days=30, limit=10)
                content = json.dumps(result, indent=2, ensure_ascii=False, default=str)
                return [TextResourceContentsWithContent(uri=uri, text=content)]
            elif uri == "graphitiace://entity-counts":
                # Entity counts grouped by node label.
                if graphiti_client.is_connected():
                    result = graphiti_client.query_knowledge_graph(
                        """
                        MATCH (n)
                        RETURN labels(n) as labels, count(n) as count
                        ORDER BY count DESC
                        """
                    )
                    stats = {}
                    if result['success']:
                        for record in result['results']:
                            labels = record.get('labels', [])
                            # Use the first label as the bucket key; 'Unknown' for unlabeled nodes.
                            label = labels[0] if labels else 'Unknown'
                            count = record.get('count', 0)
                            stats[label] = count
                    content = json.dumps({
                        "success": True,
                        "statistics": stats,
                        "total_nodes": sum(stats.values())
                    }, indent=2, ensure_ascii=False)
                else:
                    content = json.dumps({
                        "success": False,
                        "message": "未连接到数据库"
                    }, indent=2, ensure_ascii=False)
                return [TextResourceContentsWithContent(uri=uri, text=content)]
            elif uri == "graphitiace://configuration":
                # Read-only view of the current configuration.
                status = config_manager.get_config_status()
                # Strip sensitive fields (credentials, keys) before exposing.
                safe_status = {
                    "neo4j_configured": status.get("neo4j_configured", False),
                    "api_configured": status.get("api_configured", False),
                    "group_id": status.get("group_id", "default"),
                    "neo4j": {
                        "uri": status.get("neo4j", {}).get("uri", ""),
                        "username": status.get("neo4j", {}).get("username", ""),
                        "database": status.get("neo4j", {}).get("database", "")
                    } if status.get("neo4j") else None,
                    "api": {
                        "provider": status.get("api", {}).get("provider", ""),
                        "has_api_key": status.get("api", {}).get("has_api_key", False),
                        "model": status.get("api", {}).get("model")
                    } if status.get("api") else None
                }
                content = json.dumps(safe_status, indent=2, ensure_ascii=False)
                return [TextResourceContentsWithContent(uri=uri, text=content)]
            elif uri == "graphitiace://relationship-stats":
                # Relationship counts grouped by relationship type.
                if graphiti_client.is_connected():
                    result = graphiti_client.query_knowledge_graph(
                        """
                        MATCH ()-[r]->()
                        RETURN type(r) as relationship_type, count(r) as count
                        ORDER BY count DESC
                        """
                    )
                    stats = {}
                    if result['success']:
                        for record in result['results']:
                            rel_type = record.get('relationship_type') or 'Unknown'
                            count = record.get('count', 0)
                            stats[rel_type] = count
                    content = json.dumps({
                        "success": True,
                        "statistics": stats,
                        "total_relationships": sum(stats.values())
                    }, indent=2, ensure_ascii=False)
                else:
                    content = json.dumps({
                        "success": False,
                        "message": "未连接到数据库"
                    }, indent=2, ensure_ascii=False)
                return [TextResourceContentsWithContent(uri=uri, text=content)]
            elif uri == "graphitiace://top-entities":
                # Most-connected entities (top 20 by relationship count).
                if graphiti_client.is_connected():
                    result = graphiti_client.query_knowledge_graph(
                        """
                        MATCH (n)-[r]-()
                        WITH n, count(r) as connection_count
                        RETURN labels(n) as labels, n.name as name, n.content as content, connection_count
                        ORDER BY connection_count DESC
                        LIMIT 20
                        """
                    )
                    entities = []
                    if result['success']:
                        for record in result['results']:
                            entities.append({
                                "labels": record.get('labels', []),
                                "name": record.get('name'),
                                "content": record.get('content'),
                                "connection_count": record.get('connection_count', 0)
                            })
                    content = json.dumps({
                        "success": True,
                        "top_entities": entities
                    }, indent=2, ensure_ascii=False, default=str)
                else:
                    content = json.dumps({
                        "success": False,
                        "message": "未连接到数据库"
                    }, indent=2, ensure_ascii=False)
                return [TextResourceContentsWithContent(uri=uri, text=content)]
            elif uri == "graphitiace://statistics":
                # Full statistics (nodes, relationships, episodes, ...).
                if graphiti_client.is_connected():
                    result = graphiti_client.get_statistics()
                    if result['success']:
                        content = json.dumps(result['statistics'], indent=2, ensure_ascii=False, default=str)
                    else:
                        content = json.dumps({
                            "success": False,
                            "message": result['message']
                        }, indent=2, ensure_ascii=False)
                else:
                    content = json.dumps({
                        "success": False,
                        "message": "未连接到数据库"
                    }, indent=2, ensure_ascii=False)
                return [TextResourceContentsWithContent(uri=uri, text=content)]
            elif uri == "graphitiace://strategy-heatmap":
                # Requires a live, enabled ACE Manager; otherwise explain why.
                if _ace_manager is None or not _ace_manager.is_enabled():
                    content = json.dumps({
                        "success": False,
                        "message": "ACE Manager 未启用或未安装,无法生成策略热力图。"
                    }, indent=2, ensure_ascii=False)
                else:
                    heatmap = _ace_manager.get_strategy_heatmap(limit=30, group_by="tool")
                    if heatmap and heatmap.get("success"):
                        content = json.dumps(heatmap, indent=2, ensure_ascii=False, default=str)
                    else:
                        # Surface the manager's message when present, else a default notice.
                        message = (heatmap or {}).get("message", "暂无可用的策略数据。")
                        content = json.dumps({
                            "success": False,
                            "message": message
                        }, indent=2, ensure_ascii=False)
                return [TextResourceContentsWithContent(uri=uri, text=content)]
            else:
                return [TextResourceContentsWithContent(uri=uri, text=f"❌ 未知资源: {uri}")]
        except Exception as e:
            # Any failure is reported to the client as text rather than raised.
            return [TextResourceContentsWithContent(uri=uri, text=f"❌ 读取资源失败: {str(e)}")]

    @server.list_prompts()
    async def list_prompts(request: ListPromptsRequest) -> ListPromptsResult:
        """
        List all available prompt templates.

        Prompts are predefined query templates for standardized interaction patterns.
        """
        prompts = [
            Prompt(
                name="query_user_preferences",
                description="查询用户的技术偏好和编程习惯",
                arguments=[
                    {
                        "name": "category",
                        "description": "偏好类别(可选):programming, database, framework, tool",
                        "required": False
                    }
                ]
            ),
            Prompt(
                name="query_project_info",
                description="查询项目信息和相关需求",
                arguments=[
                    {
                        "name": "project_name",
                        "description": "项目名称(可选)",
                        "required": False
                    }
                ]
            ),
            Prompt(
                name="query_recent_learning",
                description="查询最近的学习记录和进度",
                arguments=[
                    {
                        "name": "days",
                        "description": "查询最近 N 天的记录(默认 30)",
                        "required": False
                    }
                ]
            ),
            Prompt(
                name="query_best_practices",
                description="查询记录的最佳实践和解决方案",
                arguments=[
                    {
                        "name": "topic",
                        "description": "主题关键词(可选)",
                        "required": False
                    }
                ]
            ),
            Prompt(
                name="add_learning_note",
                description="添加学习笔记到知识图谱",
                arguments=[
                    {
                        "name": "content",
                        "description": "学习笔记内容",
                        "required": True
                    },
                    {
                        "name": "topic",
                        "description": "学习主题(可选)",
                        "required": False
                    }
                ]
            ),
            Prompt(
                name="query_related_entities",
                description="查询与指定实体相关的其他实体",
                arguments=[
                    {
                        "name": "entity_name",
                        "description": "实体名称",
                        "required": True
                    },
                    {
                        "name": "depth",
                        "description": "查询深度(默认 1)",
                        "required": False
                    }
                ]
            ),
            Prompt(
                name="summarize_knowledge",
                description="总结知识图谱中的关键信息",
                arguments=[
                    {
                        "name": "category",
                        "description": "类别过滤(可选):preference, project, learning, practice",
                        "required": False
                    }
                ]
            ),
            Prompt(
                name="export_data",
                description="导出知识图谱数据",
                arguments=[
                    {
                        "name": "format",
                        "description": "导出格式:json 或 cypher(默认 json)",
                        "required": False
                    }
                ]
            ),
            Prompt(
                name="get_statistics",
                description="获取知识图谱统计信息",
                arguments=[]
            ),
        ]
        return ListPromptsResult(prompts=prompts)

    @server.get_prompt()
    async def get_prompt(name: str, arguments: dict = None) -> GetPromptResult:
        """
        Return the rendered content of a prompt template.

        The MCP framework unpacks GetPromptRequest automatically and passes
        ``name`` and ``arguments`` as plain function arguments.
        """
        if arguments is None:
            arguments = {}
        prompt_name = name
        messages = []
        if prompt_name == "query_user_preferences":
            category = arguments.get("category", "")
            query_text = f"查询我的技术偏好"
            if category:
                query_text += f",类别:{category}"
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请使用 search_entities 工具{query_text}。"
                    )
                )
            ]
        elif prompt_name == "query_project_info":
            project_name = arguments.get("project_name", "")
            if project_name:
                query_text = f"查询项目 '{project_name}' 的信息和需求"
            else:
                query_text = "查询所有项目信息"
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请使用 search_entities 工具{query_text}。"
                    )
                )
            ]
        elif prompt_name == "query_recent_learning":
            days = arguments.get("days", 30)
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请使用 query_by_time_range 工具查询最近 {days} 天的学习记录。"
                    )
                )
            ]
        elif prompt_name == "query_best_practices":
            topic = arguments.get("topic", "")
            if topic:
                query_text = f"搜索关于 '{topic}' 的最佳实践和解决方案"
            else:
                query_text = "搜索所有最佳实践和解决方案"
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请使用 semantic_search 工具{query_text}。"
                    )
                )
            ]
        elif prompt_name == "add_learning_note":
            content = arguments.get("content", "")
            topic = arguments.get("topic", "")
            metadata = {"type": "learning", "source": "user_input"}
            if topic:
                metadata["topic"] = topic
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请使用 add_episode 工具记录学习笔记:内容='{content}',元数据={metadata}。"
                    )
                )
            ]
        elif prompt_name == "query_related_entities":
            entity_name = arguments.get("entity_name", "")
            depth = arguments.get("depth", 1)
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请使用 search_relationships 工具查询与 '{entity_name}' 相关的实体,查询深度为 {depth}。"
                    )
                )
            ]
        elif prompt_name == "summarize_knowledge":
            category = arguments.get("category", "")
            if category:
                query_text = f"查询类别为 '{category}' 的所有信息并总结"
            else:
                query_text = "查询所有信息并总结关键点"
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请使用 search_entities 工具{query_text},然后对结果进行总结。"
                    )
                )
            ]
        elif prompt_name == "export_data":
            format_type = arguments.get("format", "json")
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请使用 export_graph_data 工具导出知识图谱数据,格式为 {format_type}。"
                    )
                )
            ]
        elif prompt_name == "get_statistics":
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text="请使用 get_statistics 工具获取知识图谱的统计信息。"
                    )
                )
            ]
        else:
            messages = [
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"❌ 未知的提示模板: {prompt_name}"
                    )
                )
            ]
        return GetPromptResult(messages=messages)

    @server.call_tool()
    async def call_tool(name: str, arguments: dict = None) -> CallToolResult:
        # Uses the ace_manager captured in the enclosing closure.
        """
        Handle a tool invocation.

        Runs when the MCP client AI calls a tool through conversation. The MCP
        framework unpacks CallToolRequest automatically and passes ``name`` and
        ``arguments`` as plain function arguments.
        """
        try:
            # NOTE: in a real MCP runtime the framework usually normalizes None
            # to {} when unpacking the request, so this branch is nearly
            # impossible to hit in coverage stats; the behavior is exercised
            # through several equivalent test paths instead.
            if arguments is None:  # pragma: no cover - MCP 解包后通常不会传入 None
                arguments = {}
            tool_name = name
            # Connect lazily to avoid blocking server startup: only attempt a
            # (re)connection when a tool is actually being called.
            try:
                graphiti_client.check_reconnect()
            except Exception as e:
                logger.warning(f"工具调用时连接/重连失败: {e}")
            # Pick up any ACE configuration changes before handling the call.
            if _ace_manager:
                _ace_manager.check_reload()
            # Dispatch the tool call (passing ace_manager through).
            results = await handle_tool_call(
                tool_name=tool_name,
                arguments=arguments,
                config_manager=config_manager,
                graphiti_client=graphiti_client if graphiti_client.is_connected() else None,
                ace_manager=_ace_manager
            )
            return CallToolResult(content=results)
        except Exception as e:
            # On failure, return the error (with traceback) as tool output.
            import traceback
            error_details = traceback.format_exc()
            error_msg = f"❌ 执行工具时出错:{str(e)}\n\n详细错误:\n{error_details}"
            return CallToolResult(content=[TextContent(type="text", text=error_msg)])

    return server
async def main():
    """Entry point: build the MCP server and serve it over stdio transport."""
    mcp_server = create_server()
    # stdio transport: the MCP client communicates with us via stdin/stdout.
    async with stdio_server() as (reader, writer):
        await mcp_server.run(reader, writer, mcp_server.create_initialization_options())
if __name__ == "__main__": # pragma: no cover - 仅脚本直接运行时触发的入口
asyncio.run(main())