Skip to main content
Glama
test_agents.py (22.4 kB)
"""
Unit tests for the Agent System.

Tests cover:
- Task creation and state management
- Agent execution and delegation
- SubAgent specialization
- Orchestrator coordination
- Workflow execution
- Error handling and retries
"""

import time
from datetime import datetime

import pytest

from mcp_demo.task import Task, TaskResult, TaskStatus, TaskPriority
from mcp_demo.agent import Agent, AgentCapability
from mcp_demo.subagent import (
    CalculatorSubAgent,
    FileOperationsSubAgent,
    WeatherSubAgent,
    DataProcessingSubAgent,
)
from mcp_demo.orchestrator import AgentOrchestrator, Workflow, WorkflowStep


# ============================================================================
# Task Tests
# ============================================================================


class TestTask:
    """Test Task model and state management."""

    def test_task_creation(self):
        """Test creating a task with default values."""
        task = Task(
            name="Test Task",
            description="A test task",
            task_type="calculation",
            parameters={"a": 1, "b": 2},
        )

        assert task.name == "Test Task"
        assert task.task_type == "calculation"
        assert task.status == TaskStatus.PENDING
        assert task.priority == TaskPriority.MEDIUM
        assert task.result is None
        assert task.retry_count == 0

    def test_task_with_priority(self):
        """Test creating a task with specific priority."""
        task = Task(
            name="High Priority Task",
            description="Important task",
            task_type="calculation",
            parameters={},
            priority=TaskPriority.HIGH,
        )

        assert task.priority == TaskPriority.HIGH

    def test_task_mark_started(self):
        """Test marking a task as started."""
        task = Task(
            name="Test",
            description="Test",
            task_type="test",
            parameters={},
        )

        task.mark_started()

        assert task.status == TaskStatus.IN_PROGRESS
        assert task.started_at is not None
        assert isinstance(task.started_at, datetime)

    def test_task_mark_completed(self):
        """Test marking a task as completed."""
        task = Task(
            name="Test",
            description="Test",
            task_type="test",
            parameters={},
        )
        result = TaskResult(success=True, data="result data")

        task.mark_started()
        task.mark_completed(result)

        assert task.status == TaskStatus.COMPLETED
        assert task.result == result
        assert task.completed_at is not None

    def test_task_mark_failed(self):
        """Test marking a task as failed."""
        task = Task(
            name="Test",
            description="Test",
            task_type="test",
            parameters={},
        )

        task.mark_started()
        task.mark_failed("Test error")

        assert task.status == TaskStatus.FAILED
        assert task.result is not None
        assert task.result.success is False
        assert task.result.error == "Test error"

    def test_task_can_retry(self):
        """Test retry logic."""
        task = Task(
            name="Test",
            description="Test",
            task_type="test",
            parameters={},
            max_retries=3,
        )

        # Retries stay available until retry_count reaches max_retries.
        assert task.can_retry() is True

        task.retry_count = 2
        assert task.can_retry() is True

        task.retry_count = 3
        assert task.can_retry() is False

    def test_task_increment_retry(self):
        """Test incrementing retry counter."""
        task = Task(
            name="Test",
            description="Test",
            task_type="test",
            parameters={},
        )

        task.mark_started()
        task.mark_failed("Error")

        initial_count = task.retry_count
        task.increment_retry()

        assert task.retry_count == initial_count + 1
        assert task.status == TaskStatus.PENDING

    def test_task_execution_time(self):
        """Test getting task execution time."""
        task = Task(
            name="Test",
            description="Test",
            task_type="test",
            parameters={},
        )

        task.mark_started()
        time.sleep(0.1)  # Small delay so the measured duration is non-trivial
        result = TaskResult(success=True, data="test")
        task.mark_completed(result)

        execution_time = task.get_execution_time()

        assert execution_time is not None
        assert execution_time >= 0.1


# ============================================================================
# Agent Tests
# ============================================================================


