# Agent Communication Protocol Specification

**Status**: Phase 1 Implementation - Task 1.3
**Version**: 1.0
**Author**: Task Graph System Team
**Last Updated**: 2025-08-10

## Overview

This document specifies the standardized communication protocols for agent-to-agent interaction within the AutoDocs MCP Task Graph Workflow System. These protocols enable coordinated multi-agent workflows while maintaining backward compatibility with the existing MCP protocol implementation.

## Architecture Overview

### Communication Layers

```
┌─────────────────────────────────────────────────────────────┐
│                     MCP Protocol Layer                      │
│  (Existing MCP tools: scan_dependencies, get_package_docs)  │
└─────────────────────────────────────────────────────────────┘
                               │
┌─────────────────────────────────────────────────────────────┐
│             Agent Communication Protocol Layer              │
│     (New workflow tools: submit_complex_request, etc.)      │
└─────────────────────────────────────────────────────────────┘
                               │
┌─────────────────────────────────────────────────────────────┐
│               Message Routing & Serialization               │
│           (Communication service implementation)            │
└─────────────────────────────────────────────────────────────┘
```

## Core Communication Patterns

### 1. Task Assignment Pattern

**Purpose**: Orchestrator assigns tasks to specialized agents
**Direction**: Orchestrator → Specialized Agent
**Message Type**: `TASK_ASSIGNMENT`

```json
{
  "message_type": "TASK_ASSIGNMENT",
  "message_id": "task_001_assign",
  "source_agent": "workflow_orchestrator",
  "target_agent": "core_services",
  "timestamp": "2025-08-10T10:30:00Z",
  "task": {
    "task_id": "task_001",
    "task_type": "dependency_analysis",
    "priority": "high",
    "parameters": {
      "package_name": "fastapi",
      "version_constraint": ">=0.68.0",
      "context_scope": "runtime"
    },
    "dependencies": ["task_000"],
    "expected_output": "PackageDependencyContext",
    "timeout_seconds": 60
  },
  "workflow_context": {
    "workflow_id": "wf_12345",
    "user_request": "Analyze FastAPI ecosystem for project integration",
    "context_budget": 50000,
    "quality_requirements": ["validation", "completeness"]
  }
}
```

### 2. Progress Reporting Pattern

**Purpose**: Agents report task progress to the orchestrator
**Direction**: Specialized Agent → Orchestrator
**Message Type**: `PROGRESS_REPORT`

```json
{
  "message_type": "PROGRESS_REPORT",
  "message_id": "task_001_progress",
  "source_agent": "core_services",
  "target_agent": "workflow_orchestrator",
  "timestamp": "2025-08-10T10:31:30Z",
  "task_id": "task_001",
  "status": "in_progress",
  "progress_percentage": 65,
  "intermediate_results": {
    "discovered_dependencies": 12,
    "analyzed_packages": 8,
    "estimated_completion": "2025-08-10T10:32:15Z"
  },
  "metrics": {
    "cache_hits": 6,
    "cache_misses": 2,
    "api_calls": 2,
    "token_usage": 1250
  },
  "issues": []
}
```

### 3. Task Completion Pattern

**Purpose**: Agents deliver final results to the orchestrator
**Direction**: Specialized Agent → Orchestrator
**Message Type**: `TASK_COMPLETION`

```json
{
  "message_type": "TASK_COMPLETION",
  "message_id": "task_001_complete",
  "source_agent": "core_services",
  "target_agent": "workflow_orchestrator",
  "timestamp": "2025-08-10T10:32:10Z",
  "task_id": "task_001",
  "status": "completed",
  "execution_time_seconds": 95,
  "result": {
    "primary_package": {
      "name": "fastapi",
      "version": "0.104.1",
      "documentation": "...",
      "dependencies": [...]
    },
    "runtime_dependencies": [...],
    "context_metadata": {
      "token_count": 2450,
      "quality_score": 0.95
    }
  },
  "performance_metrics": {
    "cache_hit_rate": 0.75,
    "api_response_time_avg": 120,
    "total_token_usage": 2450
  },
  "quality_validation": {
    "validation_passed": true,
    "completeness_score": 0.98,
    "accuracy_indicators": {...}
  }
}
```

### 4. Agent Collaboration Pattern

**Purpose**: Agents coordinate directly for complex tasks
**Direction**: Bidirectional between Specialized Agents
**Message Type**: `COLLABORATION_REQUEST`

```json
{
  "message_type": "COLLABORATION_REQUEST",
  "message_id": "collab_001",
  "source_agent": "core_services",
  "target_agent": "testing_specialist",
  "timestamp": "2025-08-10T10:33:00Z",
  "collaboration_type": "validation_request",
  "context": {
    "workflow_id": "wf_12345",
    "requesting_task_id": "task_001",
    "shared_context": {
      "package_analysis": {...},
      "quality_requirements": [...]
    }
  },
  "request": {
    "validation_type": "dependency_compatibility",
    "target_data": {...},
    "validation_criteria": [...],
    "urgency": "normal"
  }
}
```

### 5. Error Handling Pattern

**Purpose**: Report errors and coordinate recovery
**Direction**: Any Agent → Orchestrator
**Message Type**: `ERROR_REPORT`

```json
{
  "message_type": "ERROR_REPORT",
  "message_id": "error_001",
  "source_agent": "core_services",
  "target_agent": "workflow_orchestrator",
  "timestamp": "2025-08-10T10:35:00Z",
  "task_id": "task_002",
  "error": {
    "error_type": "NetworkTimeout",
    "error_code": "NET_001",
    "message": "PyPI API request timed out",
    "recoverable": true,
    "suggested_action": "retry_with_backoff",
    "context": {
      "package_name": "requests",
      "attempt_number": 2,
      "timeout_duration": 30
    }
  },
  "recovery_options": [
    {
      "option": "retry_task",
      "probability_success": 0.85,
      "estimated_delay": 15
    },
    {
      "option": "fallback_cached",
      "probability_success": 0.95,
      "quality_impact": "minor"
    }
  ]
}
```

## Message Serialization and Validation

### Message Schema Structure

All agent communication messages follow this base schema:

```python
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
from enum import Enum


class MessageType(str, Enum):
    TASK_ASSIGNMENT = "TASK_ASSIGNMENT"
    PROGRESS_REPORT = "PROGRESS_REPORT"
    TASK_COMPLETION = "TASK_COMPLETION"
    COLLABORATION_REQUEST = "COLLABORATION_REQUEST"
    COLLABORATION_RESPONSE = "COLLABORATION_RESPONSE"
    ERROR_REPORT = "ERROR_REPORT"
    CONTEXT_SYNC = "CONTEXT_SYNC"
    WORKFLOW_STATUS = "WORKFLOW_STATUS"


class AgentMessage(BaseModel):
    """Base message for agent-to-agent communication."""

    message_type: MessageType
    message_id: str = Field(..., description="Unique message identifier")
    source_agent: str = Field(..., description="Sending agent identifier")
    target_agent: str = Field(..., description="Target agent identifier")
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    correlation_id: Optional[str] = Field(None, description="For message threading")
    priority: str = Field(default="normal", description="Message priority level")
    payload: Dict[str, Any] = Field(...,
                                    description="Message-specific data")
```

### Message Routing Rules

1. **Direct Agent Communication**
   - Messages between specialized agents route directly
   - No orchestrator intervention required
   - Used for collaboration and validation requests

2. **Orchestrator-Mediated Communication**
   - Task assignments and completions always go through the orchestrator
   - Progress reports always go to the orchestrator
   - Error reports typically go to the orchestrator unless agent-specific

3. **Broadcast Communication**
   - Context synchronization messages are broadcast to relevant agents
   - Workflow status updates are broadcast to all participating agents
   - System notifications are broadcast as needed

### Message Persistence and Reliability

```python
class MessageDelivery(BaseModel):
    """Message delivery tracking and reliability."""

    message_id: str
    attempts: int = 0
    max_attempts: int = 3
    delivery_status: str = "pending"  # pending, delivered, failed, expired
    created_at: datetime
    delivered_at: Optional[datetime] = None
    error_details: Optional[str] = None

    def should_retry(self) -> bool:
        return self.attempts < self.max_attempts and self.delivery_status == "pending"
```

## Communication Service Implementation

### Core Service Architecture

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable, Tuple
import asyncio
import json
from datetime import datetime, timedelta


class CommunicationService(ABC):
    """Abstract base for agent communication service."""

    @abstractmethod
    async def send_message(self, message: AgentMessage) -> bool:
        """Send message to target agent."""
        pass

    @abstractmethod
    async def register_handler(self, agent_id: str, message_type: MessageType,
                               handler: Callable) -> None:
        """Register a message handler for a specific agent and message type."""
        pass

    @abstractmethod
    async def start_listening(self) -> None:
        """Start listening for incoming messages."""
        pass

    @abstractmethod
    async def stop_listening(self) -> None:
        """Stop listening and clean up resources."""
        pass


class InProcessCommunicationService(CommunicationService):
    """In-process message routing for single-process deployment."""

    def __init__(self):
        self.message_handlers: Dict[str, Dict[MessageType, Callable]] = {}
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.delivery_tracking: Dict[str, MessageDelivery] = {}
        self.running = False
        self._listener_task: Optional[asyncio.Task] = None

    async def send_message(self, message: AgentMessage) -> bool:
        """Route message to target agent handler."""
        # Reuse the existing record on retries so the attempt count is preserved
        delivery = self.delivery_tracking.get(message.message_id)
        if delivery is None:
            delivery = MessageDelivery(
                message_id=message.message_id,
                created_at=datetime.utcnow()
            )
            self.delivery_tracking[message.message_id] = delivery

        try:
            target_handlers = self.message_handlers.get(message.target_agent, {})
            handler = target_handlers.get(message.message_type)

            if handler:
                # "delivered" means queued for the handler, not yet processed
                await self.message_queue.put((handler, message))
                delivery.delivery_status = "delivered"
                delivery.delivered_at = datetime.utcnow()
                return True
            else:
                delivery.delivery_status = "failed"
                delivery.error_details = f"No handler for {message.message_type} on {message.target_agent}"
                return False

        except Exception as e:
            delivery.attempts += 1
            delivery.error_details = str(e)

            if delivery.should_retry():
                # Schedule retry
                asyncio.create_task(self._retry_message(message, delivery))
            else:
                delivery.delivery_status = "failed"

            return False

    async def register_handler(self, agent_id: str, message_type: MessageType,
                               handler: Callable) -> None:
        """Register message handler for agent and message type."""
        if agent_id not in self.message_handlers:
            self.message_handlers[agent_id] = {}
        self.message_handlers[agent_id][message_type] = handler

    async def start_listening(self) -> None:
        """Start processing message queue."""
        self.running = True
        # Keep a reference so the background task is not garbage-collected
        self._listener_task = asyncio.create_task(self._process_messages())

    async def stop_listening(self) -> None:
        """Stop processing and clean up."""
        self.running = False

    async def _process_messages(self) -> None:
        """Process messages from queue."""
        while self.running:
            try:
                handler, message = await asyncio.wait_for(
                    self.message_queue.get(), timeout=1.0
                )
                await handler(message)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                # Log error but continue processing
                print(f"Error processing message: {e}")

    async def _retry_message(self, message: AgentMessage, delivery: MessageDelivery) -> None:
        """Retry failed message delivery."""
        await asyncio.sleep(2 ** delivery.attempts)  # Exponential backoff
        await self.send_message(message)
```

## MCP Tool Extensions

### New MCP Tools for Workflow Operations

#### 1. submit_complex_request

```python
@mcp.tool
async def submit_complex_request(
    request_description: str,
    context_scope: str = "smart",
    priority: str = "normal",
    quality_requirements: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Submit a complex request for multi-agent workflow processing.

    Args:
        request_description: Natural language description of the complex task
        context_scope: Scope for dependency context ("primary_only", "runtime", "smart")
        priority: Request priority ("low", "normal", "high", "urgent")
        quality_requirements: List of quality requirements ("validation", "completeness", "accuracy")

    Returns:
        Workflow execution details and tracking information
    """
    if communication_service is None or workflow_orchestrator is None:
        return {
            "success": False,
            "error": {
                "code": "workflow_not_initialized",
                "message": "Workflow orchestration not available",
                "suggestion": "System may be starting up or in maintenance mode"
            }
        }

    try:
        # Create workflow request
        workflow_request = ComplexWorkflowRequest(
            description=request_description,
            context_scope=context_scope,
            priority=priority,
            quality_requirements=quality_requirements or ["validation"],
            submitted_at=datetime.utcnow()
        )

        # Submit to orchestrator
        workflow_id = await workflow_orchestrator.submit_request(workflow_request)

        return {
            "success": True,
            "workflow_id": workflow_id,
            "status": "submitted",
            "estimated_completion": "calculating",
            "tracking_url": f"/workflow/{workflow_id}/status"
        }

    except Exception as e:
        return {
            "success": False,
            "error": {
                "code": "workflow_submission_failed",
                "message": str(e),
                "suggestion": "Check request parameters and try again"
            }
        }
```

#### 2. get_workflow_status

```python
@mcp.tool
async def get_workflow_status(workflow_id: str) -> Dict[str, Any]:
    """
    Get detailed status of a workflow execution.

    Args:
        workflow_id: Unique identifier for the workflow

    Returns:
        Comprehensive workflow status and progress information
    """
    if workflow_orchestrator is None:
        return {
            "success": False,
            "error": {
                "code": "workflow_not_initialized",
                "message": "Workflow orchestration not available"
            }
        }

    try:
        status = await workflow_orchestrator.get_workflow_status(workflow_id)

        return {
            "success": True,
            "workflow_id": workflow_id,
            "status": status.status,
            "progress_percentage": status.progress_percentage,
            "current_phase": status.current_phase,
            "active_tasks": status.active_tasks,
            "completed_tasks": status.completed_tasks,
            "failed_tasks": status.failed_tasks,
            "estimated_completion": status.estimated_completion,
            "performance_metrics": status.performance_metrics,
            "intermediate_results": status.intermediate_results
        }

    except WorkflowNotFound:
        return {
            "success": False,
            "error": {
                "code": "workflow_not_found",
                "message": f"Workflow {workflow_id} not found",
                "suggestion": "Check workflow ID and ensure it exists"
            }
        }
    except Exception as e:
        return {
            "success": False,
            "error": {
                "code": "status_retrieval_failed",
                "message": str(e)
            }
        }
```

#### 3. agent_collaboration_request

```python
@mcp.tool
async def agent_collaboration_request(
    source_agent: str,
    target_agent: str,
    collaboration_type: str,
    request_data: Dict[str, Any],
    timeout_seconds: int = 60
) -> Dict[str, Any]:
    """
    Request direct collaboration between agents.

    Args:
        source_agent: Agent making the collaboration request
        target_agent: Agent being requested to collaborate
        collaboration_type: Type of collaboration ("validation", "enhancement", "review")
        request_data: Collaboration-specific data and parameters
        timeout_seconds: Maximum time to wait for collaboration response

    Returns:
        Collaboration response and results
    """
    if communication_service is None:
        return {
            "success": False,
            "error": {
                "code": "communication_not_initialized",
                "message": "Agent communication not available"
            }
        }

    try:
        # Create collaboration message
        collaboration_message = AgentMessage(
            message_type=MessageType.COLLABORATION_REQUEST,
            message_id=f"collab_{datetime.utcnow().timestamp()}",
            source_agent=source_agent,
            target_agent=target_agent,
            payload={
                "collaboration_type": collaboration_type,
                "request_data": request_data,
                "timeout_seconds": timeout_seconds
            }
        )

        # Send collaboration request
        success = await communication_service.send_message(collaboration_message)

        if success:
            # Wait for response
            response = await communication_service.wait_for_response(
                collaboration_message.message_id,
                timeout_seconds=timeout_seconds
            )

            return {
                "success": True,
                "collaboration_id": collaboration_message.message_id,
                "response": response,
                "collaboration_type": collaboration_type
            }
        else:
            return {
                "success": False,
                "error": {
                    "code": "collaboration_delivery_failed",
                    "message": "Failed to deliver collaboration request",
                    "suggestion": "Check target agent availability"
                }
            }

    except asyncio.TimeoutError:
        return {
            "success": False,
            "error": {
                "code": "collaboration_timeout",
                "message": f"Collaboration timed out after {timeout_seconds} seconds",
                "suggestion": "Try with longer timeout or check target agent status"
            }
        }
    except Exception as e:
        return {
            "success": False,
            "error": {
                "code": "collaboration_failed",
                "message": str(e)
            }
        }
```

## Quality Assurance and Validation

### Message Validation

```python
class MessageValidator:
    """Validates agent communication messages."""

    @staticmethod
    def validate_message(message: AgentMessage) -> List[str]:
        """Validate message structure and content."""
        errors = []

        # Required field validation
        if not message.message_id:
            errors.append("message_id is required")
        if not message.source_agent:
            errors.append("source_agent is required")
        if not message.target_agent:
            errors.append("target_agent is required")

        # Agent ID validation
        valid_agents = {
            "workflow_orchestrator", "context_coordinator", "core_services",
            "testing_specialist", "production_ops", "docs_integration",
            "mcp_protocol", "technical_writer", "product_manager",
            "agent_design_architect"
        }

        if message.source_agent not in valid_agents:
            errors.append(f"Invalid source_agent: {message.source_agent}")
        if message.target_agent not in valid_agents:
            errors.append(f"Invalid target_agent: {message.target_agent}")

        # Message type specific validation
        if message.message_type == MessageType.TASK_ASSIGNMENT:
            if "task" not in message.payload:
                errors.append("TASK_ASSIGNMENT requires task in payload")
        elif message.message_type == MessageType.PROGRESS_REPORT:
            if "task_id" not in message.payload:
                errors.append("PROGRESS_REPORT requires task_id in payload")

        return errors
```

### Protocol Compatibility

```python
class ProtocolCompatibility:
    """Ensures backward compatibility with existing MCP protocol."""

    @staticmethod
    def wrap_legacy_tool(tool_name: str, tool_function: Callable) -> Callable:
        """Wrap existing MCP tool to work with workflow system."""
        async def wrapped_tool(*args, **kwargs):
            # Execute original tool
            result = await tool_function(*args, **kwargs)

            # Add workflow compatibility metadata
            if isinstance(result, dict) and result.get("success"):
                result["workflow_compatible"] = True
                result["agent_source"] = "legacy_mcp_tool"
                result["communication_protocol"] = "mcp_direct"

            return result

        return wrapped_tool

    @staticmethod
    def ensure_backward_compatibility(workflow_result: Dict[str, Any]) -> Dict[str, Any]:
        """Ensure workflow results are compatible with existing clients."""
        # Maintain existing response structure
        if "context" in workflow_result:
            # Transform workflow context to legacy format
            workflow_result["data"] = workflow_result.get("context", {})

        # Preserve success/error structure
        if "workflow_status" in workflow_result:
            workflow_result["success"] = workflow_result["workflow_status"] == "completed"

        return workflow_result
```

## Integration with Existing MCP Server

### Server Extension Points

```python
# In main.py - Service initialization
async def initialize_communication_services() -> None:
    """Initialize agent communication services alongside existing services."""
    global communication_service, workflow_orchestrator

    # Initialize communication service
    communication_service = InProcessCommunicationService()
    await communication_service.start_listening()

    # Initialize workflow orchestrator
    workflow_orchestrator = WorkflowOrchestrator(communication_service)

    # Register message handlers for existing agents
    await register_agent_handlers()

    logger.info("Agent communication services initialized")


async def register_agent_handlers() -> None:
    """Register message handlers for all agent types."""

    # Core services agent handlers
    await communication_service.register_handler(
        "core_services",
        MessageType.TASK_ASSIGNMENT,
        handle_core_services_task
    )

    # Testing specialist handlers
    await communication_service.register_handler(
        "testing_specialist",
        MessageType.COLLABORATION_REQUEST,
        handle_testing_collaboration
    )

    # Additional agent handlers...
```

### Error Handling Integration

```python
def integrate_workflow_errors_with_existing_error_handling():
    """Integrate workflow errors with existing AutoDocs error handling."""

    # Extend existing ErrorFormatter
    class WorkflowErrorFormatter(ErrorFormatter):
        @classmethod
        def format_workflow_error(cls, error: Exception, workflow_context: Dict[str, Any]) -> Dict[str, Any]:
            base_error = cls.format_exception(error, workflow_context)

            # Add workflow-specific error context
            base_error.update({
                "workflow_id": workflow_context.get("workflow_id"),
                "failed_task": workflow_context.get("failed_task"),
                "agent_involved": workflow_context.get("agent_involved"),
                "recovery_options": cls._get_workflow_recovery_options(error, workflow_context)
            })

            return base_error

    # Return the extended formatter so callers can use it
    return WorkflowErrorFormatter
```

## Security and Authentication

### Message Security

```python
class SecureMessageTransport:
    """Secure transport for agent messages."""

    def __init__(self, encryption_key: Optional[str] = None):
        self.encryption_enabled = encryption_key is not None
        self.encryption_key = encryption_key

    def encrypt_message(self, message: AgentMessage) -> str:
        """Encrypt sensitive message content."""
        if not self.encryption_enabled:
            return message.json()

        # Implement encryption for sensitive payloads
        # (Implementation depends on security requirements)
        return message.json()  # Placeholder

    def decrypt_message(self, encrypted_data: str) -> AgentMessage:
        """Decrypt message content."""
        if not self.encryption_enabled:
            return AgentMessage.parse_raw(encrypted_data)

        # Implement decryption
        return AgentMessage.parse_raw(encrypted_data)  # Placeholder
```

### Agent Authentication

```python
class AgentAuthenticator:
    """Authenticate agent communications."""

    def __init__(self):
        self.trusted_agents = {
            "workflow_orchestrator": {"permissions": ["all"]},
            "context_coordinator": {"permissions": ["context_management"]},
            # core_services also sends progress, collaboration, and error messages
            # (see Core Communication Patterns), so it needs those permissions too
            "core_services": {"permissions": [
                "task_execution", "api_access", "progress_reporting",
                "agent_collaboration", "error_reporting"
            ]},
            # ... other agents
        }

    def authenticate_message(self, message: AgentMessage) -> bool:
        """Verify message authenticity."""
        source_agent = message.source_agent

        # Check if agent is trusted
        if source_agent not in self.trusted_agents:
            return False

        # Verify message signature (if implemented)
        # return self._verify_signature(message)

        return True

    def authorize_message(self, message: AgentMessage) -> bool:
        """Authorize message based on agent permissions."""
        source_agent = message.source_agent
        agent_config = self.trusted_agents.get(source_agent, {})
        permissions = agent_config.get("permissions", [])

        # Check if agent has permission for message type
        required_permission = self._get_required_permission(message.message_type)
        return "all" in permissions or required_permission in permissions

    def _get_required_permission(self, message_type: MessageType) -> str:
        """Get required permission for message type."""
        permission_map = {
            MessageType.TASK_ASSIGNMENT: "task_management",
            MessageType.PROGRESS_REPORT: "progress_reporting",
            MessageType.TASK_COMPLETION: "task_execution",
            MessageType.COLLABORATION_REQUEST: "agent_collaboration",
            MessageType.ERROR_REPORT: "error_reporting"
        }
        return permission_map.get(message_type, "unknown")
```

## Performance Considerations

### Message Throughput Optimization

```python
class PerformanceOptimizedCommunication(InProcessCommunicationService):
    """Performance-optimized version of the communication service."""

    def __init__(self, batch_size: int = 10, batch_timeout: float = 0.1):
        super().__init__()
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.message_batch: List[Tuple[Callable, AgentMessage]] = []

    async def _process_messages(self) -> None:
        """Process messages in batches for better throughput."""
        loop = asyncio.get_running_loop()
        while self.running:
            # Collect a batch of messages until it is full or the batch window closes
            batch_start = loop.time()
            while (len(self.message_batch) < self.batch_size and
                   (loop.time() - batch_start) < self.batch_timeout):
                try:
                    handler, message = await asyncio.wait_for(
                        self.message_queue.get(),
                        timeout=self.batch_timeout
                    )
                    self.message_batch.append((handler, message))
                except asyncio.TimeoutError:
                    break

            # Process batch concurrently; handler exceptions are collected, not raised
            if self.message_batch:
                tasks = [
                    asyncio.create_task(handler(message))
                    for handler, message in self.message_batch
                ]
                await asyncio.gather(*tasks, return_exceptions=True)
                self.message_batch.clear()
```

### Memory Management

```python
class MessageMemoryManager:
    """Manage memory usage for message storage and tracking."""

    def __init__(self, max_messages: int = 10000, cleanup_interval: int = 300):
        self.max_messages = max_messages
        self.cleanup_interval = cleanup_interval
        self.message_history: Dict[str, Tuple[datetime, AgentMessage]] = {}
        # asyncio.create_task() needs a running event loop, so construct this from async code
        self._start_cleanup_task()

    def store_message(self, message: AgentMessage) -> None:
        """Store message with automatic cleanup."""
        if len(self.message_history) >= self.max_messages:
            self._cleanup_old_messages()

        # If every stored message is still recent, evict the oldest entries
        # (dicts preserve insertion order) so the cap is actually enforced
        while len(self.message_history) >= self.max_messages:
            self.message_history.pop(next(iter(self.message_history)))

        self.message_history[message.message_id] = (datetime.utcnow(), message)

    def _cleanup_old_messages(self) -> None:
        """Remove old messages to free memory."""
        cutoff_time = datetime.utcnow() - timedelta(seconds=self.cleanup_interval)
        messages_to_remove = [
            msg_id for msg_id, (timestamp, _) in self.message_history.items()
            if timestamp < cutoff_time
        ]
        for msg_id in messages_to_remove:
            del self.message_history[msg_id]

    def _start_cleanup_task(self) -> None:
        """Start periodic cleanup task."""
        # Keep a reference so the task is not garbage-collected
        self._cleanup_task = asyncio.create_task(self._periodic_cleanup())

    async def _periodic_cleanup(self) -> None:
        """Periodic cleanup of old messages."""
        while True:
            await asyncio.sleep(self.cleanup_interval)
            self._cleanup_old_messages()
```

## Testing and Validation

### Communication Protocol Testing

```python
import asyncio
from unittest.mock import AsyncMock

import pytest
import pytest_asyncio


class TestAgentCommunication:
    """Test suite for agent communication protocols."""

    # pytest-asyncio requires its own decorator for async fixtures in strict mode
    @pytest_asyncio.fixture
    async def communication_service(self):
        service = InProcessCommunicationService()
        await service.start_listening()
        yield service
        await service.stop_listening()

    @pytest.mark.asyncio
    async def test_task_assignment_flow(self, communication_service):
        """Test that a task assignment reaches the registered handler."""
        # Setup mock handlers
        task_handler = AsyncMock()
        await communication_service.register_handler(
            "core_services",
            MessageType.TASK_ASSIGNMENT,
            task_handler
        )

        # Create task assignment message
        task_message = AgentMessage(
            message_type=MessageType.TASK_ASSIGNMENT,
            message_id="test_task_001",
            source_agent="workflow_orchestrator",
            target_agent="core_services",
            payload={
                "task": {
                    "task_id": "test_task_001",
                    "task_type": "dependency_analysis",
                    "parameters": {"package_name": "requests"}
                }
            }
        )

        # Send message
        success = await communication_service.send_message(task_message)
        assert success

        # Verify handler was called
        await asyncio.sleep(0.1)  # Allow message processing
        task_handler.assert_called_once()

        # Verify message content
        called_message = task_handler.call_args[0][0]
        assert called_message.message_type == MessageType.TASK_ASSIGNMENT
        assert called_message.payload["task"]["task_id"] == "test_task_001"

    @pytest.mark.asyncio
    async def test_collaboration_pattern(self, communication_service):
        """Test agent-to-agent collaboration pattern."""
        # Setup handler for the target agent
        validation_handler = AsyncMock()
        await communication_service.register_handler(
            "testing_specialist",
            MessageType.COLLABORATION_REQUEST,
            validation_handler
        )

        # Create collaboration request
        collab_message = AgentMessage(
            message_type=MessageType.COLLABORATION_REQUEST,
            message_id="collab_001",
            source_agent="core_services",
            target_agent="testing_specialist",
            payload={
                "collaboration_type": "validation_request",
                "request_data": {"package_analysis": {...}}
            }
        )

        # Send collaboration request
        success = await communication_service.send_message(collab_message)
        assert success

        # Verify handler was called
        await asyncio.sleep(0.1)
        validation_handler.assert_called_once()

    @pytest.mark.asyncio
    async def test_message_validation(self):
        """Test message validation functionality."""
        # Valid message
        valid_message = AgentMessage(
            message_type=MessageType.TASK_ASSIGNMENT,
            message_id="valid_001",
            source_agent="workflow_orchestrator",
            target_agent="core_services",
            payload={"task": {"task_id": "test"}}
        )

        errors = MessageValidator.validate_message(valid_message)
        assert len(errors) == 0

        # Invalid message - missing required field
        invalid_message = AgentMessage(
            message_type=MessageType.TASK_ASSIGNMENT,
            message_id="",  # Empty message ID
            source_agent="workflow_orchestrator",
            target_agent="core_services",
            payload={}
        )

        errors = MessageValidator.validate_message(invalid_message)
        assert len(errors) > 0
        assert any("message_id is required" in error for error in errors)

    @pytest.mark.asyncio
    async def test_error_handling_and_recovery(self, communication_service):
        """Test that a failing handler does not stop message processing."""
        # Setup handler that raises an exception
        failing_handler = AsyncMock(side_effect=Exception("Handler failed"))
        await communication_service.register_handler(
            "core_services",
            MessageType.TASK_ASSIGNMENT,
            failing_handler
        )

        # Create message
        message = AgentMessage(
            message_type=MessageType.TASK_ASSIGNMENT,
            message_id="failing_task_001",
            source_agent="workflow_orchestrator",
            target_agent="core_services",
            payload={"task": {"task_id": "failing_task"}}
        )

        # Send message: queueing succeeds even though the handler will raise
        success = await communication_service.send_message(message)
        assert success

        # The handler's exception is caught and logged by _process_messages,
        # so delivery tracking records the message as delivered (queued)
        await asyncio.sleep(0.1)
        failing_handler.assert_awaited_once()
        delivery = communication_service.delivery_tracking.get(message.message_id)
        assert delivery is not None
        assert delivery.delivery_status == "delivered"
        assert communication_service.running
```

## Documentation and Examples

### Usage Examples

```python
# Example 1: Simple task assignment
async def assign_documentation_task():
    """Example of assigning a documentation task to an agent."""
    message = AgentMessage(
        message_type=MessageType.TASK_ASSIGNMENT,
        message_id="doc_task_001",
        source_agent="workflow_orchestrator",
        target_agent="core_services",
        payload={
            "task": {
                "task_id": "doc_task_001",
                "task_type": "get_package_docs_with_context",
                "parameters": {
                    "package_name": "fastapi",
                    "version_constraint": ">=0.68.0",
                    "include_dependencies": True,
                    "context_scope": "smart"
                }
            }
        }
    )

    await communication_service.send_message(message)


# Example 2: Agent collaboration for validation
async def request_validation_collaboration():
    """Example of requesting validation collaboration between agents."""
    message = AgentMessage(
        message_type=MessageType.COLLABORATION_REQUEST,
        message_id="validation_collab_001",
        source_agent="core_services",
        target_agent="testing_specialist",
        payload={
            "collaboration_type": "dependency_validation",
            "request_data": {
                "package_analysis": {...},
                "validation_criteria": ["compatibility", "security", "performance"]
            }
        }
    )

    await communication_service.send_message(message)


# Example 3: Progress reporting
async def report_task_progress():
    """Example of reporting task progress to orchestrator."""
    message = AgentMessage(
        message_type=MessageType.PROGRESS_REPORT,
        message_id="progress_001",
        source_agent="core_services",
        target_agent="workflow_orchestrator",
        payload={
            "task_id": "doc_task_001",
            "status": "in_progress",
            "progress_percentage": 65,
            "intermediate_results": {
                "packages_analyzed": 5,
                "dependencies_resolved": 12
            }
        }
    )

    await communication_service.send_message(message)
```

## Future Enhancements

### Planned Protocol Extensions

1. **Stream Processing Support**
   - Support for streaming large datasets between agents
   - Incremental result delivery for long-running tasks
   - Real-time progress updates

2. **Advanced Routing**
   - Load balancing across multiple instances of the same agent type
   - Geographic routing for distributed deployments
   - Priority-based message queuing

3. **Enhanced Security**
   - Message encryption for sensitive data
   - Digital signatures for message authentication
   - Role-based access control for message types

4. **Performance Optimizations**
   - Message compression for large payloads
   - Connection pooling for external communications
   - Adaptive batching based on system load

5. **Monitoring and Observability**
   - Detailed message flow tracing
   - Performance metrics collection
   - Anomaly detection in communication patterns

## Conclusion

This Agent Communication Protocol Framework provides a standardized foundation for multi-agent coordination within the AutoDocs MCP Task Graph Workflow System. The protocol maintains backward compatibility with existing MCP tools while enabling direct agent-to-agent collaboration patterns.

Key benefits of this implementation:

1. **Standardized Communication**: Consistent message formats across all agent interactions
2. **Backward Compatibility**: Existing MCP tools continue to work without modification
3. **Extensible Architecture**: New message types and agents can be added by extending `MessageType` and registering handlers
4. **Robust Error Handling**: Structured error reports with recovery options, plus delivery retries with exponential backoff
5. **Performance Optimized**: Optional batched, concurrent message processing and a bounded message history
6. **Security Aware**: Agent authentication and permission-based authorization (payload encryption is currently a placeholder)

The protocol supports both simple point-to-point communication and complex multi-agent workflows, providing a foundation for coordinated agent collaboration.
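The `agent_collaboration_request` tool awaits `communication_service.wait_for_response(...)`, but neither `CommunicationService` nor `InProcessCommunicationService` defines that method. One possible way to fill the gap is sketched below, assuming the responding agent sends a `COLLABORATION_RESPONSE` whose `correlation_id` echoes the request's `message_id`. `ResponseCorrelator`, `expect`, `resolve`, and `demo` are hypothetical names, not part of the existing codebase; only standard-library `asyncio` futures are used.

```python
import asyncio
from typing import Any, Dict


class ResponseCorrelator:
    """Hypothetical helper that pairs replies with requests via correlation_id."""

    def __init__(self) -> None:
        # message_id of the original request -> future that will hold its reply
        self._pending: Dict[str, "asyncio.Future[Dict[str, Any]]"] = {}

    def expect(self, request_id: str) -> "asyncio.Future[Dict[str, Any]]":
        """Register interest in a reply before the request is sent."""
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return future

    def resolve(self, correlation_id: str, payload: Dict[str, Any]) -> bool:
        """Deliver a reply; return False if nobody is waiting or it was already answered."""
        future = self._pending.get(correlation_id)
        if future is None or future.done():
            return False
        future.set_result(payload)
        return True

    async def wait_for_response(
        self, request_id: str, timeout_seconds: float
    ) -> Dict[str, Any]:
        """Await the reply; raises asyncio.TimeoutError if none arrives in time."""
        future = self._pending.get(request_id)
        if future is None:
            future = self.expect(request_id)
        try:
            return await asyncio.wait_for(future, timeout=timeout_seconds)
        finally:
            # Forget the request so a late reply is rejected by resolve()
            self._pending.pop(request_id, None)


async def demo() -> Dict[str, Any]:
    correlator = ResponseCorrelator()
    request_id = "collab_001"
    correlator.expect(request_id)  # register before sending the request
    # Simulate testing_specialist replying with correlation_id="collab_001"
    asyncio.get_running_loop().call_later(
        0.01, correlator.resolve, request_id, {"validation_passed": True}
    )
    return await correlator.wait_for_response(request_id, timeout_seconds=1.0)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # {'validation_passed': True}
```

In this sketch a service would call `expect()` before `send_message()` and call `resolve()` from its `COLLABORATION_RESPONSE` handler. Because `resolve()` leaves the completed future in place until `wait_for_response()` collects it, a reply that arrives before the caller starts waiting is not lost, while a reply arriving after a timeout is ignored.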
