"""
Agent Orchestration Platform - FastMCP Server Integration Tests
This module implements comprehensive integration tests for the FastMCP server
with all managers properly integrated and MCP tools functioning end-to-end.
Test Coverage:
- FastMCP server initialization and tool registration
- MCP tool invocation with manager integration
- Context injection and progress reporting
- Error handling and validation
- Authentication and security
- Concurrent tool invocations
- Server lifecycle management
Quality Assurance:
- End-to-end tool testing
- Security contract enforcement
- Performance validation
- Error recovery scenarios
Author: Adder_1 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import pytest
from typing import Dict, Any, Optional
from pathlib import Path
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, patch
from fastmcp import Context
from hypothesis import given, strategies as st
# Import system under test
from src.core.integrated_server import IntegratedAgentOrchestrationServer
from src.core.server import AgentOrchestrationServer
from src.core.coordinator import CoordinationStrategy
# Import types
from src.models.ids import SessionId, AgentId
from src.models.agent import AgentStatus, AgentSpecialization
from src.models.session import SessionStatus, SecurityLevel
from src.models.security import SecurityContext
# Import mocks
from tests.mocks.iterm_manager import MockITermManager
from tests.mocks.claude_code import MockClaudeCodeManager
class MockContext(Context):
    """Recording stand-in for the MCP ``Context`` handed to tools under test.

    Each reporting call is appended to an in-memory list so a test can
    inspect what a tool sent through its context:

    - ``info_messages``: strings passed to :meth:`info`
    - ``error_messages``: strings passed to :meth:`error`
    - ``progress_reports``: dicts with ``current``, ``total`` and
      ``description`` keys, one per :meth:`report_progress` call
    """

    def __init__(self):
        # NOTE(review): the base ``Context.__init__`` is not invoked here, so
        # only the recording lists below exist on the instance - confirm no
        # tool under test relies on other Context attributes.
        self.info_messages = []
        self.error_messages = []
        self.progress_reports = []

    async def info(self, message: str):
        """Record an info message."""
        self.info_messages.append(message)

    async def error(self, message: str):
        """Record an error message."""
        self.error_messages.append(message)

    async def report_progress(
        self,
        current: int,
        total: Optional[int] = None,
        description: Optional[str] = None,
    ):
        """Record a progress report.

        ``total`` and ``description`` default to ``None`` so that a tool which
        reports only a progress value does not fail with ``TypeError`` against
        the mock; callers that pass all three arguments are unaffected.
        """
        self.progress_reports.append({
            "current": current,
            "total": total,
            "description": description,
        })
@pytest.fixture
async def integrated_server():
    """Yield an initialized ``IntegratedAgentOrchestrationServer``.

    The iTerm and Claude Code manager classes are patched with their mock
    counterparts while the server initializes and while the test runs; the
    coordinator is shut down once the test hands control back.
    """
    orchestration_server = IntegratedAgentOrchestrationServer(
        name="TestAgentOrchestration",
        coordination_strategy=CoordinationStrategy.PERFORMANCE_OPTIMIZED,
        max_agents=8,
        max_sessions=4,
    )

    # Swap the real managers for mocks where needed.
    iterm_patch = patch('src.core.iterm_manager.ITermManager', MockITermManager)
    claude_patch = patch('src.core.claude_manager.ClaudeCodeManager', MockClaudeCodeManager)

    with iterm_patch, claude_patch:
        await orchestration_server.initialize()

        yield orchestration_server

        # Teardown: stop the coordinator started by initialize().
        await orchestration_server.coordinator.shutdown()
@pytest.fixture
async def basic_server():
    """Yield an ``AgentOrchestrationServer`` with its agent tools registered.

    The server is built with ``SecurityLevel.HIGH`` and is shut down after the
    test finishes.
    """
    plain_server = AgentOrchestrationServer(
        security_level=SecurityLevel.HIGH,
        name="BasicTestServer",
    )

    # Tools must be registered before a test can look them up by name.
    await plain_server.register_agent_tools()

    yield plain_server

    await plain_server.shutdown()
class TestFastMCPServerIntegration:
    """Integration tests for FastMCP server functionality.

    Every test drives the server through its registered MCP tools via
    :meth:`_invoke_tool` and inspects the returned result dictionaries and,
    where relevant, the messages recorded on a ``MockContext``.
    """

    @pytest.mark.asyncio
    async def test_server_initialization(self, integrated_server):
        """Test server initializes with all components properly."""
        server = integrated_server

        # Verify initialization
        assert server.initialized
        assert server.tools_registered

        # Verify all managers initialized
        assert server.agent_manager is not None
        assert server.session_manager is not None
        assert server.iterm_manager is not None
        assert server.claude_manager is not None
        assert server.state_manager is not None
        assert server.coordinator is not None

        # Verify MCP tools registered
        tools = list(server.mcp._tools.keys())
        expected_tools = [
            "create_agent", "delete_agent",
            "create_session", "delete_session", "get_session_status",
            "send_message_to_agent",
            "clear_agent_conversation", "start_new_agent_conversation",
        ]
        for tool_name in expected_tools:
            # Substring match: a registered key only has to contain the name.
            assert any(tool_name in tool for tool in tools), (
                f"Tool {tool_name} is not registered; registered tools: {tools}"
            )

    @pytest.mark.asyncio
    async def test_create_agent_tool_integration(self, integrated_server):
        """Test create_agent MCP tool with full integration."""
        server = integrated_server
        ctx = MockContext()

        # First create a session
        session_result = await self._invoke_tool(
            server, "create_session",
            root_path="/test/project",
            session_name="TestSession",
            security_level="HIGH",
            ctx=ctx,
        )
        assert session_result["success"]
        session_id = session_result["session_id"]

        # Create agent in session
        agent_result = await self._invoke_tool(
            server, "create_agent",
            session_id=session_id,
            agent_name="Agent_1",
            specialization="ADDER",
            system_prompt_suffix="Test agent",
            claude_config={"model": "claude-3"},
            ctx=ctx,
        )

        # Verify result
        assert agent_result["success"]
        assert agent_result["agent_name"] == "Agent_1"
        assert agent_result["session_id"] == session_id
        assert "managers_involved" in agent_result
        assert len(agent_result["managers_involved"]) > 0

        # Verify context interactions
        assert len(ctx.info_messages) > 0
        assert len(ctx.progress_reports) > 0
        assert ctx.progress_reports[-1]["current"] == 100

    @pytest.mark.asyncio
    async def test_delete_agent_tool_integration(self, integrated_server):
        """Test delete_agent MCP tool with full integration."""
        server = integrated_server
        ctx = MockContext()

        # Setup: Create session and agent. Setup results are asserted so that
        # a broken setup is reported here, not as a delete_agent failure.
        session_result = await self._invoke_tool(
            server, "create_session",
            root_path="/test/project",
            session_name="TestSession",
            ctx=ctx,
        )
        assert session_result["success"], "setup: create_session failed"
        session_id = session_result["session_id"]

        agent_result = await self._invoke_tool(
            server, "create_agent",
            session_id=session_id,
            agent_name="Agent_2",
            ctx=ctx,
        )
        assert agent_result["success"], "setup: create_agent failed"

        # Delete agent
        delete_result = await self._invoke_tool(
            server, "delete_agent",
            agent_name="Agent_2",
            force=False,
            ctx=ctx,
        )

        # Verify result
        assert delete_result["success"]
        assert delete_result["agent_name"] == "Agent_2"

    @pytest.mark.asyncio
    async def test_session_management_tools(self, integrated_server):
        """Test session management MCP tools."""
        server = integrated_server
        ctx = MockContext()

        # Create session
        create_result = await self._invoke_tool(
            server, "create_session",
            root_path="/test/workspace",
            session_name="WorkspaceSession",
            security_level="MEDIUM",
            ctx=ctx,
        )
        assert create_result["success"]
        session_id = create_result["session_id"]

        # Get session status
        status_result = await self._invoke_tool(
            server, "get_session_status",
            session_id=session_id,
            ctx=ctx,
        )
        assert status_result["sessions"]
        assert str(session_id) in status_result["sessions"]

        # Get all sessions status
        all_status = await self._invoke_tool(
            server, "get_session_status",
            session_id=None,  # Get all sessions
            ctx=ctx,
        )
        assert all_status["total_agents"] >= 0
        assert all_status["active_agents"] >= 0

        # Delete session
        delete_result = await self._invoke_tool(
            server, "delete_session",
            session_id=str(session_id),
            cleanup_agents=True,
            preserve_work=True,
            ctx=ctx,
        )
        assert delete_result["success"]

    @pytest.mark.asyncio
    async def test_communication_tools(self, integrated_server):
        """Test agent communication MCP tools."""
        server = integrated_server
        ctx = MockContext()

        # Setup: Create session and agent (asserted, see delete-agent test).
        session_result = await self._invoke_tool(
            server, "create_session",
            root_path="/test/comms",
            session_name="CommsTest",
            ctx=ctx,
        )
        assert session_result["success"], "setup: create_session failed"

        agent_result = await self._invoke_tool(
            server, "create_agent",
            session_id=session_result["session_id"],
            agent_name="Agent_3",
            ctx=ctx,
        )
        assert agent_result["success"], "setup: create_agent failed"

        # Send message to agent
        message_result = await self._invoke_tool(
            server, "send_message_to_agent",
            agent_name="Agent_3",
            message="Test message content",
            prepend_adder=True,
            wait_for_response=False,
            ctx=ctx,
        )
        assert message_result["success"]
        assert message_result["agent_name"] == "Agent_3"

    @pytest.mark.asyncio
    async def test_conversation_management_tools(self, integrated_server):
        """Test conversation management MCP tools."""
        server = integrated_server
        ctx = MockContext()

        # Setup: Create session and agent (asserted, see delete-agent test).
        session_result = await self._invoke_tool(
            server, "create_session",
            root_path="/test/conv",
            session_name="ConvTest",
            ctx=ctx,
        )
        assert session_result["success"], "setup: create_session failed"

        agent_result = await self._invoke_tool(
            server, "create_agent",
            session_id=session_result["session_id"],
            agent_name="Agent_4",
            ctx=ctx,
        )
        assert agent_result["success"], "setup: create_agent failed"

        # Clear conversation
        clear_result = await self._invoke_tool(
            server, "clear_agent_conversation",
            agent_name="Agent_4",
            preserve_state=True,
            ctx=ctx,
        )
        assert clear_result["success"]

        # Start new conversation
        start_result = await self._invoke_tool(
            server, "start_new_agent_conversation",
            agent_name="Agent_4",
            restore_context=True,
            ctx=ctx,
        )
        assert start_result["success"]

    @pytest.mark.asyncio
    async def test_error_handling_in_tools(self, integrated_server):
        """Test error handling in MCP tools."""
        server = integrated_server
        ctx = MockContext()

        # Test invalid session ID
        with pytest.raises(Exception):
            await self._invoke_tool(
                server, "create_agent",
                session_id="invalid_session",
                agent_name="Agent_X",
                ctx=ctx,
            )

        # Verify error was reported to context
        assert len(ctx.error_messages) > 0

        # Test invalid agent name format
        ctx = MockContext()  # Reset context
        with pytest.raises(Exception):
            await self._invoke_tool(
                server, "create_agent",
                session_id="some_session",
                agent_name="InvalidName",  # Should be Agent_#
                ctx=ctx,
            )

    @pytest.mark.asyncio
    async def test_concurrent_tool_invocations(self, integrated_server):
        """Test concurrent MCP tool invocations."""
        server = integrated_server

        # Create multiple sessions concurrently
        tasks = [
            self._invoke_tool(
                server, "create_session",
                root_path=f"/test/concurrent_{i}",
                session_name=f"ConcurrentSession_{i}",
                ctx=MockContext(),
            )
            for i in range(3)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Verify all succeeded. gather(return_exceptions=True) hands back any
        # raised exception object, including BaseException subclasses such as
        # CancelledError, so check against BaseException before indexing.
        for result in results:
            assert not isinstance(result, BaseException), (
                f"Concurrent create_session raised: {result!r}"
            )
            assert result["success"]

        # Get status of all sessions
        status = await self._invoke_tool(
            server, "get_session_status",
            ctx=MockContext(),
        )
        assert len(status["sessions"]) >= 3

    @pytest.mark.asyncio
    async def test_tool_security_validation(self, basic_server):
        """Test security validation in MCP tools."""
        server = basic_server
        ctx = MockContext()

        # Test path traversal attempt
        with pytest.raises(Exception):
            await self._invoke_tool(
                server, "create_session",
                root_path="../../../etc/passwd",
                session_name="MaliciousSession",
                ctx=ctx,
            )

        # Test SQL injection attempt in agent name
        with pytest.raises(Exception):
            await self._invoke_tool(
                server, "create_agent",
                session_id="test_session",
                agent_name="Agent_1'; DROP TABLE agents; --",
                ctx=ctx,
            )

    async def _invoke_tool(self, server, tool_name: str, **kwargs) -> Dict[str, Any]:
        """Invoke the MCP tool called ``tool_name`` on ``server``.

        Args:
            server: Server whose ``mcp._tools`` registry is searched.
            tool_name: Exact ``name`` of the tool to run.
            **kwargs: Forwarded unchanged to the tool's ``handler``.

        Returns:
            Whatever the awaited tool handler returns.

        Raises:
            ValueError: If no registered tool has that name.
        """
        # First registered tool whose name matches exactly, or None.
        tool = next(
            (t for t in server.mcp._tools.values() if t.name == tool_name),
            None,
        )
        if tool is None:
            raise ValueError(f"Tool {tool_name} not found")

        # Invoke tool handler
        return await tool.handler(**kwargs)
class TestPropertyBasedIntegration:
    """Property-based tests for server integration."""

    @pytest.mark.asyncio
    @given(
        session_names=st.lists(
            st.text(min_size=1, max_size=20, alphabet=st.characters(whitelist_categories=["L", "N"])),
            min_size=1,
            max_size=3
        ),
        agent_counts=st.lists(st.integers(min_value=0, max_value=3), min_size=1, max_size=3)
    )
    async def test_session_agent_consistency(self, session_names, agent_counts):
        """Property: System maintains consistency across sessions and agents.

        A fresh server is built for every generated example. Session names and
        agent counts are paired with ``zip``, so the shorter list decides how
        many sessions are created. The reported session and agent totals must
        match what was created.
        """
        server = IntegratedAgentOrchestrationServer()
        iterm_patch = patch('src.core.iterm_manager.ITermManager', MockITermManager)
        claude_patch = patch('src.core.claude_manager.ClaudeCodeManager', MockClaudeCodeManager)

        with iterm_patch, claude_patch:
            await server.initialize()

            try:
                session_ids = []
                agents_created = 0

                # Build each session, then populate it with its agents.
                for index, (name, requested) in enumerate(zip(session_names, agent_counts)):
                    new_session = await self._invoke_tool(
                        server, "create_session",
                        root_path=f"/test/prop_{index}",
                        session_name=name,
                        ctx=MockContext()
                    )
                    assert new_session["success"]
                    session_ids.append(new_session["session_id"])

                    # At most three agents per session, numbered from 1.
                    for number in range(1, min(requested, 3) + 1):
                        new_agent = await self._invoke_tool(
                            server, "create_agent",
                            session_id=new_session["session_id"],
                            agent_name=f"Agent_{number}",
                            ctx=MockContext()
                        )
                        # Only successful creations count towards the total.
                        if new_agent["success"]:
                            agents_created += 1

                # Ask the server for its overall view of sessions and agents.
                status = await self._invoke_tool(
                    server, "get_session_status",
                    ctx=MockContext()
                )

                # Properties to verify
                assert len(status["sessions"]) == len(session_ids)
                assert status["total_agents"] == agents_created
                assert status["active_agents"] == agents_created

            finally:
                await server.coordinator.shutdown()

    async def _invoke_tool(self, server, tool_name: str, **kwargs) -> Dict[str, Any]:
        """Look up the MCP tool named ``tool_name`` on ``server`` and await it.

        Keyword arguments are passed straight to the tool's ``handler``.
        Raises ``ValueError`` when no matching tool is found.
        """
        matches = (t for t in server.mcp._tools.values() if t.name == tool_name)
        tool = next(matches, None)
        if not tool:
            raise ValueError(f"Tool {tool_name} not found")
        return await tool.handler(**kwargs)
class TestServerLifecycle:
    """Test server lifecycle management."""

    @pytest.mark.asyncio
    async def test_server_graceful_shutdown(self):
        """Test server shuts down gracefully.

        A session is created through the ``create_session`` tool so there is
        some activity before the coordinator is shut down; afterwards the
        coordinator must report no monitoring and no active operations.
        """
        server = IntegratedAgentOrchestrationServer()

        with patch('src.core.iterm_manager.ITermManager', MockITermManager):
            with patch('src.core.claude_manager.ClaudeCodeManager', MockClaudeCodeManager):
                await server.initialize()

                # Create some activity. The tool is invoked through this
                # class's helper, not through an attribute on the server.
                await self._invoke_tool(
                    server, "create_session",
                    root_path="/test/shutdown",
                    session_name="ShutdownTest",
                    ctx=MockContext()
                )

                # Shutdown
                await server.coordinator.shutdown()

                # Verify clean shutdown
                assert not server.coordinator._monitoring_active
                assert len(server.coordinator._active_operations) == 0

    @pytest.mark.asyncio
    async def test_server_restart_capability(self):
        """Test server can be restarted after shutdown.

        Only the initialize/shutdown half is exercised: the test checks that
        monitoring is inactive after shutdown and does not re-initialize.
        """
        server = IntegratedAgentOrchestrationServer()

        # First initialization
        with patch('src.core.iterm_manager.ITermManager', MockITermManager):
            with patch('src.core.claude_manager.ClaudeCodeManager', MockClaudeCodeManager):
                await server.initialize()

                # Shutdown
                await server.coordinator.shutdown()

                # Re-initialize (would need to reset state in real implementation)
                # This test verifies the concept
                assert not server.coordinator._monitoring_active

    async def _invoke_tool(self, server, tool_name: str, **kwargs):
        """Invoke the MCP tool called ``tool_name`` on ``server``.

        Args:
            server: Server whose ``mcp._tools`` registry is searched.
            tool_name: Exact ``name`` of the tool to run.
            **kwargs: Forwarded unchanged to the tool's ``handler``.

        Returns:
            Whatever the awaited tool handler returns.

        Raises:
            ValueError: If no registered tool has that name.
        """
        tool = next(
            (t for t in server.mcp._tools.values() if t.name == tool_name),
            None,
        )
        if tool is None:
            raise ValueError(f"Tool {tool_name} not found")
        return await tool.handler(**kwargs)
if __name__ == "__main__":
    # pytest.main returns the run's exit code; raise it as SystemExit so that
    # running this file directly exits non-zero when tests fail.
    raise SystemExit(pytest.main([__file__, "-v"]))