class TestAgent:
    """Test Agent class and basic functionality."""

    def test_agent_creation(self):
        """Test creating an agent."""
        agent = Agent(
            name="TestAgent",
            capabilities=[
                AgentCapability(
                    task_type="test",
                    mcp_tool="test_tool",
                    description="Test capability",
                )
            ],
        )

        assert agent.name == "TestAgent"
        assert len(agent.capabilities) == 1
        assert len(agent.subagents) == 0
        assert agent._connected is False

    def test_agent_register_subagent(self):
        """Test registering a subagent."""
        main_agent = Agent(name="MainAgent")
        sub_agent = Agent(name="SubAgent")

        main_agent.register_subagent(sub_agent)

        assert sub_agent.id in main_agent.subagents
        assert main_agent.subagents[sub_agent.id] == sub_agent

    def test_agent_can_handle_task(self):
        """Test checking if agent can handle a task."""
        agent = Agent(
            name="TestAgent",
            capabilities=[
                AgentCapability(
                    task_type="calculation",
                    mcp_tool="calculator",
                    description="Math operations",
                )
            ],
        )

        calc_task = Task(
            name="Test",
            description="Test",
            task_type="calculation",
            parameters={},
        )
        other_task = Task(
            name="Test",
            description="Test",
            task_type="other",
            parameters={},
        )

        assert agent.can_handle_task(calc_task) is True
        assert agent.can_handle_task(other_task) is False

    def test_agent_get_capability_for_task(self):
        """Test getting capability for a task."""
        capability = AgentCapability(
            task_type="calculation",
            mcp_tool="calculator",
            description="Math operations",
        )
        agent = Agent(name="TestAgent", capabilities=[capability])

        task = Task(
            name="Test",
            description="Test",
            task_type="calculation",
            parameters={},
        )

        found_capability = agent.get_capability_for_task(task)

        assert found_capability is not None
        assert found_capability.task_type == "calculation"
        assert found_capability.mcp_tool == "calculator"

    @pytest.mark.asyncio
    async def test_agent_execute_task_not_connected(self):
        """Test that executing without connection raises error."""
        agent = Agent(name="TestAgent")
        task = Task(
            name="Test",
            description="Test",
            task_type="test",
            parameters={},
        )

        with pytest.raises(RuntimeError, match="not connected"):
            await agent.execute_task(task)

    @pytest.mark.asyncio
    async def test_agent_execute_task_success(self):
        """Test successful task execution."""
        agent = Agent(
            name="TestAgent",
            capabilities=[
                AgentCapability(
                    task_type="calculation",
                    mcp_tool="calculator",
                    description="Math",
                )
            ],
        )

        task = Task(
            name="Add numbers",
            description="Add 10 + 20",
            task_type="calculation",
            parameters={"operation": "add", "a": 10, "b": 20},
        )

        async with agent:
            result = await agent.execute_task(task)

            assert result.success is True
            assert "30" in result.data
            assert task.status == TaskStatus.COMPLETED
            assert task.assigned_agent_id == agent.id

    @pytest.mark.asyncio
    async def test_agent_execute_tasks_sequential(self):
        """Test executing multiple tasks sequentially."""
        agent = Agent(
            name="TestAgent",
            capabilities=[
                AgentCapability(
                    task_type="calculation",
                    mcp_tool="calculator",
                    description="Math",
                )
            ],
        )

        tasks = [
            Task(
                name=f"Task {i}",
                description="Test",
                task_type="calculation",
                parameters={"operation": "add", "a": i, "b": i},
            )
            for i in range(3)
        ]

        async with agent:
            results = await agent.execute_tasks(tasks)

            assert len(results) == 3
            assert all(r.success for r in results)

    @pytest.mark.asyncio
    async def test_agent_get_statistics(self):
        """Test getting agent statistics."""
        agent = Agent(
            name="TestAgent",
            capabilities=[
                AgentCapability(
                    task_type="calculation",
                    mcp_tool="calculator",
                    description="Math",
                )
            ],
        )

        task = Task(
            name="Test",
            description="Test",
            task_type="calculation",
            parameters={"operation": "add", "a": 1, "b": 2},
        )

        async with agent:
            await agent.execute_task(task)

            stats = agent.get_task_statistics()

            assert stats["agent_name"] == "TestAgent"
            assert stats["total_tasks"] == 1
            assert stats["completed"] == 1
            assert stats["failed"] == 0
            assert stats["success_rate"] == 100.0


# ============================================================================
# SubAgent Tests
# ============================================================================


class TestSubAgent:
    """Test SubAgent specialization."""

    def test_calculator_subagent(self):
        """Test CalculatorSubAgent creation."""
        agent = CalculatorSubAgent()

        assert agent.name == "CalculatorAgent"
        assert len(agent.capabilities) > 0
        assert any(c.task_type == "calculation" for c in agent.capabilities)

    def test_file_operations_subagent(self):
        """Test FileOperationsSubAgent creation."""
        agent = FileOperationsSubAgent()

        assert agent.name == "FileOpsAgent"
        assert any(c.task_type == "file_read" for c in agent.capabilities)
        assert any(c.task_type == "file_write" for c in agent.capabilities)

    def test_weather_subagent(self):
        """Test WeatherSubAgent creation."""
        agent = WeatherSubAgent()

        assert agent.name == "WeatherAgent"
        assert any(c.task_type == "weather" for c in agent.capabilities)

    def test_data_processing_subagent(self):
        """Test DataProcessingSubAgent creation."""
        agent = DataProcessingSubAgent()

        assert agent.name == "DataProcessorAgent"
        assert any(c.task_type == "calculation" for c in agent.capabilities)
        assert any(c.task_type == "file_read" for c in agent.capabilities)

    def test_subagent_with_parent(self):
        """Test subagent registration with parent."""
        parent = Agent(name="Parent")
        subagent = CalculatorSubAgent(parent_agent=parent)

        assert subagent.parent_agent == parent
        assert subagent.id in parent.subagents

    @pytest.mark.asyncio
    async def test_subagent_execution(self):
        """Test subagent task execution."""
        agent = CalculatorSubAgent()

        task = Task(
            name="Calculate",
            description="Test calculation",
            task_type="calculation",
            parameters={"operation": "multiply", "a": 5, "b": 6},
        )

        async with agent:
            result = await agent.execute_task(task)

            assert result.success is True
            assert "30" in result.data


