"""MCP 工具定义 - 所有工具都可以通过对话被 AI 调用."""
import json
from typing import Any, Dict, List, Optional, Tuple
from mcp.types import Tool, TextContent
from pydantic import BaseModel, Field
from .config_manager import ConfigManager
from .logger import default_logger as logger
def _summarize_result_text(result_text: Any) -> Tuple[bool, str]:
"""提取结果文本的成功标记与截断后的内容."""
text = str(result_text)
success = "✅" in text or "成功" in text
return success, text[:500]
class ConfigureNeo4jInput(BaseModel):
"""配置 Neo4j 数据库的参数."""
uri: str = Field(..., description="Neo4j 连接 URI,例如: bolt://localhost:7687")
username: str = Field(..., description="Neo4j 用户名,通常是 'neo4j'")
password: str = Field(..., description="Neo4j 密码")
database: str = Field(default="neo4j", description="数据库名称,默认为 'neo4j'")
class ConfigureAPIInput(BaseModel):
"""配置 AI API 的参数(可选)."""
provider: str = Field(
default="openai",
description="API 提供商: 'openai' 或 'anthropic'。如果不提供 API key,将使用 MCP Client 内置 AI"
)
api_key: Optional[str] = Field(
default=None,
description="API Key(可选)。如果不提供,将使用 MCP Client 的内置 AI 能力"
)
model: Optional[str] = Field(
default=None,
description="模型名称,例如 'gpt-4' 或 'claude-3-opus'(可选)"
)
class SetGroupIdInput(BaseModel):
"""设置组 ID 的参数(用于数据隔离)."""
group_id: str = Field(..., description="组 ID,用于区分不同用户或项目的数据")
class AddEpisodeInput(BaseModel):
"""添加交互记录到知识图谱的参数."""
content: str = Field(..., description="要记录的交互内容,例如用户偏好、项目信息、学习笔记等")
metadata: Optional[Dict[str, Any]] = Field(
default=None,
description="可选的元数据,例如 {'type': 'preference', 'category': 'programming'}"
)
saga: Optional[str] = Field(
default=None,
description="Saga 名称(可选),用于将多个 episode 关联到同一个会话/事务"
)
saga_previous_episode_uuid: Optional[str] = Field(
default=None,
description="前一个 episode 的 UUID(可选),用于高效连接连续的 episode"
)
custom_extraction_instructions: Optional[str] = Field(
default=None,
description="自定义抽取指令(可选),用于指导实体和关系的抽取过程"
)
class SearchEntitiesInput(BaseModel):
"""搜索实体的参数."""
query: str = Field(..., description="搜索查询,可以是关键词或自然语言描述")
entity_type: Optional[str] = Field(
default=None,
description="实体类型过滤,例如 'Preference', 'Project', 'Requirement'"
)
limit: int = Field(default=10, description="返回结果数量限制")
class SearchRelationshipsInput(BaseModel):
"""搜索关系的参数."""
query: str = Field(..., description="搜索查询,用于查找实体之间的关系")
limit: int = Field(default=10, description="返回结果数量限制")
class QueryKnowledgeGraphInput(BaseModel):
"""查询知识图谱的参数."""
query: str = Field(..., description="Cypher 查询语句或自然语言查询")
limit: int = Field(default=20, description="返回结果数量限制")
class DeleteEpisodeInput(BaseModel):
"""删除 episode 的参数."""
episode_id: Optional[int] = Field(default=None, description="Episode 的 ID")
content: Optional[str] = Field(default=None, description="Episode 的内容")
delete_all: bool = Field(default=False, description="是否删除所有 episode")
class ClearGraphInput(BaseModel):
"""清空图谱的参数."""
confirm: bool = Field(..., description="确认清空图谱")
class QueryByTimeRangeInput(BaseModel):
"""时间范围查询的参数."""
start_time: Optional[str] = Field(default=None, description="开始时间(ISO 格式)")
end_time: Optional[str] = Field(default=None, description="结束时间(ISO 格式)")
days: Optional[int] = Field(default=None, description="查询最近 N 天的数据")
entity_type: Optional[str] = Field(default=None, description="实体类型过滤")
limit: int = Field(default=20, description="返回结果数量限制")
class SemanticSearchInput(BaseModel):
"""语义搜索的参数."""
query: str = Field(..., description="搜索查询(自然语言)")
num_results: int = Field(default=10, description="返回结果数量")
center_node_uuid: Optional[str] = Field(default=None, description="中心节点 UUID(可选)")
class BulkEpisodeInput(BaseModel):
"""批量添加 episode 的参数."""
episodes: List[Dict[str, Any]] = Field(..., description="Episode 列表")
class ExportGraphDataInput(BaseModel):
"""导出图谱数据的参数."""
format: str = Field(default="json", description="导出格式:json 或 cypher")
class ImportGraphDataInput(BaseModel):
"""导入图谱数据的参数."""
data: Dict[str, Any] = Field(..., description="要导入的数据")
format: str = Field(default="json", description="数据格式")
class ValidateDataInput(BaseModel):
"""数据验证的参数."""
check_orphaned: bool = Field(default=True, description="检查孤立节点")
check_duplicates: bool = Field(default=True, description="检查重复节点")
check_integrity: bool = Field(default=True, description="检查数据完整性")
class CleanOrphanedNodesInput(BaseModel):
"""清理孤立节点的参数."""
confirm: bool = Field(..., description="确认清理孤立节点")
node_types: Optional[List[str]] = Field(default=None, description="要清理的节点类型列表(可选)")
class RebuildIndexesInput(BaseModel):
"""重建索引的参数."""
index_types: Optional[List[str]] = Field(default=None, description="要重建的索引类型(可选,默认全部)")
class QueryStrategiesInput(BaseModel):
"""查询策略的参数."""
tool_name: Optional[str] = Field(default=None, description="工具名称(可选,用于筛选特定工具的策略)")
limit: int = Field(default=20, description="返回结果数量限制")
class GetStrategyStatsInput(BaseModel):
"""获取策略统计的参数."""
tool_name: Optional[str] = Field(default=None, description="工具名称(可选,用于筛选特定工具的统计)")
class RateResultInput(BaseModel):
"""对工具执行结果评分的参数."""
tool_name: str = Field(..., description="工具名称")
rating: int = Field(..., ge=1, le=5, description="评分(1-5,1=很差,5=很好)")
feedback: Optional[str] = Field(default=None, description="反馈意见(可选)")
context: Optional[Dict[str, Any]] = Field(default=None, description="上下文信息(可选,用于关联特定的工具调用)")
class ExportStrategiesInput(BaseModel):
"""导出策略的参数."""
tool_name: Optional[str] = Field(default=None, description="工具名称(可选,用于筛选特定工具的策略)")
file_path: Optional[str] = Field(default=None, description="导出文件路径(可选,默认使用配置目录)")
class ImportStrategiesInput(BaseModel):
"""导入策略的参数."""
file_path: str = Field(..., description="导入文件路径")
overwrite: bool = Field(default=False, description="是否覆盖现有策略(默认 false)")
class ToggleStrategyInput(BaseModel):
"""启用/禁用策略的参数."""
tool_name: str = Field(..., description="工具名称")
arguments_hash: Optional[str] = Field(default=None, description="参数哈希(可选,如果不提供则操作该工具的所有策略)")
enabled: bool = Field(..., description="是否启用(true=启用,false=禁用)")
class ListStrategyVersionsInput(BaseModel):
"""列出策略版本的参数."""
tool_name: str = Field(..., description="工具名称")
arguments_hash: Optional[str] = Field(default=None, description="参数哈希(可选,用于精确定位策略)")
limit: int = Field(default=10, description="返回版本数量限制")
class GetLearningTrendsInput(BaseModel):
"""获取学习趋势参数."""
tool_name: Optional[str] = Field(default=None, description="工具名称(可选)")
arguments_hash: Optional[str] = Field(default=None, description="参数哈希(可选)")
days: int = Field(default=30, ge=1, le=180, description="回溯天数(1-180)")
class BulkUpdateStrategiesInput(BaseModel):
"""批量策略管理参数."""
action: str = Field(..., description="操作类型:enable/disable/delete")
tool_name: Optional[str] = Field(default=None, description="工具名称(可选)")
success_rate_min: Optional[float] = Field(default=None, ge=0.0, le=1.0, description="最小成功率(0-1)")
success_rate_max: Optional[float] = Field(default=None, ge=0.0, le=1.0, description="最大成功率(0-1)")
usage_min: Optional[int] = Field(default=None, ge=0, description="最小使用次数")
usage_max: Optional[int] = Field(default=None, ge=0, description="最大使用次数")
tags: Optional[List[str]] = Field(default=None, description="包含指定标签的策略")
enabled: Optional[bool] = Field(default=None, description="仅匹配已启用/禁用策略")
limit: int = Field(default=100, ge=1, le=1000, description="最多影响的策略数量(默认100,最大1000)")
class BulkExportStrategiesInput(BaseModel):
"""批量导出策略参数."""
tool_name: Optional[str] = Field(default=None, description="工具名称(可选)")
success_rate_min: Optional[float] = Field(default=None, ge=0.0, le=1.0, description="最小成功率(0-1)")
success_rate_max: Optional[float] = Field(default=None, ge=0.0, le=1.0, description="最大成功率(0-1)")
usage_min: Optional[int] = Field(default=None, ge=0, description="最小使用次数")
usage_max: Optional[int] = Field(default=None, ge=0, description="最大使用次数")
tags: Optional[List[str]] = Field(default=None, description="包含指定标签的策略")
enabled: Optional[bool] = Field(default=None, description="仅匹配已启用/禁用策略")
limit: int = Field(default=500, ge=1, le=5000, description="导出数量上限(默认500,最大5000)")
file_path: Optional[str] = Field(default=None, description="导出文件路径(可选)")
class GetStrategyAlertsInput(BaseModel):
"""获取策略告警参数."""
failure_threshold: int = Field(
default=5, ge=1, description="在时间窗口内触发告警的最小失败次数(默认5)"
)
days: int = Field(
default=1, ge=1, le=30, description="回溯天数窗口(默认1天,最大30天)"
)
tool_name: Optional[str] = Field(default=None, description="工具名称(可选,仅查看某个工具的告警)")
arguments_hash: Optional[str] = Field(default=None, description="参数哈希(可选,用于精确定位策略)")
class RenderStrategyInsightsInput(BaseModel):
"""渲染策略洞察的参数."""
format: str = Field(
default="auto",
description="可视化格式:auto(默认)、mermaid、ascii"
)
top_n: int = Field(
default=15,
ge=1,
le=100,
description="包含的热力图分组数量(1-100)"
)
group_by: str = Field(
default="tool",
description="汇总方式:tool(默认)或 bucket"
)
# 缓存工具列表,避免重复创建(提升性能)
_tools_cache: Optional[List[Tool]] = None
def get_tools(config_manager: ConfigManager) -> List[Tool]:
"""
获取所有可用的 MCP 工具.
这些工具都可以通过对话被 MCP Client 主动调用.
使用缓存避免重复创建工具列表,提升性能.
"""
global _tools_cache
# 如果缓存存在,直接返回
if _tools_cache is not None:
return _tools_cache
# 创建工具列表(只执行一次)
_tools_cache = [
# ========== 配置工具 ==========
Tool(
name="configure_neo4j",
description=(
"配置 Neo4j 图数据库连接。这是使用 Graphiti MCP Server 的第一步。"
"当用户提到需要配置数据库或 Neo4j 时,应该主动调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"uri": {
"type": "string",
"description": "Neo4j 连接 URI,例如: bolt://localhost:7687"
},
"username": {
"type": "string",
"description": "Neo4j 用户名,通常是 'neo4j'"
},
"password": {
"type": "string",
"description": "Neo4j 密码"
},
"database": {
"type": "string",
"description": "数据库名称,默认为 'neo4j'",
"default": "neo4j"
}
},
"required": ["uri", "username", "password"]
}
),
Tool(
name="configure_api",
description=(
"配置 AI API Key(可选)。如果不配置,系统将使用 MCP Client 的内置 AI 能力。"
"当用户提供 API key 或想要配置 AI 服务时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"provider": {
"type": "string",
"description": "API 提供商: 'openai' 或 'anthropic'",
"default": "openai"
},
"api_key": {
"type": "string",
"description": "API Key(可选)。如果不提供,将使用 MCP Client 的内置 AI 能力"
},
"model": {
"type": "string",
"description": "模型名称,例如 'gpt-4' 或 'claude-3-opus'(可选)"
}
},
"required": []
}
),
Tool(
name="check_configuration",
description=(
"检查当前配置状态。当用户询问配置情况或需要验证配置时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="reset_configuration",
description=(
"重置所有配置。当用户想要重新开始配置时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="set_group_id",
description=(
"设置组 ID,用于数据隔离(区分不同用户或项目)。"
"当用户提到需要区分不同项目或用户时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"group_id": {
"type": "string",
"description": "组 ID,例如项目名称或用户名"
}
},
"required": ["group_id"]
}
),
# ========== 知识图谱工具 ==========
Tool(
name="add_episode",
description=(
"添加交互记录到知识图谱。当用户想要记录信息、偏好、项目详情、学习笔记等时,"
"应该主动调用此工具。例如:用户偏好、项目信息、代码风格、最佳实践等。"
"支持 Saga 功能:可将多个 episode 关联到同一个会话/事务中。"
),
inputSchema={
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "要记录的交互内容,例如:'我喜欢使用 TypeScript 和 React'"
},
"metadata": {
"type": "object",
"description": "可选的元数据,例如 {'type': 'preference', 'category': 'programming'}",
"additionalProperties": True
},
"saga": {
"type": "string",
"description": "Saga 名称(可选),用于将多个 episode 关联到同一个会话/事务,例如 'conversation_123'"
},
"saga_previous_episode_uuid": {
"type": "string",
"description": "前一个 episode 的 UUID(可选),用于高效连接连续的 episode"
},
"custom_extraction_instructions": {
"type": "string",
"description": "自定义抽取指令(可选),用于指导实体和关系的抽取过程"
}
},
"required": ["content"]
}
),
Tool(
name="search_entities",
description=(
"在知识图谱中搜索实体。当用户询问之前记录的信息、偏好、项目详情等时,"
"应该主动调用此工具进行检索。"
),
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询,可以是关键词或自然语言,例如:'TypeScript 偏好'"
},
"entity_type": {
"type": "string",
"description": "实体类型过滤,例如 'Preference', 'Project', 'Requirement'"
},
"limit": {
"type": "integer",
"description": "返回结果数量限制",
"default": 10
}
},
"required": ["query"]
}
),
Tool(
name="search_relationships",
description=(
"在知识图谱中搜索实体之间的关系。当用户询问实体之间的关联时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询,用于查找实体之间的关系"
},
"limit": {
"type": "integer",
"description": "返回结果数量限制",
"default": 10
}
},
"required": ["query"]
}
),
Tool(
name="query_knowledge_graph",
description=(
"使用 Cypher 查询语言或自然语言查询知识图谱。"
"当用户需要复杂的图谱查询时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Cypher 查询语句或自然语言查询"
},
"limit": {
"type": "integer",
"description": "返回结果数量限制",
"default": 20
}
},
"required": ["query"]
}
),
Tool(
name="delete_episode",
description=(
"删除知识图谱中的 episode。当用户想要删除特定记录时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"episode_id": {
"type": "integer",
"description": "Episode 的 ID(可选,如果提供则删除指定 ID 的 episode)"
},
"content": {
"type": "string",
"description": "Episode 的内容(可选,如果提供则删除匹配内容的 episode)"
},
"delete_all": {
"type": "boolean",
"description": "是否删除所有 episode(默认 false)",
"default": False
}
},
"required": []
}
),
Tool(
name="clear_graph",
description=(
"清空整个知识图谱。当用户想要重置所有数据时,应该调用此工具。"
"⚠️ 警告:此操作不可逆,会删除所有节点和关系。"
),
inputSchema={
"type": "object",
"properties": {
"confirm": {
"type": "boolean",
"description": "确认清空图谱(必须为 true 才能执行)",
"default": False
}
},
"required": ["confirm"]
}
),
Tool(
name="query_by_time_range",
description=(
"根据时间范围查询知识图谱。支持查询特定时间段内的 episode 或实体。"
"当用户询问'最近一周'、'上个月'等时间相关查询时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"start_time": {
"type": "string",
"description": "开始时间(ISO 格式,例如:2025-11-01T00:00:00)"
},
"end_time": {
"type": "string",
"description": "结束时间(ISO 格式,例如:2025-11-30T23:59:59)。如果不提供,默认为当前时间"
},
"days": {
"type": "integer",
"description": "查询最近 N 天的数据(可选,如果提供则忽略 start_time 和 end_time)"
},
"entity_type": {
"type": "string",
"description": "实体类型过滤(可选)"
},
"limit": {
"type": "integer",
"description": "返回结果数量限制",
"default": 20
}
},
"required": []
}
),
Tool(
name="semantic_search",
description=(
"语义搜索 - 智能搜索,理解查询意图并找到相关内容。"
"当用户需要进行语义理解搜索时(而不是简单的关键词匹配),应该调用此工具。"
"如果配置了 API key,会使用向量搜索;否则会使用增强的关键词搜索。"
"注意:即使没有 API key,你也可以理解查询意图,扩展关键词(同义词、相关概念),使用扩展关键词搜索,然后对结果进行语义相关性排序。"
),
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询(自然语言)"
},
"num_results": {
"type": "integer",
"description": "返回结果数量",
"default": 10
},
"center_node_uuid": {
"type": "string",
"description": "中心节点 UUID(可选,用于重新排序结果)"
}
},
"required": ["query"]
}
),
Tool(
name="add_episodes_bulk",
description=(
"批量添加多个 episode 到知识图谱。"
"当用户想要一次性记录多条信息时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"episodes": {
"type": "array",
"description": "Episode 列表,每个元素包含 content 和可选的 metadata",
"items": {
"type": "object",
"properties": {
"content": {"type": "string"},
"metadata": {"type": "object", "additionalProperties": True}
},
"required": ["content"]
}
}
},
"required": ["episodes"]
}
),
Tool(
name="health_check",
description=(
"执行系统健康检查。检查配置、数据库连接、Graphiti 状态等。"
"当用户询问系统状态或遇到问题时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="export_graph_data",
description=(
"导出知识图谱数据。支持 JSON 和 Cypher 格式。"
"当用户想要备份数据或迁移数据时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"format": {
"type": "string",
"description": "导出格式:'json' 或 'cypher'",
"default": "json",
"enum": ["json", "cypher"]
}
},
"required": []
}
),
Tool(
name="get_statistics",
description=(
"获取知识图谱的详细统计信息。包括节点数量、关系数量、Episode 统计等。"
"当用户想要了解知识图谱的整体情况时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="import_graph_data",
description=(
"导入知识图谱数据。支持从 JSON 格式导入。"
"当用户想要恢复备份数据或迁移数据时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"data": {
"type": "object",
"description": "要导入的数据(JSON 格式,包含 nodes 和 relationships)"
},
"format": {
"type": "string",
"description": "数据格式(默认 json)",
"default": "json"
}
},
"required": ["data"]
}
),
# ========== 实用工具 ==========
Tool(
name="validate_data",
description=(
"验证知识图谱数据的完整性。检查孤立节点、重复节点、数据完整性等问题。"
"当用户想要检查数据质量或排查问题时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"check_orphaned": {
"type": "boolean",
"description": "检查孤立节点",
"default": True
},
"check_duplicates": {
"type": "boolean",
"description": "检查重复节点",
"default": True
},
"check_integrity": {
"type": "boolean",
"description": "检查数据完整性",
"default": True
}
},
"required": []
}
),
Tool(
name="clean_orphaned_nodes",
description=(
"清理知识图谱中的孤立节点(没有关系的节点)。"
"当用户想要清理无用数据时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"confirm": {
"type": "boolean",
"description": "确认清理孤立节点(必须为 true 才能执行)",
"default": False
},
"node_types": {
"type": "array",
"items": {"type": "string"},
"description": "要清理的节点类型列表(可选,如果不提供则清理所有类型)"
}
},
"required": ["confirm"]
}
),
Tool(
name="rebuild_indexes",
description=(
"重建 Neo4j 数据库索引以优化查询性能。"
"当用户想要优化数据库性能时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"index_types": {
"type": "array",
"items": {"type": "string"},
"description": "要重建的索引类型(可选,如果不提供则重建所有索引)"
}
},
"required": []
}
),
Tool(
name="get_cache_stats",
description=(
"获取缓存统计信息。包括缓存条目数量、有效条目、过期条目等。"
"当用户想要了解缓存使用情况时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
# ========== ACE 策略管理工具 ==========
Tool(
name="query_strategies",
description=(
"查询 ACE 框架学习到的工具调用策略。"
"当用户想要查看系统学到了什么策略时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"tool_name": {
"type": "string",
"description": "工具名称(可选,用于筛选特定工具的策略)"
},
"limit": {
"type": "integer",
"description": "返回结果数量限制",
"default": 20
}
},
"required": []
}
),
Tool(
name="list_strategy_versions",
description=(
"列出指定策略的历史版本(含成功率、使用次数、是否为最新等信息)。"
"当用户需要比较不同版本或排查回归时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"tool_name": {
"type": "string",
"description": "工具名称"
},
"arguments_hash": {
"type": "string",
"description": "参数哈希(可选,用于精确定位策略)"
},
"limit": {
"type": "integer",
"description": "返回版本数量限制(默认 10)",
"default": 10
}
},
"required": ["tool_name"]
}
),
Tool(
name="get_learning_trends",
description=(
"获取 ACE 学习效果趋势(按天聚合成功率、使用次数、平均评分等)。"
"当用户想评估某个工具的命中率走势或最近表现时使用。"
),
inputSchema={
"type": "object",
"properties": {
"tool_name": {
"type": "string",
"description": "工具名称(可选)"
},
"arguments_hash": {
"type": "string",
"description": "参数哈希(可选)"
},
"days": {
"type": "integer",
"description": "回溯天数(默认 30,最大 180)",
"default": 30
}
},
"required": []
}
),
Tool(
name="bulk_update_strategies",
description=(
"批量启用/禁用/删除 ACE 策略,可按成功率、使用次数和标签等条件筛选。"
"适合在发现某类策略效果不佳时进行快速止损或整体调整。"
),
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["enable", "disable", "delete"],
"description": "操作类型:enable/disable/delete"
},
"tool_name": {"type": "string"},
"success_rate_min": {"type": "number"},
"success_rate_max": {"type": "number"},
"usage_min": {"type": "integer"},
"usage_max": {"type": "integer"},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "包含任意一个标签即匹配"
},
"enabled": {
"type": "boolean",
"description": "仅匹配已启用/禁用策略"
},
"limit": {
"type": "integer",
"description": "最多影响的策略数量(默认100,最大1000)",
"default": 100
}
},
"required": ["action"]
}
),
Tool(
name="bulk_export_strategies",
description=(
"按过滤条件批量导出 ACE 策略,支持写入 JSON 文件或直接返回数据。"
"适合做离线审核、备份或迁移 Skillbook。"
),
inputSchema={
"type": "object",
"properties": {
"tool_name": {"type": "string"},
"success_rate_min": {"type": "number"},
"success_rate_max": {"type": "number"},
"usage_min": {"type": "integer"},
"usage_max": {"type": "integer"},
"tags": {
"type": "array",
"items": {"type": "string"}
},
"enabled": {"type": "boolean"},
"limit": {
"type": "integer",
"description": "导出数量上限(默认500,最大5000)",
"default": 500
},
"file_path": {
"type": "string",
"description": "导出文件路径(可选,不填则只返回预览数据)"
}
},
"required": []
}
),
Tool(
name="get_strategy_alerts",
description=(
"获取 ACE 策略告警信息(基于最近 N 天的失败次数和使用情况)。"
"当用户想要排查哪些策略近期频繁失败时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"failure_threshold": {
"type": "integer",
"description": "在时间窗口内触发告警的最小失败次数(默认5)",
"default": 5
},
"days": {
"type": "integer",
"description": "回溯天数窗口(默认1天,最大30天)",
"default": 1
},
"tool_name": {
"type": "string",
"description": "工具名称(可选,仅查看某个工具的告警)"
},
"arguments_hash": {
"type": "string",
"description": "参数哈希(可选,用于精确定位某个策略)"
}
},
"required": []
}
),
Tool(
name="get_strategy_stats",
description=(
"获取 ACE 策略的统计信息。包括策略数量、平均成功率、总使用次数等。"
"当用户想要了解系统学习效果时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"tool_name": {
"type": "string",
"description": "工具名称(可选,用于筛选特定工具的统计)"
}
},
"required": []
}
),
Tool(
name="rate_result",
description=(
"对工具执行结果进行评分和反馈。这有助于 ACE 框架学习哪些策略更有效。"
"当用户对工具执行结果满意或不满意时,应该主动询问用户是否愿意提供反馈。"
),
inputSchema={
"type": "object",
"properties": {
"tool_name": {
"type": "string",
"description": "工具名称"
},
"rating": {
"type": "integer",
"description": "评分(1-5,1=很差,5=很好)",
"minimum": 1,
"maximum": 5
},
"feedback": {
"type": "string",
"description": "反馈意见(可选)"
},
"context": {
"type": "object",
"description": "上下文信息(可选,用于关联特定的工具调用)"
}
},
"required": ["tool_name", "rating"]
}
),
Tool(
name="export_strategies",
description=(
"导出 ACE 策略到 JSON 文件。用于备份策略或在不同环境间迁移。"
"当用户想要备份策略或迁移策略时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"tool_name": {
"type": "string",
"description": "工具名称(可选,用于筛选特定工具的策略)"
},
"file_path": {
"type": "string",
"description": "导出文件路径(可选,默认使用配置目录)"
}
},
"required": []
}
),
Tool(
name="import_strategies",
description=(
"从 JSON 文件导入 ACE 策略。用于恢复备份或迁移策略。"
"当用户想要恢复策略或从其他环境导入策略时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "导入文件路径"
},
"overwrite": {
"type": "boolean",
"description": "是否覆盖现有策略(默认 false)",
"default": False
}
},
"required": ["file_path"]
}
),
Tool(
name="toggle_strategy",
description=(
"启用或禁用 ACE 策略。用于临时禁用效果不佳的策略或重新启用已禁用的策略。"
"当用户想要管理策略的启用状态时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {
"tool_name": {
"type": "string",
"description": "工具名称"
},
"arguments_hash": {
"type": "string",
"description": "参数哈希(可选,如果不提供则操作该工具的所有策略)"
},
"enabled": {
"type": "boolean",
"description": "是否启用(true=启用,false=禁用)"
}
},
"required": ["tool_name", "enabled"]
}
),
Tool(
name="validate_strategies",
description=(
"验证 ACE 策略的健康状态。检查策略数据完整性、统计策略状态、识别数据问题等。"
"当用户想要检查策略数据质量或排查问题时,应该调用此工具。"
),
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="render_strategy_insights",
description=(
"渲染 ACE 策略热力图洞察。基于 `graphitiace://strategy-heatmap` 资源生成 Mermaid/ASCII 可视化,"
"帮助用户快速了解不同工具在各成功率区间的表现。当用户提到策略可视化、热力图或图表时调用。"
),
inputSchema={
"type": "object",
"properties": {
"format": {
"type": "string",
"description": "可视化格式:auto(默认)、mermaid、ascii",
"default": "auto"
},
"top_n": {
"type": "integer",
"description": "包含的分组数量(1-100)",
"default": 15,
"minimum": 1,
"maximum": 100
},
"group_by": {
"type": "string",
"description": "汇总方式:tool(默认)或 bucket",
"default": "tool"
}
},
"required": []
}
),
]
return _tools_cache
async def handle_tool_call(
tool_name: str,
arguments: Dict[str, Any],
config_manager: ConfigManager,
graphiti_client: Optional[Any] = None,
ace_manager: Optional[Any] = None
) -> List[TextContent]:
"""
处理工具调用,集成 ACE 框架进行学习和优化.
Args:
tool_name: 工具名称
arguments: 工具参数
config_manager: 配置管理器
graphiti_client: Graphiti 客户端(可选)
ace_manager: ACE 管理器(可选)
Returns:
工具执行结果
"""
result_text = ""
original_arguments = arguments.copy()
# 1. 使用 ACE Generator 优化工具调用策略
if ace_manager and ace_manager.is_enabled():
try:
strategy = ace_manager.generate_tool_strategy(
user_query=str(arguments.get('query', arguments.get('content', ''))),
tool_name=tool_name,
context=arguments
)
if strategy and strategy.get('optimized_arguments'):
# 应用优化后的参数
optimized = strategy.get('optimized_arguments', {})
# 合并优化参数,保留原有参数
arguments = {**arguments, **optimized}
logger.debug(f"ACE 优化了工具调用: {tool_name}")
except Exception as e:
logger.warning(f"ACE 策略生成失败: {e}")
try:
if tool_name == "configure_neo4j":
input_data = ConfigureNeo4jInput(**arguments)
result = config_manager.configure_neo4j(
uri=input_data.uri,
username=input_data.username,
password=input_data.password,
database=input_data.database
)
result_text = f"✅ {result['message']}\n\n配置详情:\n{json.dumps(result.get('config', {}), indent=2, ensure_ascii=False)}"
# 配置 Neo4j 后,尝试连接 Graphiti 客户端
# 注意:configure_neo4j 会保存配置,从而更新 config_manager 中的 _last_mtime
# graphiti_client.connect() 会读取 config_manager 中的配置,所以能获取到最新配置
if result['success'] and graphiti_client is not None:
if graphiti_client.connect():
result_text += "\n\n✅ Graphiti 客户端已成功连接到 Neo4j 数据库。"
else:
result_text += "\n\n⚠️ 警告:Neo4j 配置已保存,但连接测试失败。请检查数据库是否运行。"
elif tool_name == "configure_api":
input_data = ConfigureAPIInput(**arguments)
result = config_manager.configure_api(
provider=input_data.provider,
api_key=input_data.api_key,
model=input_data.model
)
if result['success']:
if input_data.api_key:
result_text = f"✅ {result['message']}\n\nAPI 配置已保存。"
else:
result_text = f"✅ {result['message']}\n\n注意:未提供 API key,系统将使用 Cursor 的内置 AI 能力。"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "check_configuration":
status = config_manager.get_config_status()
result_text = "📋 当前配置状态:\n\n"
result_text += f"Neo4j 已配置: {'✅' if status['neo4j_configured'] else '❌'}\n"
result_text += f"API 已配置: {'✅' if status['api_configured'] else '❌ (将使用 Cursor 内置 AI)'}\n"
result_text += f"组 ID: {status['group_id']}\n"
result_text += f"配置文件路径: {status['config_path']}\n\n"
if status.get('neo4j'):
result_text += f"Neo4j 配置:\n{json.dumps(status['neo4j'], indent=2, ensure_ascii=False)}\n\n"
if status.get('api'):
result_text += f"API 配置:\n{json.dumps(status['api'], indent=2, ensure_ascii=False)}\n"
elif tool_name == "reset_configuration":
result = config_manager.reset_config()
result_text = f"✅ {result['message']}"
elif tool_name == "set_group_id":
input_data = SetGroupIdInput(**arguments)
result = config_manager.set_group_id(input_data.group_id)
result_text = f"✅ {result['message']}"
elif tool_name == "add_episode":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = AddEpisodeInput(**arguments)
result = await graphiti_client.add_episode(
content=input_data.content,
metadata=input_data.metadata,
saga=input_data.saga,
saga_previous_episode_uuid=input_data.saga_previous_episode_uuid,
custom_extraction_instructions=input_data.custom_extraction_instructions
)
if result['success']:
result_text = f"✅ {result['message']}\n\n已记录内容:\n{input_data.content}"
if input_data.metadata:
result_text += f"\n\n元数据:{json.dumps(input_data.metadata, indent=2, ensure_ascii=False)}"
# 显示 saga 信息
if input_data.saga:
result_text += f"\n\nSaga:{input_data.saga}"
# 显示 episode UUID(用于后续 saga 连接)
if result.get('episode_uuid'):
result_text += f"\n\nEpisode UUID:{result['episode_uuid']}(可用于后续 saga 连接)"
# 如果没有进行实体抽取,提示 Cursor AI 可以手动完成
if not result.get('entities_extracted', False) and result.get('suggestion'):
result_text += f"\n\n💡 {result['suggestion']}"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "search_entities":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = SearchEntitiesInput(**arguments)
result = graphiti_client.search_entities(
query=input_data.query,
entity_type=input_data.entity_type,
limit=input_data.limit
)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
if result['results']:
result_text += f"找到 {len(result['results'])} 个实体:\n\n"
for i, entity in enumerate(result['results'], 1):
result_text += f"{i}. {json.dumps(entity, indent=2, ensure_ascii=False, default=str)}\n\n"
else:
result_text += "未找到匹配的实体。"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "search_relationships":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = SearchRelationshipsInput(**arguments)
result = graphiti_client.search_relationships(
query=input_data.query,
limit=input_data.limit
)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
if result['results']:
result_text += f"找到 {len(result['results'])} 个关系:\n\n"
# 导入 _neo4j_to_dict 进行额外转换
from .graphiti_client import _neo4j_to_dict
for i, rel in enumerate(result['results'], 1):
# 再次转换,确保所有 DateTime 对象都被处理
rel_converted = _neo4j_to_dict(rel)
result_text += f"{i}. {json.dumps(rel_converted, indent=2, ensure_ascii=False, default=str)}\n\n"
else:
result_text += "未找到匹配的关系。"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "query_knowledge_graph":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = QueryKnowledgeGraphInput(**arguments)
result = graphiti_client.query_knowledge_graph(
cypher_query=input_data.query,
limit=input_data.limit
)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
if result['results']:
result_text += f"查询结果({len(result['results'])} 条):\n\n"
for i, record in enumerate(result['results'], 1):
result_text += f"{i}. {json.dumps(record, indent=2, ensure_ascii=False, default=str)}\n\n"
else:
result_text += "查询未返回结果。"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "delete_episode":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = DeleteEpisodeInput(**arguments)
result = graphiti_client.delete_episode(
episode_id=input_data.episode_id,
content=input_data.content,
delete_all=input_data.delete_all
)
if result['success']:
result_text = f"✅ {result['message']}"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "clear_graph":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = ClearGraphInput(**arguments)
if not input_data.confirm:
result_text = "❌ 错误:必须设置 confirm=true 才能清空图谱。"
else:
result = graphiti_client.clear_graph()
if result['success']:
result_text = f"✅ {result['message']}"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "query_by_time_range":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = QueryByTimeRangeInput(**arguments)
result = graphiti_client.query_by_time_range(
start_time=input_data.start_time,
end_time=input_data.end_time,
days=input_data.days,
entity_type=input_data.entity_type,
limit=input_data.limit
)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
if result['results']:
result_text += f"找到 {len(result['results'])} 个节点:\n\n"
for i, entity in enumerate(result['results'], 1):
result_text += f"{i}. {json.dumps(entity, indent=2, ensure_ascii=False, default=str)}\n\n"
else:
result_text += "在指定时间范围内未找到节点。"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "semantic_search":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = SemanticSearchInput(**arguments)
result = await graphiti_client.semantic_search(
query=input_data.query,
num_results=input_data.num_results,
center_node_uuid=input_data.center_node_uuid
)
if result['success']:
search_type = result.get('search_type', 'basic')
result_text = f"✅ {result['message']}\n\n"
# 根据搜索类型显示不同的提示
if search_type == 'semantic':
result_text += "🔍 使用向量搜索(语义理解)\n\n"
elif search_type == 'enhanced_keyword':
result_text += "🔍 使用增强关键词搜索\n"
result_text += "💡 提示:配置 API key 后可启用向量搜索,获得更好的语义理解效果\n\n"
else:
result_text += "🔍 使用关键词搜索\n\n"
if result['results']:
if search_type == 'semantic':
result_text += f"找到 {len(result['results'])} 个相关关系:\n\n"
for i, rel in enumerate(result['results'], 1):
result_text += f"{i}. {rel.get('from_node', {}).get('name', 'Unknown')} "
result_text += f"-[{rel.get('relationship', 'RELATED_TO')}]-> "
result_text += f"{rel.get('to_node', {}).get('name', 'Unknown')}\n"
if rel.get('description'):
result_text += f" 描述: {rel['description']}\n"
result_text += "\n"
else:
result_text += f"找到 {len(result['results'])} 个结果:\n\n"
for i, entity in enumerate(result['results'], 1):
content = entity.get('content', '')
if content:
result_text += f"{i}. {content[:100]}{'...' if len(content) > 100 else ''}\n\n"
else:
result_text += f"{i}. {json.dumps(entity, indent=2, ensure_ascii=False)}\n\n"
else:
result_text += "未找到匹配的结果。"
if search_type != 'semantic':
result_text += "\n💡 提示:可以尝试使用不同的关键词,或配置 API key 启用向量搜索。"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "add_episodes_bulk":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = BulkEpisodeInput(**arguments)
result = await graphiti_client.add_episodes_bulk(
episodes=input_data.episodes
)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
result_text += f"总计: {result['total']} 个\n"
result_text += f"成功: {result['success_count']} 个\n"
result_text += f"失败: {result['fail_count']} 个\n"
if result['fail_count'] > 0:
result_text += "\n失败的 Episode:\n"
for i, r in enumerate(result['results'], 1):
if not r.get('success'):
result_text += f"{i}. {r.get('message', '未知错误')}\n"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "health_check":
from .health_check import health_check
import asyncio
# 在线程池中运行同步的健康检查,避免阻塞事件循环
# 传递 config_manager 避免创建新实例连接真实数据库
health_result = await asyncio.to_thread(health_check, config_manager)
status_emoji = {
"healthy": "✅",
"degraded": "⚠️",
"unhealthy": "❌"
}
emoji = status_emoji.get(health_result['status'], "❓")
result_text = f"{emoji} 系统健康状态:{health_result['status'].upper()}\n\n"
result_text += f"检查时间:{health_result['timestamp']}\n\n"
for check_name, check_result in health_result['checks'].items():
check_emoji = {
"ok": "✅",
"warning": "⚠️",
"error": "❌"
}.get(check_result.get('status', 'unknown'), "❓")
result_text += f"{check_emoji} {check_name}: {check_result.get('status', 'unknown')}\n"
if check_result.get('message'):
result_text += f" {check_result['message']}\n"
result_text += "\n"
if health_result['status'] == "unhealthy":
result_text += "💡 建议:请检查 Neo4j 数据库连接和配置。"
elif health_result['status'] == "degraded":
result_text += "💡 提示:系统可以运行,但部分功能可能受限。"
elif tool_name == "export_graph_data":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = ExportGraphDataInput(**arguments)
result = graphiti_client.export_graph_data(format=input_data.format)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
if input_data.format == "json":
data = result['data']
result_text += f"导出数据(JSON 格式):\n"
result_text += json.dumps(data, indent=2, ensure_ascii=False, default=str)
else:
result_text += f"导出数据(Cypher 格式):\n\n{result['data']}"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "get_statistics":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
result = graphiti_client.get_statistics()
if result['success']:
stats = result['statistics']
result_text = f"✅ {result['message']}\n\n"
result_text += "📊 知识图谱统计信息:\n\n"
result_text += f"节点总数: {stats['nodes']['total']}\n"
result_text += f"关系总数: {stats['relationships']['total']}\n"
result_text += f"Episode 总数: {stats['episodes']['total']}\n\n"
if stats['nodes']['by_type']:
result_text += "节点类型分布:\n"
for node_type, count in stats['nodes']['by_type'].items():
result_text += f" - {node_type}: {count}\n"
result_text += "\n"
if stats['relationships']['by_type']:
result_text += "关系类型分布:\n"
for rel_type, count in stats['relationships']['by_type'].items():
result_text += f" - {rel_type}: {count}\n"
result_text += "\n"
if stats['episodes']['first_episode']:
result_text += f"第一个 Episode: {stats['episodes']['first_episode']}\n"
if stats['episodes']['last_episode']:
result_text += f"最后一个 Episode: {stats['episodes']['last_episode']}\n"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "import_graph_data":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = ImportGraphDataInput(**arguments)
result = graphiti_client.import_graph_data(
data=input_data.data,
format=input_data.format
)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
result_text += f"导入节点: {result.get('imported_nodes', 0)} 个\n"
result_text += f"导入关系: {result.get('imported_relationships', 0)} 个"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "validate_data":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = ValidateDataInput(**arguments)
result = graphiti_client.validate_data(
check_orphaned=input_data.check_orphaned,
check_duplicates=input_data.check_duplicates,
check_integrity=input_data.check_integrity
)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
if result.get('issues'):
result_text += "发现的问题:\n"
for issue in result['issues']:
result_text += f"- {issue}\n"
else:
result_text += "✅ 未发现任何问题,数据完整性良好。"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "clean_orphaned_nodes":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = CleanOrphanedNodesInput(**arguments)
if not input_data.confirm:
result_text = "❌ 请确认清理操作(设置 confirm=true)"
else:
result = graphiti_client.clean_orphaned_nodes(
node_types=input_data.node_types
)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
result_text += f"已清理 {result.get('deleted_count', 0)} 个孤立节点"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "rebuild_indexes":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
else:
input_data = RebuildIndexesInput(**arguments)
result = graphiti_client.rebuild_indexes(
index_types=input_data.index_types
)
if result['success']:
result_text = f"✅ {result['message']}\n\n"
if result.get('indexes'):
result_text += "已检查的索引:\n"
for idx in result['indexes']:
result_text += f"- {idx}\n"
else:
result_text = f"❌ {result['message']}"
elif tool_name == "get_cache_stats":
from .cache_manager import get_cache_manager
cache = get_cache_manager()
stats = cache.get_stats()
result_text = "📊 缓存统计信息:\n\n"
result_text += f"总条目数: {stats['total_entries']}\n"
result_text += f"有效条目: {stats['valid_entries']}\n"
result_text += f"过期条目: {stats['expired_entries']}\n"
result_text += f"默认 TTL: {stats['default_ttl']} 秒\n"
elif tool_name == "query_strategies":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = QueryStrategiesInput(**arguments)
strategies = ace_manager.query_strategies(
tool_name=input_data.tool_name,
limit=input_data.limit
)
if strategies:
result_text = f"✅ 找到 {len(strategies)} 个策略:\n\n"
for i, strategy in enumerate(strategies, 1):
result_text += f"{i}. 工具: {strategy.get('tool_name', 'N/A')}\n"
result_text += f" 成功率: {strategy.get('success_rate', 0):.2%}\n"
result_text += f" 使用次数: {strategy.get('usage_count', 0)}\n"
result_text += f" 成功次数: {strategy.get('success_count', 0)}\n"
result_text += f" 失败次数: {strategy.get('failure_count', 0)}\n"
content = strategy.get('content', '')[:200]
if content:
result_text += f" 策略内容: {content}...\n"
result_text += "\n"
else:
result_text = "ℹ️ 暂无策略记录。系统会随着使用自动学习策略。"
elif tool_name == "list_strategy_versions":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = ListStrategyVersionsInput(**arguments)
versions = ace_manager.list_strategy_versions(
tool_name=input_data.tool_name,
arguments_hash=input_data.arguments_hash,
limit=input_data.limit
)
if versions:
header = f"📚 {input_data.tool_name} 策略版本(最多 {len(versions)} 条):\n\n"
lines = [header]
for version in versions:
tag = " (最新)" if version.get("is_latest") else ""
lines.append(f"v{version.get('version', 1)}{tag}")
lines.append(f" 成功率: {version.get('success_rate', 0.0):.2%} | 使用次数: {version.get('usage_count', 0)}")
lines.append(f" 成功/失败: {version.get('success_count', 0)}/{version.get('failure_count', 0)}")
if version.get("arguments_hash"):
lines.append(f" 参数哈希: {version['arguments_hash']}")
if version.get("updated_at"):
lines.append(f" 更新时间: {version['updated_at']}")
lines.append("")
result_text = "\n".join(lines).strip()
else:
result_text = "ℹ️ 暂无历史版本记录。"
elif tool_name == "get_learning_trends":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = GetLearningTrendsInput(**arguments)
trends = ace_manager.get_learning_trends(
tool_name=input_data.tool_name,
arguments_hash=input_data.arguments_hash,
days=input_data.days
)
if not trends:
result_text = "ℹ️ 暂无学习趋势数据,请先运行相关工具。"
else:
heading = input_data.tool_name or "所有工具"
result_lines = [
f"📈 {heading} 学习趋势(过去 {input_data.days} 天)\n"
]
for entry in trends:
rate = entry.get("success_rate", 0.0)
usage = entry.get("usage_count", 0)
success = entry.get("success_count", 0)
failure = entry.get("failure_count", 0)
avg_rating = entry.get("avg_rating")
line = f"- {entry.get('date')}: 成功率 {rate:.1%},使用 {usage:.1f} 次,成功/失败 {success:.1f}/{failure:.1f}"
if avg_rating is not None:
line += f",平均评分 {avg_rating:.2f}"
result_lines.append(line)
result_text = "\n".join(result_lines)
elif tool_name == "bulk_update_strategies":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = BulkUpdateStrategiesInput(**arguments)
outcome = ace_manager.bulk_update_strategies(
action=input_data.action,
tool_name=input_data.tool_name,
success_rate_min=input_data.success_rate_min,
success_rate_max=input_data.success_rate_max,
usage_min=input_data.usage_min,
usage_max=input_data.usage_max,
tags=input_data.tags,
enabled_filter=input_data.enabled,
limit=input_data.limit,
)
if not outcome:
result_text = "ℹ️ 未执行任何批量操作,可能是未匹配到策略或数据库未连接。"
else:
action_map = {"enable": "启用", "disable": "禁用", "delete": "删除"}
zh_action = action_map.get(outcome["action"], outcome["action"])
result_text = (
f"✅ 已批量{zh_action}策略:\n\n"
f"- 影响的策略数: {outcome.get('affected', 0)}\n"
f"- 上限: {outcome.get('limit', input_data.limit)}\n"
)
elif tool_name == "bulk_export_strategies":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = BulkExportStrategiesInput(**arguments)
exported = ace_manager.bulk_export_strategies(
tool_name=input_data.tool_name,
success_rate_min=input_data.success_rate_min,
success_rate_max=input_data.success_rate_max,
usage_min=input_data.usage_min,
usage_max=input_data.usage_max,
tags=input_data.tags,
enabled_filter=input_data.enabled,
limit=input_data.limit,
file_path=input_data.file_path,
)
if not exported:
result_text = "ℹ️ 未导出任何策略。"
else:
if exported.get("file_path"):
result_text = (
f"✅ 已导出 {exported['count']} 个策略到文件:\n"
f"- 路径: {exported['file_path']}"
)
else:
count = exported.get("count", 0)
preview = exported.get("strategies", [])[:3]
lines = [f"✅ 匹配 {count} 个策略(以下为前 {len(preview)} 个预览):\n"]
for idx, s in enumerate(preview, 1):
lines.append(
f"{idx}. 工具: {s.get('tool_name', 'N/A')} | "
f"成功率: {s.get('success_rate', 0.0):.1%} | "
f"使用次数: {s.get('usage_count', 0)}"
)
result_text = "\n".join(lines)
elif tool_name == "get_strategy_alerts":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = GetStrategyAlertsInput(**arguments)
alerts = ace_manager.get_strategy_alerts(
failure_threshold=input_data.failure_threshold,
days=input_data.days,
tool_name=input_data.tool_name,
arguments_hash=input_data.arguments_hash,
)
if not alerts:
result_text = "ℹ️ 近期未发现策略告警,一切正常。"
else:
lines = [
f"🚨 策略告警(最近 {input_data.days} 天,失败次数 ≥ {input_data.failure_threshold}):\n"
]
for alert in alerts:
lines.append(
f"- 工具: {alert.get('tool_name', 'N/A')} | "
f"失败次数: {int(alert.get('failures', 0))} / 使用: {int(alert.get('usage', 0))} "
f"({alert.get('failure_rate', 0.0):.1%}) | "
f"最近日期: {alert.get('last_date', '未知')} | "
f"级别: {alert.get('severity', 'info')}"
)
if alert.get("arguments_hash"):
lines.append(f" 参数哈希: {alert['arguments_hash']}")
result_text = "\n".join(lines)
elif tool_name == "get_strategy_stats":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = GetStrategyStatsInput(**arguments)
stats = ace_manager.get_strategy_stats(tool_name=input_data.tool_name)
if stats:
result_text = "📊 ACE 策略统计信息:\n\n"
result_text += f"总策略数: {stats.get('total_strategies', 0)}\n"
result_text += f"平均成功率: {stats.get('avg_success_rate', 0):.2%}\n"
result_text += f"总使用次数: {stats.get('total_usage', 0)}\n"
result_text += f"总成功次数: {stats.get('total_success', 0)}\n"
result_text += f"总失败次数: {stats.get('total_failure', 0)}\n"
if stats.get('by_tool'):
result_text += "\n按工具分类:\n"
for tool_name, tool_stats in stats['by_tool'].items():
result_text += f" - {tool_name}:\n"
result_text += f" 策略数: {tool_stats.get('count', 0)}\n"
result_text += f" 平均成功率: {tool_stats.get('avg_success_rate', 0):.2%}\n"
result_text += f" 使用次数: {tool_stats.get('usage', 0)}\n"
else:
result_text = "ℹ️ 暂无策略统计信息。系统会随着使用自动学习策略。"
elif tool_name == "rate_result":
if ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = RateResultInput(**arguments)
result = ace_manager.rate_result(
tool_name=input_data.tool_name,
rating=input_data.rating,
feedback=input_data.feedback,
context=input_data.context
)
if result:
result_text = f"✅ 感谢您的反馈!\n\n"
result_text += f"工具: {input_data.tool_name}\n"
result_text += f"评分: {'⭐' * input_data.rating} ({input_data.rating}/5)\n"
if input_data.feedback:
result_text += f"反馈: {input_data.feedback}\n"
result_text += "\n您的反馈将帮助系统改进工具调用策略。"
else:
result_text = "❌ 保存反馈失败。请检查系统配置。"
elif tool_name == "export_strategies":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = ExportStrategiesInput(**arguments)
result = ace_manager.export_strategies(
tool_name=input_data.tool_name,
file_path=input_data.file_path
)
if result:
result_text = f"✅ 策略导出成功!\n\n"
result_text += f"文件路径: {result.get('file_path', 'N/A')}\n"
result_text += f"导出策略数: {result.get('count', 0)}\n"
result_text += f"文件大小: {result.get('size', 0)} 字节"
else:
result_text = "❌ 策略导出失败。请检查系统配置和文件路径。"
elif tool_name == "import_strategies":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = ImportStrategiesInput(**arguments)
result = ace_manager.import_strategies(
file_path=input_data.file_path,
overwrite=input_data.overwrite
)
if result:
result_text = f"✅ 策略导入成功!\n\n"
result_text += f"导入策略数: {result.get('count', 0)}\n"
result_text += f"覆盖策略数: {result.get('overwritten', 0)}\n"
result_text += f"新增策略数: {result.get('created', 0)}"
if result.get('errors'):
result_text += f"\n\n⚠️ 导入错误: {len(result['errors'])} 个"
else:
result_text = "❌ 策略导入失败。请检查文件路径和格式。"
elif tool_name == "toggle_strategy":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
input_data = ToggleStrategyInput(**arguments)
result = ace_manager.toggle_strategy(
tool_name=input_data.tool_name,
arguments_hash=input_data.arguments_hash,
enabled=input_data.enabled
)
if result:
action = "启用" if input_data.enabled else "禁用"
result_text = f"✅ 策略已{action}!\n\n"
result_text += f"工具: {input_data.tool_name}\n"
if input_data.arguments_hash:
result_text += f"参数哈希: {input_data.arguments_hash}\n"
result_text += f"操作: {action}\n"
result_text += f"影响的策略数: {result.get('count', 0)}"
else:
result_text = "❌ 策略操作失败。请检查参数和系统配置。"
elif tool_name == "validate_strategies":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并配置了 API key。"
else:
result = ace_manager.validate_strategies()
if result:
if result.get("valid", False):
result_text = "✅ 策略验证通过!\n\n"
else:
result_text = "⚠️ 策略验证发现问题:\n\n"
result_text += f"总策略数: {result.get('total', 0)}\n"
result_text += f"已启用: {result.get('enabled', 0)}\n"
result_text += f"已禁用: {result.get('disabled', 0)}\n"
result_text += f"平均成功率: {result.get('avg_success_rate', 0.0):.2%}\n"
if result.get("issues"):
result_text += f"\n⚠️ 发现的问题:\n"
for issue in result.get("issues", []):
result_text += f" - {issue}\n"
if result.get("error"):
result_text += f"\n❌ 错误: {result.get('error')}"
else:
result_text = "❌ 策略验证失败。请检查系统配置。"
elif tool_name == "render_strategy_insights":
if graphiti_client is None:
result_text = "❌ 错误:Graphiti 客户端未初始化。请先配置 Neo4j 数据库。"
elif ace_manager is None or not ace_manager.is_enabled():
result_text = "❌ 错误:ACE Manager 未启用。请确保已安装 ace-framework 并完成配置。"
else:
input_data = RenderStrategyInsightsInput(**arguments)
heatmap = ace_manager.get_strategy_heatmap(
limit=input_data.top_n,
group_by=input_data.group_by
)
if not heatmap or not heatmap.get("success"):
message = (heatmap or {}).get("message", "暂无策略热力图数据。")
result_text = f"ℹ️ {message}"
else:
entries = heatmap.get("entries", [])
if not entries:
result_text = "ℹ️ 暂无策略数据。请先运行几次工具以便 ACE 学习。"
else:
summary = heatmap.get("summary", {})
total_usage = summary.get(
"total_usage",
sum(entry.get("total_usage", 0) for entry in entries)
)
total_entries = summary.get("total_entries", len(entries))
unique_tools = summary.get(
"unique_tools",
len({entry.get("tool_name") for entry in entries})
)
format_mode = input_data.format.lower()
if format_mode not in {"auto", "mermaid", "ascii"}:
format_mode = "auto"
if format_mode == "auto":
format_mode = "mermaid" if len(entries) <= 40 else "ascii"
insights = [
"🎯 策略热力图洞察",
f"- 数据时间: {heatmap.get('generated_at', '未知')}",
f"- 分组数量: {total_entries}",
f"- 覆盖工具: {unique_tools}",
f"- 总使用次数: {total_usage}"
]
top_entry = max(entries, key=lambda e: e.get("total_usage", 0))
insights.append(
f"- 最活跃策略: {top_entry.get('tool_name', '未知')} / {top_entry.get('bucket', 'N/A')} "
f"(使用 {top_entry.get('total_usage', 0)} 次,平均成功率 {top_entry.get('avg_success_rate', 0.0):.1%})"
)
viz_block = ""
if format_mode == "mermaid":
viz_lines = ["```mermaid", "pie showData"]
for entry in entries:
label = f"{entry.get('tool_name', '未知')} {entry.get('bucket', '')}".strip()
value = max(0, int(entry.get('total_usage', 0) or 0))
viz_lines.append(f' "{label}" : {value}')
viz_lines.append("```")
viz_block = "\n".join(viz_lines)
else:
header = f"{'Tool':<22} {'Bucket':<10} {'Strategies':<10} {'Usage':<8} {'Avg%':>6}"
separator = "-" * len(header)
rows = [header, separator]
for entry in entries:
rows.append(
f"{entry.get('tool_name', '未知')[:22]:<22} "
f"{entry.get('bucket', 'N/A'):<10} "
f"{entry.get('strategy_count', 0):<10} "
f"{entry.get('total_usage', 0):<8} "
f"{entry.get('avg_success_rate', 0.0) * 100:>5.1f}%"
)
viz_block = "```text\n" + "\n".join(rows) + "\n```"
groups = summary.get("groups")
if groups:
top_groups = ", ".join(
f"{group['name']}({group['total_usage']})"
for group in groups[:3]
)
insights.append(f"- 重点分组: {top_groups}")
result_text = "\n".join(insights) + "\n\n" + viz_block
else:
result_text = f"❌ 未知工具:{tool_name}"
except KeyError as e:
result_text = f"❌ 参数错误:缺少必需的参数 '{str(e)}'。请检查工具参数。"
except ValueError as e:
result_text = f"❌ 参数值错误:{str(e)}。请检查参数格式和类型。"
except Exception as e:
import traceback
error_type = type(e).__name__
result_text = f"❌ 执行工具时出错({error_type}):{str(e)}\n\n"
result_text += "💡 提示:\n"
result_text += "- 请检查 Neo4j 数据库是否运行\n"
result_text += "- 请检查配置是否正确\n"
result_text += "- 如果问题持续,请查看详细错误日志"
# 3. 使用 ACE Reflector 分析结果(异步,不阻塞响应)
if ace_manager and ace_manager.is_enabled():
try:
# 判断执行是否成功并截断文本
success, message_snippet = _summarize_result_text(result_text)
result_dict = {
'success': success,
'message': message_snippet # 限制长度
}
# 异步执行反思(不阻塞响应)
# 使用 asyncio.create_task 在后台执行,避免阻塞响应
try:
# 检查是否有异步版本的反思方法
if hasattr(ace_manager, 'reflect_on_result_async'):
# 使用异步版本(如果存在)
asyncio.create_task(ace_manager.reflect_on_result_async(
tool_name,
original_arguments,
result_dict,
None # user_feedback
))
else:
# 回退到线程方式(兼容同步版本)
import threading
def run_reflection():
try:
ace_manager.reflect_on_result(
tool_name,
original_arguments,
result_dict,
None # user_feedback
)
except Exception as e:
logger.warning(f"ACE 反思执行失败: {e}")
# 在后台线程中执行反思
thread = threading.Thread(target=run_reflection, daemon=True)
thread.start()
except Exception as e:
# 如果异步执行失败,记录警告但不影响响应
logger.warning(f"启动 ACE 反思失败: {e}")
except Exception as e:
logger.warning(f"ACE 反思失败: {e}")
return [TextContent(type="text", text=result_text)]