Skip to main content
Glama

MCP Orchestration Server

agent_template.py14.7 kB
#!/usr/bin/env python3 """ Agent Template - Production Ready Template for creating new MCP agents with full compliance and auto-discovery """ import asyncio from datetime import datetime from typing import Dict, List, Any, Optional import logging # Add project root to path import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from agents.base_agent import BaseMCPAgent, AgentCapability, MCPMessage # MongoDB integration (optional) try: from mcp_mongodb_integration import MCPMongoDBIntegration MONGODB_AVAILABLE = True except ImportError: MONGODB_AVAILABLE = False # Agent metadata for auto-discovery (REQUIRED) AGENT_METADATA = { "id": "template_agent", # CHANGE THIS: Unique agent identifier "name": "Template Agent", # CHANGE THIS: Human-readable agent name "version": "1.0.0", # CHANGE THIS: Agent version "author": "Your Name", # CHANGE THIS: Agent author "description": "Template agent for creating new MCP agents", # CHANGE THIS: Agent description "category": "template", # CHANGE THIS: Agent category (computation, data, communication, processing, etc.) "status": "template", # CHANGE THIS: live, inactive, future, or template "dependencies": [], # CHANGE THIS: List of required Python packages "auto_load": False, # CHANGE THIS: Whether to auto-load this agent "priority": 5, # CHANGE THIS: Loading priority (1=highest, 10=lowest) "health_check_interval": 60, # CHANGE THIS: Health check interval in seconds "max_failures": 3, # CHANGE THIS: Maximum failures before marking unhealthy "recovery_timeout": 120 # CHANGE THIS: Recovery timeout in seconds } class TemplateAgent(BaseMCPAgent): """Template Agent - Copy and modify this class for new agents.""" def __init__(self): # CHANGE THIS: Define your agent's capabilities capabilities = [ AgentCapability( name="template_capability", # CHANGE THIS: Capability name description="Template capability for demonstration", # CHANGE THIS: Capability description input_types=["text", "dict"], # CHANGE THIS: Supported input types output_types=["dict"], # CHANGE THIS: Supported output types methods=["process", "info"], # CHANGE THIS: Supported methods version="1.0.0" # CHANGE THIS: Capability version ) ] # CHANGE THIS: Agent ID and name super().__init__("template_agent", "Template Agent", capabilities) # Production configuration self.failure_count = 0 self.last_health_check = datetime.now() # CHANGE THIS: Add your agent-specific configuration self.max_input_length = 1000 self.processing_timeout = 30 # Initialize MongoDB integration (optional) self.mongodb_integration = None if MONGODB_AVAILABLE: try: self.mongodb_integration = MCPMongoDBIntegration() asyncio.create_task(self._init_mongodb()) except Exception as e: self.logger.error(f"Failed to initialize MongoDB: {e}") self.logger.info("Template Agent initialized") async def _init_mongodb(self): """Initialize MongoDB connection (optional).""" if self.mongodb_integration: try: connected = await self.mongodb_integration.connect() if connected: self.logger.info("Template Agent connected to MongoDB") else: self.logger.warning("Template Agent failed to connect to MongoDB") self.failure_count += 1 except Exception as e: self.logger.error(f"Template Agent MongoDB initialization error: {e}") self.failure_count += 1 async def health_check(self) -> Dict[str, Any]: """Perform health check for production monitoring (REQUIRED).""" try: # CHANGE THIS: Add your agent-specific health checks # Example: Test core functionality test_result = await self.test_core_functionality() health_status = { "agent_id": self.agent_id, "status": "healthy" if test_result else "unhealthy", "last_check": datetime.now().isoformat(), "failure_count": self.failure_count, "mongodb_connected": self.mongodb_integration is not None, "uptime": (datetime.now() - self.last_health_check).total_seconds(), "version": AGENT_METADATA["version"], # CHANGE THIS: Add your agent-specific health metrics "custom_metrics": { "test_functionality": "passed" if test_result else "failed" } } self.last_health_check = datetime.now() # Reset failure count on successful health check if health_status["status"] == "healthy": self.failure_count = 0 return health_status except Exception as e: self.failure_count += 1 self.logger.error(f"Health check failed: {e}") return { "agent_id": self.agent_id, "status": "unhealthy", "error": str(e), "failure_count": self.failure_count, "last_check": datetime.now().isoformat() } async def test_core_functionality(self) -> bool: """Test core functionality for health check (CHANGE THIS).""" try: # CHANGE THIS: Implement your agent's core functionality test # Example: Test basic processing test_input = "test" result = await self.process_input(test_input) return result is not None except: return False async def _store_result(self, input_data: Dict[str, Any], result: Dict[str, Any]): """Store result in MongoDB (optional).""" if self.mongodb_integration: try: # Primary storage method mongodb_id = await self.mongodb_integration.save_agent_output( self.agent_id, input_data, result, { "storage_type": "template_processing", # CHANGE THIS "agent_version": AGENT_METADATA["version"] } ) self.logger.info(f"✅ Result stored in MongoDB: {mongodb_id}") # Also force store as backup await self.mongodb_integration.force_store_result( self.agent_id, input_data.get("query", "unknown"), result ) self.logger.info("✅ Result force stored as backup") except Exception as e: self.logger.error(f"❌ Failed to store result: {e}") self.failure_count += 1 # Try force storage as fallback try: await self.mongodb_integration.force_store_result( self.agent_id, input_data.get("query", "unknown"), result ) self.logger.info("✅ Result fallback storage successful") except Exception as e2: self.logger.error(f"❌ Result fallback storage failed: {e2}") self.failure_count += 1 async def handle_process(self, message: MCPMessage) -> Dict[str, Any]: """Handle the main process method (REQUIRED - CHANGE THIS).""" try: params = message.params query = params.get("query", "") or params.get("text", "") if not query: return { "status": "error", "message": "No input provided", # CHANGE THIS: Customize error message "agent": self.agent_id, "version": AGENT_METADATA["version"] } # CHANGE THIS: Validate input according to your agent's requirements if len(query) > self.max_input_length: return { "status": "error", "message": f"Input too long (max {self.max_input_length} characters)", "agent": self.agent_id } # CHANGE THIS: Process the input with your agent's logic result = await self.process_input(query) # CHANGE THIS: Format the result according to your agent's output formatted_result = { "status": "success", "result": result, "input": query, "agent": self.agent_id, "timestamp": datetime.now().isoformat(), "version": AGENT_METADATA["version"] } # Store in MongoDB if successful if formatted_result.get("status") == "success": await self._store_result( {"query": query, "type": "template_processing"}, formatted_result ) return formatted_result except Exception as e: self.failure_count += 1 self.logger.error(f"Error in template agent process: {e}") return { "status": "error", "message": f"Processing failed: {str(e)}", "agent": self.agent_id, "failure_count": self.failure_count } async def process_input(self, input_text: str) -> Any: """Process input according to agent logic (CHANGE THIS).""" # CHANGE THIS: Implement your agent's core processing logic # This is just a template example try: # Example processing: reverse the input text processed = input_text[::-1] return { "original": input_text, "processed": processed, "processing_type": "template_reverse", "length": len(input_text) } except Exception as e: self.logger.error(f"Error processing input: {e}") raise async def handle_info(self, message: MCPMessage) -> Dict[str, Any]: """Handle info request with production metadata (REQUIRED).""" return { "status": "success", "info": self.get_info(), "metadata": AGENT_METADATA, "health": await self.health_check(), "capabilities": [cap.name for cap in self.capabilities], # CHANGE THIS: Add your agent-specific information "configuration": { "max_input_length": self.max_input_length, "processing_timeout": self.processing_timeout }, "agent": self.agent_id } # Agent registration functions for auto-discovery (REQUIRED) def get_agent_metadata(): """Get agent metadata for auto-discovery (REQUIRED).""" return AGENT_METADATA def create_agent(): """Create and return the agent instance (REQUIRED).""" return TemplateAgent() def get_agent_info(): """Get agent information for compatibility (REQUIRED).""" return { "name": AGENT_METADATA["name"], "description": AGENT_METADATA["description"], "version": AGENT_METADATA["version"], "author": AGENT_METADATA["author"], "capabilities": ["template_capability"], # CHANGE THIS "category": AGENT_METADATA["category"] } # Test function (optional but recommended) if __name__ == "__main__": # Test the agent import asyncio async def test_agent(): print(f"🧪 Testing {AGENT_METADATA['name']}") print("=" * 50) try: agent = TemplateAgent() # Test health check health = await agent.health_check() print(f"Health Status: {health['status']}") # Test processing test_inputs = [ "Hello World", "Test input", "Template agent test" ] for test_input in test_inputs: print(f"\n🔍 Testing: {test_input}") message = MCPMessage( id=f"test_{datetime.now().timestamp()}", method="process", params={"query": test_input}, timestamp=datetime.now() ) result = await agent.process_message(message) if result["status"] == "success": print(f"✅ Result: {result['result']}") else: print(f"❌ Error: {result['message']}") print(f"\n✅ {AGENT_METADATA['name']} test completed!") except Exception as e: print(f"❌ Failed to test agent: {e}") asyncio.run(test_agent()) """ INSTRUCTIONS FOR CREATING NEW AGENTS: 1. Copy this template file to your desired location (live/, inactive/, or future/) 2. Rename the file to your agent's name (e.g., my_agent.py) 3. Search for "CHANGE THIS" comments and modify accordingly: - Update AGENT_METADATA with your agent's information - Modify the class name and capabilities - Implement your agent's core processing logic in process_input() - Update health check logic in test_core_functionality() - Customize error messages and validation - Add any agent-specific configuration 4. Test your agent by running: python your_agent.py 5. Place the agent in the appropriate folder: - live/ for active agents - inactive/ for disabled agents - future/ for planned agents 6. The MCP system will automatically discover and load your agent! REQUIRED FUNCTIONS: - get_agent_metadata(): Returns agent metadata for discovery - create_agent(): Creates and returns agent instance - get_agent_info(): Returns agent information for compatibility REQUIRED METHODS: - health_check(): Production health monitoring - handle_process(): Main processing logic - handle_info(): Agent information endpoint OPTIONAL FEATURES: - MongoDB integration for data storage - Custom validation and error handling - Agent-specific configuration - Inter-agent communication capabilities """

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nisarg-123-web/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server