# ============================================================================
# Orchestrator Tests
# ============================================================================


class TestOrchestrator:
    """Test AgentOrchestrator functionality."""

    def test_orchestrator_creation(self):
        """Test creating an orchestrator."""
        orch = AgentOrchestrator(name="TestOrch")

        assert orch.name == "TestOrch"
        assert len(orch.agents) == 0

    def test_orchestrator_register_agent(self):
        """Test registering agents."""
        orch = AgentOrchestrator()
        agent = Agent(name="TestAgent")

        orch.register_agent(agent)

        assert agent.id in orch.agents
        assert orch.agents[agent.id] == agent

    def test_orchestrator_unregister_agent(self):
        """Test unregistering agents."""
        orch = AgentOrchestrator()
        agent = Agent(name="TestAgent")

        orch.register_agent(agent)
        orch.unregister_agent(agent.id)

        assert agent.id not in orch.agents

    def test_orchestrator_find_agent_for_task(self):
        """Test finding appropriate agent for task."""
        orch = AgentOrchestrator()

        calc_agent = CalculatorSubAgent()
        file_agent = FileOperationsSubAgent()

        orch.register_agent(calc_agent)
        orch.register_agent(file_agent)

        calc_task = Task(
            name="Test",
            description="Test",
            task_type="calculation",
            parameters={},
        )
        file_task = Task(
            name="Test",
            description="Test",
            task_type="file_read",
            parameters={},
        )

        found_calc = orch.find_agent_for_task(calc_task)
        found_file = orch.find_agent_for_task(file_task)

        assert found_calc is not None
        assert found_calc.id == calc_agent.id
        assert found_file is not None
        assert found_file.id == file_agent.id

    @pytest.mark.asyncio
    async def test_orchestrator_execute_task(self):
        """Test orchestrator task execution."""
        orch = AgentOrchestrator()
        agent = CalculatorSubAgent()
        orch.register_agent(agent)

        task = Task(
            name="Test",
            description="Test",
            task_type="calculation",
            parameters={"operation": "add", "a": 5, "b": 10},
        )

        async with orch:
            result = await orch.execute_task(task)

            assert result.success is True
            assert "15" in result.data

    @pytest.mark.asyncio
    async def test_orchestrator_execute_tasks_parallel(self):
        """Test parallel task execution."""
        orch = AgentOrchestrator()
        agent = CalculatorSubAgent()
        orch.register_agent(agent)

        tasks = [
            Task(
                name=f"Task {i}",
                description="Test",
                task_type="calculation",
                parameters={"operation": "add", "a": i, "b": i},
            )
            for i in range(5)
        ]

        async with orch:
            results = await orch.execute_tasks(tasks, parallel=True)

            assert len(results) == 5
            assert all(r.success for r in results)

    @pytest.mark.asyncio
    async def test_orchestrator_statistics(self):
        """Test orchestrator statistics."""
        orch = AgentOrchestrator()
        agent = CalculatorSubAgent()
        orch.register_agent(agent)

        task = Task(
            name="Test",
            description="Test",
            task_type="calculation",
            parameters={"operation": "add", "a": 1, "b": 1},
        )

        async with orch:
            await orch.execute_task(task)

            stats = orch.get_statistics()

            assert stats["total_agents"] == 1
            assert stats["total_tasks_executed"] == 1
            assert stats["total_completed"] == 1
            assert stats["overall_success_rate"] == 100.0


# ============================================================================
# Workflow Tests
# ============================================================================


