"""Graphiti 客户端边界情况和错误处理测试."""
import pytest
from unittest.mock import Mock, MagicMock, patch
from src.config_manager import ConfigManager
from src.graphiti_client import GraphitiClient
class TestGraphitiClientEdgeCases:
"""Graphiti 客户端边界情况测试类."""
@pytest.fixture
def config_manager(self, temp_config_dir):
"""创建配置管理器."""
return ConfigManager(config_path=temp_config_dir / ".graphitiace" / "config.json")
@pytest.fixture
def client(self, config_manager):
"""创建客户端."""
return GraphitiClient(config_manager)
@pytest.fixture
def connected_client(self, config_manager, mock_neo4j_config, mock_driver):
"""创建已连接的客户端."""
config_manager.configure_neo4j(**mock_neo4j_config)
client = GraphitiClient(config_manager)
driver, session = mock_driver
client.driver = driver
client._connected = True
return client, session
@pytest.mark.asyncio
async def test_add_episode_not_connected(self, client):
"""测试未连接时添加 Episode."""
result = await client.add_episode(content="Test")
assert result['success'] is False
assert "未连接" in result['message']
def test_search_entities_not_connected(self, client):
"""测试未连接时搜索实体."""
result = client.search_entities(query="test")
assert result['success'] is False
assert "未连接" in result['message']
def test_search_entities_empty_query(self, client, mock_neo4j_config):
"""测试空查询."""
client.config_manager.configure_neo4j(**mock_neo4j_config)
# 即使未连接,也应该返回错误而不是崩溃
result = client.search_entities(query="")
assert result is not None
def test_query_knowledge_graph_invalid_cypher(self, client, mock_neo4j_config, mock_driver):
"""测试无效的 Cypher 查询."""
client.config_manager.configure_neo4j(**mock_neo4j_config)
driver, session = mock_driver
client.driver = driver
client._connected = True
# Mock 抛出异常
session.run.side_effect = Exception("Invalid Cypher")
result = client.query_knowledge_graph(cypher_query="INVALID QUERY")
assert result['success'] is False
assert "失败" in result['message']
def test_delete_episode_not_found(self, client, mock_neo4j_config, mock_driver):
"""测试删除不存在的 Episode."""
client.config_manager.configure_neo4j(**mock_neo4j_config)
driver, session = mock_driver
client.driver = driver
client._connected = True
# Mock 查询结果为空
mock_result = MagicMock()
mock_result.peek.return_value = None
session.run.return_value = mock_result
result = client.delete_episode(episode_id=999)
assert result['success'] is False
assert "未找到" in result['message']
def test_clear_graph_not_confirmed(self, client, mock_neo4j_config, mock_driver):
"""测试清空图谱未确认."""
client.config_manager.configure_neo4j(**mock_neo4j_config)
driver, session = mock_driver
client.driver = driver
client._connected = True
# 不提供 confirm 参数
result = client.clear_graph()
# 应该返回错误或要求确认
assert result is not None
def test_export_graph_data_invalid_format(self, connected_client):
"""测试无效的导出格式."""
client, session = connected_client
result = client.export_graph_data(format="invalid")
assert result['success'] is False
assert "不支持" in result['message']
def test_import_graph_data_invalid_format(self, connected_client):
"""测试无效的导入格式."""
client, session = connected_client
result = client.import_graph_data(data={}, format="invalid")
assert result['success'] is False
assert "不支持" in result['message']
def test_get_statistics_error(self, connected_client):
"""测试获取统计信息时出错."""
client, session = connected_client
# Mock 抛出异常
session.run.side_effect = Exception("Database error")
result = client.get_statistics()
assert result['success'] is False
assert "失败" in result['message']
@pytest.mark.asyncio
async def test_semantic_search_without_graphiti(self, connected_client):
"""测试无 Graphiti 时的语义搜索."""
client, session = connected_client
client.graphiti = None
# Mock 搜索实体结果
mock_result = MagicMock()
mock_record = MagicMock()
mock_record.__getitem__.return_value = {"name": "Test"}
mock_result.__iter__.return_value = [mock_record]
session.run.return_value = mock_result
result = await client.semantic_search(query="test")
assert result['success'] is True
assert result.get('search_type') == 'enhanced_keyword'
@pytest.mark.asyncio
async def test_add_episode_with_graphiti_error(self, connected_client):
"""测试 Graphiti 处理失败时的添加 Episode."""
client, session = connected_client
# Mock Graphiti
from unittest.mock import AsyncMock
mock_graphiti = MagicMock()
mock_graphiti.add_episode = AsyncMock(side_effect=Exception("Graphiti error"))
client.graphiti = mock_graphiti
# Mock 查询结果
mock_result = MagicMock()
mock_record = MagicMock()
mock_record.__getitem__.return_value = 1
mock_result.single.return_value = mock_record
session.run.return_value = mock_result
result = await client.add_episode(content="Test")
# 应该成功,但实体抽取失败
assert result['success'] is True
assert result.get('entities_extracted') is False
def test_query_by_time_range_invalid_dates(self, connected_client):
"""测试无效的时间范围."""
client, session = connected_client
# Mock 查询结果
mock_result = MagicMock()
mock_result.__iter__.return_value = []
session.run.return_value = mock_result
# 测试无效的日期范围(结束日期早于开始日期)
result = client.query_by_time_range(
start_time="2025-01-10T00:00:00",
end_time="2025-01-01T00:00:00"
)
# 应该返回结果(即使为空)
assert result is not None
def test_validate_data_with_orphaned_nodes(self, connected_client):
"""测试验证数据时发现孤立节点."""
client, session = connected_client
# Mock 孤立节点查询结果
orphaned_result = MagicMock()
orphaned_record = MagicMock()
orphaned_record.__getitem__.side_effect = lambda key: {
"count": 5,
"labels": ["Entity"]
}[key]
orphaned_result.__iter__.return_value = [orphaned_record]
# Mock 其他查询结果
duplicates_result = MagicMock()
duplicates_result.__iter__.return_value = []
integrity_result = MagicMock()
integrity_record = MagicMock()
integrity_record.__getitem__.return_value = 0
integrity_result.single.return_value = integrity_record
session.run.side_effect = [orphaned_result, duplicates_result, integrity_result]
result = client.validate_data()
assert result['success'] is True
assert 'issues' in result
def test_clean_orphaned_nodes_dry_run(self, connected_client):
"""测试清理孤立节点(干运行模式)."""
client, session = connected_client
# clean_orphaned_nodes 没有 dry_run 参数,只测试正常清理
# Mock 查询结果
mock_result = MagicMock()
mock_record = MagicMock()
mock_record.__getitem__.return_value = 5
mock_result.single.return_value = mock_record
mock_result.peek.return_value = True
session.run.return_value = mock_result
result = client.clean_orphaned_nodes()
assert result['success'] is True
assert result.get('deleted_count') == 5
def test_rebuild_indexes_no_indexes(self, connected_client):
"""测试重建索引(无索引)."""
client, session = connected_client
# Mock 查询结果为空
mock_result = MagicMock()
mock_result.__iter__.return_value = []
session.run.return_value = mock_result
result = client.rebuild_indexes()
assert result['success'] is True
assert result.get('index_count', 0) == 0