"""ACEManager 额外覆盖率测试 - 覆盖未测试的分支."""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch

import pytest

from src.ace_manager import ACEManager
from src.config_manager import ConfigManager
class TestACEManagerAdditionalCoverage:
    """Extra ACEManager tests aimed at branches not exercised elsewhere.

    Most tests build the manager through ``_create_manager`` (which skips the
    real ACE initialisation) and talk to a ``MagicMock`` Graphiti client, so
    no LLM backend or database is required.
    """

    @pytest.fixture
    def config_manager(self, temp_config_dir):
        """Return a ConfigManager pointing at a config file under the temp dir."""
        return ConfigManager(config_path=temp_config_dir / ".graphitiace" / "config.json")

    # ----- shared helpers -----

    def _create_manager(self, config_manager, graphiti_client=None):
        """Create an ACEManager with ``_initialize_ace`` stubbed out.

        The returned manager is force-enabled and carries a MagicMock agent.
        """
        with patch.object(ACEManager, "_initialize_ace", lambda self: None):
            manager = ACEManager(config_manager, graphiti_client)
        manager.enabled = True
        manager.ace_agent = MagicMock()
        return manager

    def _mock_session(self, graphiti_client):
        """Wire ``graphiti_client.driver.session()`` to yield a MagicMock session."""
        session = MagicMock()
        graphiti_client.driver.session.return_value.__enter__.return_value = session
        return session

    @staticmethod
    def _make_client(connected=True):
        """Return a MagicMock Graphiti client whose ``is_connected()`` is *connected*."""
        client = MagicMock()
        client.is_connected.return_value = connected
        return client

    @staticmethod
    def _write_api_config(temp_config_dir, provider, api_key, model):
        """Write an ``api`` section to the config file used by ``config_manager``."""
        config_path = temp_config_dir / ".graphitiace" / "config.json"
        config_path.parent.mkdir(parents=True, exist_ok=True)
        payload = {"api": {"provider": provider, "api_key": api_key, "model": model}}
        config_path.write_text(json.dumps(payload), encoding="utf-8")

    @staticmethod
    def _strategy_record(properties):
        """Build a mock record whose ``["s"]`` is a node exposing *properties* via ``get``."""
        node = MagicMock()
        node.get.side_effect = lambda key, default=None: properties.get(key, default)
        record = MagicMock()
        # Functions assigned to magic methods on a mock receive the mock as
        # their first argument.
        record.__getitem__ = lambda _self, key: node if key == "s" else None
        return record

    @staticmethod
    def _utc_now():
        """Return the current UTC time as a naive datetime.

        Same value ``datetime.utcnow()`` produced, without the deprecated call.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _record_trend(self, manager):
        """Call ``_record_strategy_trend`` with one successful usage sample."""
        manager._record_strategy_trend(
            tool_name="test_tool",
            arguments_hash="hash123",
            success_increment=1.0,
            failure_increment=0.0,
            usage_increment=1.0,
            feedback_increment=0.0,
            rating_sum_increment=0.0,
            timestamp=self._utc_now(),
        )

    # ----- _initialize_ace branches -----

    def test_initialize_ace_mcp_mode_with_api_key(self, config_manager, temp_config_dir):
        """ACE initialisation in MCP mode (stdin is not a TTY) with an API key."""
        self._write_api_config(temp_config_dir, "openai", "test-key", "gpt-4")

        with patch("ace.ACELiteLLM") as mock_ace:
            mock_ace.return_value = MagicMock()
            with patch("sys.stdin.isatty", return_value=False):  # MCP mode
                manager = ACEManager(config_manager)

        assert manager.enabled is True
        assert manager.ace_agent is not None

    def test_initialize_ace_non_mcp_mode_without_api_key(self, config_manager):
        """ACE initialisation outside MCP mode with no API key configured."""
        with patch("ace.ACELiteLLM") as mock_ace:
            mock_ace.return_value = MagicMock()
            with patch("sys.stdin.isatty", return_value=True):  # non-MCP mode
                manager = ACEManager(config_manager)

        assert manager.enabled is True
        assert manager.ace_agent is not None

    def test_initialize_ace_anthropic_provider_sets_env(self, config_manager, temp_config_dir):
        """The Anthropic provider should export ANTHROPIC_API_KEY."""
        self._write_api_config(temp_config_dir, "anthropic", "anthropic-key", "claude-3")

        # patch.dict restores os.environ on exit, so the env assertion must
        # run while the patch is still active.
        with patch("ace.ACELiteLLM") as mock_ace, \
                patch("sys.stdin.isatty", return_value=True), \
                patch.dict(os.environ, {}, clear=False):
            mock_ace.return_value = MagicMock()
            manager = ACEManager(config_manager)
            assert os.environ.get("ANTHROPIC_API_KEY") == "anthropic-key"
            assert manager.enabled is True

    def test_initialize_ace_openai_provider_non_mcp_sets_env(self, config_manager, temp_config_dir):
        """Outside MCP mode the OpenAI provider should export OPENAI_API_KEY."""
        self._write_api_config(temp_config_dir, "openai", "openai-key", "gpt-4o-mini")

        with patch("ace.ACELiteLLM") as mock_ace, \
                patch("sys.stdin.isatty", return_value=True), \
                patch.dict(os.environ, {}, clear=False):
            mock_ace.return_value = MagicMock()
            manager = ACEManager(config_manager)
            assert os.environ.get("OPENAI_API_KEY") == "openai-key"
            assert manager.enabled is True

    def test_initialize_ace_anthropic_provider_mcp_mode(self, config_manager, temp_config_dir):
        """The Anthropic provider branch should also be covered in MCP mode."""
        self._write_api_config(temp_config_dir, "anthropic", "anthropic-key-mcp", "claude-3-opus")

        with patch("ace.ACELiteLLM") as mock_ace, \
                patch("sys.stdin.isatty", return_value=False), \
                patch.dict(os.environ, {}, clear=False):
            mock_ace.return_value = MagicMock()
            manager = ACEManager(config_manager)
            assert os.environ.get("ANTHROPIC_API_KEY") == "anthropic-key-mcp"
            assert manager.enabled is True

    def test_initialize_ace_import_error(self, config_manager):
        """An ImportError raised while building the agent leaves ACE disabled."""
        with patch("ace.ACELiteLLM", side_effect=ImportError("No module named 'ace'")):
            manager = ACEManager(config_manager)

        assert manager.enabled is False
        assert manager.ace_agent is None

    def test_initialize_ace_exception(self, config_manager):
        """A generic exception during initialisation leaves ACE disabled."""
        with patch("ace.ACELiteLLM", side_effect=Exception("Initialization failed")):
            manager = ACEManager(config_manager)

        assert manager.enabled is False
        assert manager.ace_agent is None

    # ----- generate_tool_strategy branches -----

    def test_generate_tool_strategy_with_relevant_strategies(self, config_manager):
        """Strategy generation when a cached strategy matches the tool."""
        manager = self._create_manager(config_manager)
        manager._cached_strategies = [
            {
                "tool_name": "search_entities",
                "success_rate": 0.9,
                "usage_count": 10,
                "content": "Test strategy content",
            }
        ]
        manager.ace_agent.ask.return_value = json.dumps({"optimized_arguments": {}})

        result = manager.generate_tool_strategy(
            user_query="test",
            tool_name="search_entities",
            context={"query": "test"},
        )

        assert result is not None
        assert "optimized_arguments" in result

    def test_generate_tool_strategy_json_extraction(self, config_manager):
        """JSON embedded in surrounding text is extracted from the response."""
        manager = self._create_manager(config_manager)
        manager.ace_agent.ask.return_value = "Some text before\n{\"optimized_arguments\": {}}\nSome text after"

        result = manager.generate_tool_strategy(
            user_query="test",
            tool_name="test_tool",
            context={},
        )

        assert result is not None
        assert "optimized_arguments" in result

    def test_generate_tool_strategy_invalid_json(self, config_manager):
        """A response with no parseable JSON yields None."""
        manager = self._create_manager(config_manager)
        manager.ace_agent.ask.return_value = "This is not JSON at all"

        result = manager.generate_tool_strategy(
            user_query="test",
            tool_name="test_tool",
            context={},
        )

        assert result is None

    def test_generate_tool_strategy_exception(self, config_manager):
        """An exception raised by the agent yields None."""
        manager = self._create_manager(config_manager)
        manager.ace_agent.ask.side_effect = Exception("Network error")

        result = manager.generate_tool_strategy(
            user_query="test",
            tool_name="test_tool",
            context={},
        )

        assert result is None

    def test_generate_tool_strategy_disabled_returns_none(self, config_manager):
        """A disabled manager returns None straight away."""
        manager = self._create_manager(config_manager)
        manager.enabled = False
        manager.ace_agent = None

        result = manager.generate_tool_strategy(
            user_query="disabled",
            tool_name="test_tool",
            context={},
        )

        assert result is None

    def test_generate_tool_strategy_non_mcp_mode(self, config_manager):
        """Outside MCP mode ``ask`` is called directly, without redirection."""
        manager = self._create_manager(config_manager)
        manager.ace_agent.ask.return_value = json.dumps({"optimized_arguments": {}})

        with patch("sys.stdin.isatty", return_value=True):
            result = manager.generate_tool_strategy(
                user_query="test",
                tool_name="test_tool",
                context={},
            )

        manager.ace_agent.ask.assert_called_once()
        assert result is not None

    # ----- _get_relevant_strategies -----

    def test_get_relevant_strategies_with_sorting(self, config_manager):
        """Relevant strategies are filtered by tool and ordered by success rate."""
        manager = self._create_manager(config_manager)
        manager._cached_strategies = [
            {"tool_name": "test_tool", "success_rate": 0.5, "usage_count": 5},
            {"tool_name": "test_tool", "success_rate": 0.9, "usage_count": 10},
            {"tool_name": "other_tool", "success_rate": 0.8, "usage_count": 8},
            {"tool_name": "test_tool", "success_rate": 0.7, "usage_count": 3},
        ]

        relevant = manager._get_relevant_strategies("test_tool", limit=2)

        assert len(relevant) == 2
        assert relevant[0]["success_rate"] == 0.9  # highest success rate first
        assert relevant[1]["success_rate"] == 0.7

    # ----- _create_strategy_version -----

    def test_create_strategy_version_with_prev_node(self, config_manager):
        """Creating a version when a previous strategy node already exists."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        # Simulate an existing previous node.
        prev_node = MagicMock()
        prev_node.id = 123
        prev_node.get.return_value = None
        prev_node.__getitem__ = lambda self, key: {"tool_name": "test", "version": 1}.get(key, 0)
        prev_node.__iter__ = lambda self: iter({"tool_name": "test", "version": 1, "success_count": 5}.items())

        record = MagicMock()
        record.__getitem__ = lambda self, key: prev_node if key == "s" else None
        record.single.return_value = record
        session.run.return_value = record

        manager = self._create_manager(config_manager, graphiti_client)
        manager._create_strategy_version(
            tool_name="test_tool",
            arguments_hash="hash123",
            reflection="Test reflection",
            success_increment=1.0,
            failure_increment=0.0,
            usage_increment=1.0,
        )

        # At least two queries are expected (including the is_latest update).
        assert session.run.call_count >= 2

    def test_create_strategy_version_with_rating(self, config_manager):
        """Creating a version that carries a rating and feedback increment."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        record = MagicMock()
        record.single.return_value = None
        session.run.return_value = record

        manager = self._create_manager(config_manager, graphiti_client)
        manager._create_strategy_version(
            tool_name="test_tool",
            arguments_hash="hash123",
            reflection="Test reflection",
            success_increment=1.0,
            failure_increment=0.0,
            usage_increment=1.0,
            rating=5.0,
            feedback_increment=1.0,
        )

        # The create query must have been issued.
        assert session.run.call_count >= 1

    def test_create_strategy_version_without_connection(self, config_manager):
        """No strategy version is created when the database is disconnected."""
        graphiti_client = self._make_client(connected=False)

        manager = self._create_manager(config_manager, graphiti_client)
        manager._create_strategy_version(
            tool_name="test_tool",
            arguments_hash="hash123",
            reflection="Test reflection",
            success_increment=1.0,
            failure_increment=0.0,
            usage_increment=1.0,
        )

        # No session may be opened. (A hasattr() guard would be meaningless
        # here: MagicMock auto-creates every attribute.)
        graphiti_client.driver.session.assert_not_called()

    # ----- _record_strategy_trend -----

    def test_record_strategy_trend_success(self, config_manager):
        """Recording a trend issues exactly one query."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        manager = self._create_manager(config_manager, graphiti_client)
        self._record_trend(manager)

        assert session.run.call_count == 1

    def test_record_strategy_trend_without_connection(self, config_manager):
        """No trend is recorded when the database is disconnected."""
        graphiti_client = self._make_client(connected=False)

        manager = self._create_manager(config_manager, graphiti_client)
        self._record_trend(manager)

        assert graphiti_client.driver.session.call_count == 0

    def test_record_strategy_trend_exception(self, config_manager):
        """A failing trend write must be swallowed (the test passes if nothing raises)."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        session.run.side_effect = Exception("trend error")

        manager = self._create_manager(config_manager, graphiti_client)
        self._record_trend(manager)

    # ----- query_strategies -----

    def test_query_strategies_with_tool_name(self, config_manager):
        """Query strategies filtered by tool name."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        # Simulate one returned strategy node.
        session.run.return_value = [
            self._strategy_record({
                "tool_name": "test_tool",
                "content": "test content",
                "success_rate": 0.9,
                "usage_count": 10,
                "success_count": 9,
                "failure_count": 1,
                "arguments_hash": "hash123",
                "created_at": "2025-01-01",
                "updated_at": "2025-01-02",
            })
        ]

        manager = self._create_manager(config_manager, graphiti_client)
        strategies = manager.query_strategies(tool_name="test_tool", limit=10)

        assert len(strategies) == 1
        assert strategies[0]["tool_name"] == "test_tool"

    def test_query_strategies_without_tool_name(self, config_manager):
        """Query all strategies (no tool name filter)."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        session.run.return_value = [
            self._strategy_record({
                "tool_name": "test_tool",
                "content": "test",
                "success_rate": 0.8,
                "usage_count": 5,
                "success_count": 4,
                "failure_count": 1,
                "arguments_hash": "hash",
                "created_at": "2025-01-01",
                "updated_at": "2025-01-01",
            })
        ]

        manager = self._create_manager(config_manager, graphiti_client)
        strategies = manager.query_strategies(limit=10)

        assert len(strategies) == 1

    def test_query_strategies_exception(self, config_manager):
        """A query failure yields an empty list."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        session.run.side_effect = Exception("Database error")

        manager = self._create_manager(config_manager, graphiti_client)
        strategies = manager.query_strategies()

        assert strategies == []

    def test_query_strategies_not_connected(self, config_manager):
        """query_strategies returns an empty list when Graphiti is disconnected."""
        graphiti_client = self._make_client(connected=False)

        manager = self._create_manager(config_manager, graphiti_client)
        strategies = manager.query_strategies(tool_name="any", limit=5)

        assert strategies == []

    # ----- _load_skillbook -----

    def test_load_skillbook_with_strategies(self, config_manager):
        """Loading the skillbook caches the strategies returned by the query."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        session.run.return_value = [
            self._strategy_record({
                "tool_name": "test_tool",
                "content": "test content",
                "success_rate": 0.9,
                "usage_count": 10,
                "arguments_hash": "hash123",
            })
        ]

        manager = self._create_manager(config_manager, graphiti_client)
        manager._load_skillbook()

        assert len(manager._cached_strategies) == 1

    def test_load_skillbook_no_strategies(self, config_manager):
        """Loading the skillbook with no stored strategies leaves the cache empty."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        session.run.return_value = []

        manager = self._create_manager(config_manager, graphiti_client)
        manager._load_skillbook()

        assert manager._cached_strategies == []

    def test_load_skillbook_exception(self, config_manager):
        """A query failure while loading the skillbook leaves the cache empty."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        session.run.side_effect = Exception("Database error")

        manager = self._create_manager(config_manager, graphiti_client)
        manager._load_skillbook()

        assert manager._cached_strategies == []

    def test_reload_strategies_async_handles_error(self, config_manager):
        """_reload_strategies_async must not propagate an error from _load_skillbook."""
        manager = self._create_manager(config_manager)
        with patch.object(manager, "_load_skillbook", side_effect=Exception("reload boom")):
            manager._reload_strategies_async()

    # ----- save_skillbook / load_skillbook -----

    def test_save_skillbook_success(self, config_manager, temp_config_dir):
        """save_skillbook forwards the path to the agent."""
        manager = self._create_manager(config_manager)
        manager.ace_agent.save_skillbook = MagicMock()

        file_path = str(temp_config_dir / "test_skillbook.json")
        manager.save_skillbook(file_path)

        manager.ace_agent.save_skillbook.assert_called_once_with(file_path)

    def test_save_skillbook_without_method(self, config_manager):
        """save_skillbook must not raise when the agent lacks save_skillbook."""
        manager = self._create_manager(config_manager)
        del manager.ace_agent.save_skillbook

        manager.save_skillbook()  # passes if nothing raises

    def test_save_skillbook_not_enabled_and_exception(self, config_manager, temp_config_dir):
        """Disabled and exception branches of save_skillbook."""
        manager = self._create_manager(config_manager)

        # Disabled: returns immediately.
        manager.enabled = False
        manager.save_skillbook(str(temp_config_dir / "ignore.json"))

        # Enabled, but the agent raises internally.
        manager.enabled = True
        manager.ace_agent.save_skillbook = MagicMock(side_effect=Exception("save error"))
        manager.save_skillbook(str(temp_config_dir / "err.json"))

    def test_load_skillbook_from_file_success(self, config_manager, temp_config_dir):
        """load_skillbook forwards the path to the agent."""
        manager = self._create_manager(config_manager)
        # The current implementation prefers ace_agent.load_skillbook.
        manager.ace_agent.load_skillbook = MagicMock()

        file_path = str(temp_config_dir / "test_skillbook.json")
        manager.load_skillbook(file_path)

        manager.ace_agent.load_skillbook.assert_called_once_with(file_path)

    def test_load_skillbook_without_method(self, config_manager):
        """load_skillbook must not raise when ACELiteLLM lacks from_skillbook."""
        with patch("ace.ACELiteLLM") as mock_ace_class:
            # Deleting an attribute on a mock makes later access raise
            # AttributeError; no hasattr() guard is needed because a
            # MagicMock reports every attribute as present.
            del mock_ace_class.from_skillbook

            manager = self._create_manager(config_manager)
            manager.load_skillbook("test.json")  # passes if nothing raises

    def test_load_skillbook_not_enabled_and_exception(self, config_manager, temp_config_dir):
        """Disabled and exception branches of load_skillbook."""
        manager = self._create_manager(config_manager)

        manager.enabled = False
        manager.load_skillbook(str(temp_config_dir / "ignore.json"))

        manager.enabled = True
        with patch("ace.ACELiteLLM") as mock_ace_class:
            mock_ace_class.from_skillbook.side_effect = Exception("load error")
            manager.load_skillbook(str(temp_config_dir / "err.json"))

    # ----- export_strategies / import_strategies / toggle_strategy / validate_strategies -----

    def test_export_strategies_not_connected_and_exception(self, config_manager, tmp_path):
        """Disconnected and exception branches of export_strategies."""
        # Not connected.
        manager_nc = self._create_manager(config_manager, self._make_client(connected=False))
        assert manager_nc.export_strategies("tool") is None

        # Connected, but query_strategies raises.
        manager = self._create_manager(config_manager, self._make_client())
        with patch.object(manager, "query_strategies", side_effect=Exception("query error")):
            assert manager.export_strategies("tool", file_path=str(tmp_path / "out.json")) is None

    def test_bulk_update_and_bulk_export_strategies_delete_and_empty(self, config_manager, tmp_path):
        """Delete branch of bulk_update_strategies and empty-result branch of bulk_export_strategies."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        # bulk_update_strategies: delete branch.
        manager = self._create_manager(config_manager, graphiti_client)
        with patch.object(manager, "_build_strategy_filter", return_value=("true", {"group_id": "g", "limit": 10})):
            # session.run returns a record carrying an "affected" field.
            record = {"affected": 3}
            session.run.return_value.single.return_value = record  # type: ignore[attr-defined]
            result = manager.bulk_update_strategies(action="delete", limit=10)
            assert result["action"] == "delete"

        # bulk_export_strategies: empty result with an explicit file_path.
        session.run.return_value = []  # type: ignore[assignment]
        result_export = manager.bulk_export_strategies(
            tool_name="tool", file_path=str(tmp_path / "bulk_export.json")
        )
        assert result_export["count"] == 0

    def test_import_strategies_not_connected_and_missing_file_and_outer_exception(self, config_manager, tmp_path):
        """Disconnected, missing-file and outer-exception branches of import_strategies."""
        # Not connected.
        manager_nc = self._create_manager(config_manager, self._make_client(connected=False))
        assert manager_nc.import_strategies(str(tmp_path / "x.json")) is None

        # File does not exist.
        manager = self._create_manager(config_manager, self._make_client())
        assert manager.import_strategies(str(tmp_path / "missing.json")) is None

        # Outer exception (e.g. JSON parsing fails).
        import_path = tmp_path / "bad.json"
        import_path.write_text("not-json", encoding="utf-8")
        with patch("json.load", side_effect=Exception("load error")):
            assert manager.import_strategies(str(import_path)) is None

    def test_import_strategies_skip_existing_without_overwrite(self, config_manager, tmp_path):
        """An existing strategy is skipped when overwrite=False."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        # The existence check finds a record; the import query must not run,
        # so only a single result is queued.
        existing_record = MagicMock()
        session.run.side_effect = [existing_record]

        manager = self._create_manager(config_manager, graphiti_client)

        strategies_payload = {
            "strategies": [
                {
                    "tool_name": "tool",
                    "arguments_hash": "hash",
                    "content": "c",
                    "success_rate": 1.0,
                    "usage_count": 1,
                    "success_count": 1,
                    "failure_count": 0,
                }
            ]
        }
        file_path = tmp_path / "strategies.json"
        file_path.write_text(json.dumps(strategies_payload), encoding="utf-8")

        result = manager.import_strategies(str(file_path), overwrite=False)

        assert result["count"] == 0

    def test_import_strategies_empty_and_missing_tool_name_and_inner_exception(self, config_manager, tmp_path):
        """Empty list, missing tool_name and per-item exception branches of import_strategies."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        manager = self._create_manager(config_manager, graphiti_client)

        # Empty strategies list.
        empty_path = tmp_path / "empty.json"
        empty_path.write_text(json.dumps({"strategies": []}), encoding="utf-8")
        assert manager.import_strategies(str(empty_path)) is None

        # Entry without tool_name.
        missing_tool_payload = {"strategies": [{"arguments_hash": "h", "content": "c"}]}
        missing_tool_path = tmp_path / "missing_tool.json"
        missing_tool_path.write_text(json.dumps(missing_tool_payload), encoding="utf-8")
        res_missing_tool = manager.import_strategies(str(missing_tool_path))
        assert res_missing_tool["errors"]  # at least one error reported

        # session.run raises for a single entry, hitting the inner except branch.
        session.run.side_effect = Exception("inner import error")
        error_payload = {
            "strategies": [
                {
                    "tool_name": "tool",
                    "arguments_hash": "hash",
                    "content": "c",
                    "success_rate": 1.0,
                    "usage_count": 1,
                    "success_count": 1,
                    "failure_count": 0,
                }
            ]
        }
        error_path = tmp_path / "error.json"
        error_path.write_text(json.dumps(error_payload), encoding="utf-8")
        res_error = manager.import_strategies(str(error_path))
        assert "导入策略失败" in "".join(res_error["errors"])

    def test_toggle_and_validate_strategies_not_connected_and_exception(self, config_manager):
        """Disconnected and exception branches of toggle_strategy and validate_strategies."""
        # toggle_strategy: not connected.
        graphiti_client_nc = self._make_client(connected=False)
        manager_nc = self._create_manager(config_manager, graphiti_client_nc)
        assert manager_nc.toggle_strategy("tool") is None

        # toggle_strategy: query raises.
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        session.run.side_effect = Exception("toggle error")
        manager = self._create_manager(config_manager, graphiti_client)
        assert manager.toggle_strategy("tool", "hash") is None

        # validate_strategies: not connected.
        manager_nc_validate = self._create_manager(config_manager, graphiti_client_nc)
        validate_result_nc = manager_nc_validate.validate_strategies()
        assert validate_result_nc["valid"] is False
        assert "error" in validate_result_nc

        # validate_strategies: data present, with issues.
        session.run.side_effect = None
        record = {
            "total": 2,
            "enabled_count": 1,
            "disabled_count": 1,
            "missing_success_rate": 1,
            "missing_usage_count": 1,
            "avg_success_rate": 0.5,
        }
        session.run.return_value.single.return_value = record  # type: ignore[attr-defined]
        validate_manager = self._create_manager(config_manager, graphiti_client)
        validate_result = validate_manager.validate_strategies()
        assert validate_result["valid"] is False
        assert len(validate_result["issues"]) == 2

        # validate_strategies: no record returns the default healthy result.
        session.run.return_value.single.return_value = None  # type: ignore[attr-defined]
        validate_result_empty = validate_manager.validate_strategies()
        assert validate_result_empty["valid"] is True

    # ----- get_strategy_heatmap / get_strategy_alerts / get_strategy_stats -----

    def test_get_strategy_heatmap_invalid_group_by_and_exception(self, config_manager):
        """Invalid group_by and exception branches of get_strategy_heatmap."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        # Normal result first: an invalid group_by is normalised to "tool".
        record = {"tool_name": "t", "bucket": "80-100", "success_rate": 0.9, "usage_count": 10}
        session.run.return_value = [record]

        manager = self._create_manager(config_manager, graphiti_client)
        result = manager.get_strategy_heatmap(limit=5, group_by="INVALID")
        assert result["success"] is True
        assert result["summary"]["group_by"] == "tool"

        # Then force an exception to reach the except branch.
        session.run.side_effect = Exception("heatmap error")
        result_error = manager.get_strategy_heatmap(limit=5, group_by="tool")
        assert result_error["success"] is False

    def test_get_strategy_heatmap_not_connected(self, config_manager):
        """get_strategy_heatmap reports failure when disconnected."""
        manager = self._create_manager(config_manager, self._make_client(connected=False))

        result = manager.get_strategy_heatmap()

        assert result["success"] is False

    def test_get_strategy_alerts_warning_severity(self, config_manager):
        """Warning-severity branch of get_strategy_alerts."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)

        # With failure_threshold=5, 15 failures (between 10 and 20) should
        # be classified as "warning".
        record = {
            "tool_name": "tool",
            "arguments_hash": "hash",
            "failures": 15,
            "usage": 30,
            "last_date": "2025-01-01",
        }
        session.run.return_value = [record]  # type: ignore[assignment]

        manager = self._create_manager(config_manager, graphiti_client)
        alerts = manager.get_strategy_alerts(failure_threshold=5, days=1)

        assert len(alerts) == 1
        assert alerts[0]["severity"] == "warning"

    def test_get_strategy_stats_tool_and_all_and_not_connected(self, config_manager):
        """The three get_strategy_stats paths: per tool, all tools, disconnected."""
        # Not connected.
        manager_nc = self._create_manager(config_manager, self._make_client(connected=False))
        assert manager_nc.get_strategy_stats("tool") is None

        # Aggregated for a single tool_name.
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        record_tool = {
            "total_strategies": 2,
            "avg_success_rate": 0.8,
            "total_usage": 10,
            "total_success": 8,
            "total_failure": 2,
        }
        session.run.return_value.single.return_value = record_tool  # type: ignore[attr-defined]
        manager = self._create_manager(config_manager, graphiti_client)
        stats_tool = manager.get_strategy_stats("search_entities")
        assert stats_tool["total_strategies"] == 2

        # Summary across all tools (by_tool branch).
        record_all = {
            "total_strategies": 3,
            "avg_success_rate": 0.7,
            "total_usage": 20,
            "total_success": 14,
            "total_failure": 6,
            "by_tool": [
                {
                    "tool_name": "a",
                    "count": 1,
                    "avg_success_rate": 0.8,
                    "usage": 5,
                    "success": 4,
                    "failure": 1,
                }
            ],
        }
        session.run.return_value.single.return_value = record_all  # type: ignore[attr-defined]
        stats_all = manager.get_strategy_stats()
        assert stats_all["total_strategies"] == 3

    # ----- internal skillbook update helpers -----

    def test_reflect_on_tool_result_disabled(self, config_manager):
        """reflect_on_result returns without updating when ACE is disabled."""
        manager = self._create_manager(config_manager)
        manager.enabled = False
        manager.ace_agent = None

        with patch.object(manager, "_update_skillbook") as mock_update:
            manager.reflect_on_result("tool", {}, {"success": True})

        mock_update.assert_not_called()

    def test_reflect_on_tool_result_non_mcp(self, config_manager):
        """Outside MCP mode reflect_on_result calls ace_agent.ask directly."""
        manager = self._create_manager(config_manager)
        manager.ace_agent.ask.return_value = "{}"

        with patch("sys.stdin.isatty", return_value=True), \
                patch.object(manager, "_update_skillbook") as mock_update, \
                patch.object(manager, "_reload_strategies_async") as mock_reload:
            manager.reflect_on_result("tool", {}, {"success": True})

        manager.ace_agent.ask.assert_called_once()
        mock_update.assert_called_once()
        mock_reload.assert_called_once()

    def test_update_skillbook_with_feedback_no_client(self, config_manager):
        """_update_skillbook_with_feedback returns early without a Graphiti client."""
        manager = self._create_manager(config_manager)
        manager.graphiti_client = None

        with patch.object(manager, "_create_strategy_version") as mock_create:
            manager._update_skillbook_with_feedback(
                tool_name="tool",
                arguments={"a": 1},
                result={"rating": 4},
                reflection="ok",
                success=True,
                weight=1.0,
            )

        mock_create.assert_not_called()

    def test_update_skillbook_with_feedback_exception(self, config_manager):
        """An internal error in _update_skillbook_with_feedback is swallowed."""
        manager = self._create_manager(config_manager)
        manager.graphiti_client = self._make_client()

        with patch("json.dumps", side_effect=Exception("json error")):
            # Must not raise.
            manager._update_skillbook_with_feedback(
                tool_name="tool",
                arguments={"a": 1},
                result={"rating": 3},
                reflection="ok",
                success=True,
                weight=1.0,
            )

    def test_update_skillbook_no_client_and_exception(self, config_manager):
        """Early-return and exception branches of _update_skillbook."""
        manager = self._create_manager(config_manager)

        # No client: returns immediately.
        manager.graphiti_client = None
        manager._update_skillbook("tool", {}, {}, "ok", True)

        # Connected, but _create_strategy_version raises.
        manager.graphiti_client = self._make_client()
        with patch.object(manager, "_create_strategy_version", side_effect=Exception("boom")):
            manager._update_skillbook("tool", {}, {}, "ok", True)

    def test_rate_result_non_mcp_branch(self, config_manager):
        """Non-MCP branch of rate_result (``ask`` is called directly)."""
        manager = self._create_manager(config_manager)
        manager.graphiti_client = self._make_client()
        manager.ace_agent.ask.return_value = "{}"

        with patch("sys.stdin.isatty", return_value=True), \
                patch.object(manager, "_update_skillbook_with_feedback") as mock_update:
            ok = manager.rate_result("tool", rating=4, feedback="good", context={"a": 1})

        assert ok is True
        manager.ace_agent.ask.assert_called_once()
        mock_update.assert_called_once()

    def test_create_strategy_version_exception(self, config_manager):
        """A query failure in _create_strategy_version is swallowed."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        session.run.side_effect = Exception("create error")

        manager = self._create_manager(config_manager, graphiti_client)
        manager._create_strategy_version(
            tool_name="tool",
            arguments_hash="hash",
            reflection="oops",
            success_increment=1.0,
            failure_increment=0.0,
            usage_increment=1.0,
        )

    # ----- export_strategies -----

    def test_export_strategies_with_tool_name_in_filename(self, config_manager, temp_config_dir):
        """The default export file name contains the tool name."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        session.run.return_value = []

        manager = self._create_manager(config_manager, graphiti_client)

        # Pretend one strategy exists.
        exported = [{"tool_name": "test_tool", "content": "test", "success_rate": 0.9}]
        with patch.object(manager, "query_strategies", return_value=exported):
            result = manager.export_strategies(tool_name="test_tool")

        assert result is not None
        assert "test_tool" in result["file_path"]

    def test_export_strategies_no_strategies(self, config_manager):
        """export_strategies returns None when there is nothing to export."""
        graphiti_client = self._make_client()
        session = self._mock_session(graphiti_client)
        session.run.return_value = []

        manager = self._create_manager(config_manager, graphiti_client)

        with patch.object(manager, "query_strategies", return_value=[]):
            result = manager.export_strategies()

        assert result is None