class TestWorkflow:
    """Test Workflow functionality."""

    def test_workflow_creation(self):
        """Test creating a workflow."""
        step1 = WorkflowStep(name="step1", tasks=[])
        step2 = WorkflowStep(name="step2", tasks=[], depends_on=["step1"])

        workflow = Workflow(name="TestWorkflow", steps=[step1, step2])

        assert workflow.name == "TestWorkflow"
        assert len(workflow.steps) == 2

    def test_workflow_get_ready_steps(self):
        """Test getting ready steps."""
        task1 = Task(
            name="T1", description="T1", task_type="test", parameters={}
        )
        task2 = Task(
            name="T2", description="T2", task_type="test", parameters={}
        )

        step1 = WorkflowStep(name="step1", tasks=[task1])
        step2 = WorkflowStep(name="step2", tasks=[task2], depends_on=["step1"])

        workflow = Workflow(name="Test", steps=[step1, step2])

        # Initially, only step1 should be ready
        ready = workflow.get_ready_steps()
        assert len(ready) == 1
        assert ready[0].name == "step1"

        # After completing step1, step2 should be ready
        workflow.mark_step_completed("step1")
        ready = workflow.get_ready_steps()
        assert len(ready) == 1
        assert ready[0].name == "step2"

    def test_workflow_is_completed(self):
        """Test checking if workflow is completed."""
        step1 = WorkflowStep(name="step1", tasks=[])
        step2 = WorkflowStep(name="step2", tasks=[])

        workflow = Workflow(name="Test", steps=[step1, step2])

        assert workflow.is_completed() is False

        workflow.mark_step_completed("step1")
        assert workflow.is_completed() is False

        workflow.mark_step_completed("step2")
        assert workflow.is_completed() is True

    @pytest.mark.asyncio
    async def test_orchestrator_execute_workflow(self):
        """Test executing a workflow."""
        orch = AgentOrchestrator()
        agent = CalculatorSubAgent()
        orch.register_agent(agent)

        task1 = Task(
            name="Task1",
            description="First task",
            task_type="calculation",
            parameters={"operation": "add", "a": 1, "b": 2},
        )
        task2 = Task(
            name="Task2",
            description="Second task",
            task_type="calculation",
            parameters={"operation": "multiply", "a": 3, "b": 4},
        )

        step1 = WorkflowStep(name="step1", tasks=[task1])
        step2 = WorkflowStep(name="step2", tasks=[task2], depends_on=["step1"])

        workflow = Workflow(name="TestWorkflow", steps=[step1, step2])

        async with orch:
            results = await orch.execute_workflow(workflow)

            assert len(results) == 2
            assert "step1" in results
            assert "step2" in results
            assert all(r.success for r in results["step1"])
            assert all(r.success for r in results["step2"])


# ============================================================================
# Integration Tests
# ============================================================================


class TestIntegration:
    """Integration tests for the full agent system."""

    @pytest.mark.asyncio
    async def test_agent_delegation_to_subagent(self):
        """Test agent delegating task to subagent."""
        main_agent = Agent(name="MainAgent")
        calc_subagent = CalculatorSubAgent(parent_agent=main_agent)

        task = Task(
            name="Calculate",
            description="Math operation",
            task_type="calculation",
            parameters={"operation": "add", "a": 10, "b": 20},
        )

        await main_agent.connect()
        await calc_subagent.connect()

        try:
            # Main agent should delegate to calc subagent
            result = await main_agent.execute_task(task)

            assert result.success is True
            assert "30" in result.data
            assert result.metadata["agent_name"] == "CalculatorAgent"
        finally:
            await main_agent.disconnect()
            await calc_subagent.disconnect()

    @pytest.mark.asyncio
    async def test_multi_agent_workflow(self, tmp_path):
        """Test complex workflow with multiple agents.

        Args:
            tmp_path: pytest's per-test temporary directory fixture. The
                workflow output is written there so the test leaves nothing
                behind and does not depend on a ``/tmp`` directory existing.
        """
        orch = AgentOrchestrator()

        calc_agent = CalculatorSubAgent()
        file_agent = FileOperationsSubAgent()

        orch.register_agent(calc_agent)
        orch.register_agent(file_agent)

        # NOTE(review): assumes the file-write tool accepts any writable
        # absolute path; confirm it is not restricted to /tmp.
        output_path = tmp_path / "test_workflow_result.txt"

        # Create workflow: calculate, then save result
        calc_task = Task(
            name="Calculate",
            description="Calculate sum",
            task_type="calculation",
            parameters={"operation": "add", "a": 100, "b": 200},
        )
        write_task = Task(
            name="Save",
            description="Save result",
            task_type="file_write",
            parameters={
                "operation": "write",
                "path": str(output_path),
                "content": "Result: 300",
            },
        )

        step1 = WorkflowStep(name="calculate", tasks=[calc_task])
        step2 = WorkflowStep(
            name="save", tasks=[write_task], depends_on=["calculate"]
        )

        workflow = Workflow(name="CalcAndSave", steps=[step1, step2])

        async with orch:
            results = await orch.execute_workflow(workflow)

            assert all(r.success for r in results["calculate"])
            assert all(r.success for r in results["save"])

            # Verify the workflow completed in order
            assert workflow.is_completed()


if __name__ == "__main__":
    # Propagate pytest's exit code so a direct run fails when tests fail.
    raise SystemExit(pytest.main([__file__, "-v"]))

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/joohnnie/mcp-agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.