test_performance.pyโข2.92 kB
"""Tests for performance optimizations."""
import pytest
import asyncio
import sqlite3
from pathlib import Path
import tempfile
from unittest.mock import MagicMock, patch
from datetime import datetime
from delegation_mcp.agent_discovery import AgentDiscovery, AgentMetadata
from delegation_mcp.persistence import PersistenceManager, AuditLogEntry, WorkflowState
@pytest.mark.asyncio
async def test_parallel_agent_discovery():
"""Test that agent discovery runs in parallel and respects semaphore."""
with tempfile.TemporaryDirectory() as tmpdir:
# Mock Path.home to pass validation
with patch("delegation_mcp.agent_discovery.Path.home", return_value=Path(tmpdir)):
discovery = AgentDiscovery()
# Mock _discover_single_agent to be slow
async def slow_discover(name, config):
await asyncio.sleep(0.1)
return AgentMetadata(name=name, command="test", available=True)
with patch.object(discovery, "_discover_single_agent", side_effect=slow_discover):
start_time = datetime.now()
await discovery.discover_agents(force_refresh=True)
duration = (datetime.now() - start_time).total_seconds()
# If sequential (5 agents * 0.1s = 0.5s), if parallel should be ~0.1s + overhead
# We expect it to be faster than sequential
assert duration < 0.4, f"Discovery took too long: {duration}s"
@pytest.mark.asyncio
async def test_async_persistence():
"""Test that persistence methods are async and don't block."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test.db"
persistence = PersistenceManager(db_path=db_path)
# Test async log_delegation
entry = AuditLogEntry(
client_id="test",
query="test query",
orchestrator="claude",
success=True,
duration=1.0,
output_size=100
)
# This should be awaitable
entry_id = await persistence.log_delegation(entry)
assert entry_id > 0
# Test async get_audit_logs
logs = await persistence.get_audit_logs()
assert len(logs) == 1
assert logs[0].id == entry_id
# Test async workflow state
state = WorkflowState(
workflow_name="test_workflow",
status="running"
)
state_id = await persistence.save_workflow_state(state)
assert state_id > 0
loaded_state = await persistence.load_workflow_state(state_id)
assert loaded_state.workflow_name == "test_workflow"
# Test async stats
await persistence.record_delegation_history("claude", True, 1.0)
stats = await persistence.get_statistics()
assert stats["total"] == 1