Skip to main content
Glama
agent_management_handlers.py (17.2 kB)
"""
Agent management handlers for MCP Memory Server.

Handles agent lifecycle, permissions, memory queries, and action logging.
"""

import uuid
from datetime import datetime
from typing import Any, Dict, List

try:
    from ..server_config import get_logger
    from ..error_handler import error_handler
except ImportError:
    # Fallback for standalone usage
    import logging

    def get_logger(name: str):
        """Return a standard-library logger when server_config is unavailable."""
        return logging.getLogger(name)

    class MockErrorHandler:
        """Stand-in for the project error handler in standalone usage."""

        def get_error_stats(self):
            """Report zero errors; no real tracking happens in the fallback."""
            return {"total_errors": 0}

    error_handler = MockErrorHandler()

logger = get_logger("agent-management-handlers")


def _text_response(message: str) -> Dict[str, Any]:
    """Build a success payload holding a single text content item."""
    return {"content": [{"type": "text", "text": message}]}


def _error_response(message: str) -> Dict[str, Any]:
    """Build the error payload shape shared by every handler in this module."""
    return {"isError": True, "content": [{"type": "text", "text": message}]}


def _normalize_layers(memory_layers: Any) -> Any:
    """Turn a comma-separated layer string into a list of stripped names.

    Any non-string value (normally a list) is returned untouched so existing
    callers that already pass lists see no change.
    """
    if isinstance(memory_layers, str):
        return [layer.strip() for layer in memory_layers.split(",")]
    return memory_layers


class AgentManagementHandlers:
    """Handles agent lifecycle and management operations.

    Every ``handle_*`` coroutine takes the raw tool ``arguments`` dict and
    returns a response dict with a ``content`` list of text items; failures
    additionally carry ``"isError": True``. Handlers never raise: unexpected
    exceptions are logged and converted into an error response.
    """

    def __init__(self, memory_manager):
        """Initialize with a memory manager instance.

        Args:
            memory_manager: Object providing the async operations these
                handlers call (``register_agent``, ``list_agents``,
                ``update_agent_permissions``, ``check_agent_permission``,
                ``query_memory``, ``log_agent_action``).
        """
        self.memory_manager = memory_manager

    async def handle_initialize_new_agent(
        self, arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Initialize a new agent with enhanced functionality from agent_startup.

        Registers the agent, optionally loads policies, collects a system
        status line, and renders everything into a Markdown report.

        Args:
            arguments: Optional keys ``agent_id`` (a UUID4 string is generated
                when missing or empty), ``agent_role`` (defaults to
                ``"general"``), ``memory_layers`` (list, or comma-separated
                string; defaults to ``["global", "learned"]``),
                ``policy_version`` (default ``"latest"``), ``policy_hash``
                (default ``""``) and ``load_policies`` (default ``True``).

        Returns:
            A dict with ``content`` (the Markdown report), ``metadata``
            (resolved settings plus the collected ``errors``) and ``isError``
            (``True`` when any step reported an error).
        """
        try:
            # Defaults match the agent_startup prompt.
            agent_id = arguments.get("agent_id") or str(uuid.uuid4())
            agent_role = arguments.get("agent_role") or "general"
            memory_layers = _normalize_layers(
                arguments.get("memory_layers", ["global", "learned"])
            )
            policy_version = arguments.get("policy_version", "latest")
            policy_hash = arguments.get("policy_hash", "")
            load_policies = arguments.get("load_policies", True)

            initialization_messages: List[str] = []
            errors: List[str] = []

            # Step 1: Register the agent. A failure here is recorded, not
            # raised, so the remaining steps still run and get reported.
            try:
                if self.memory_manager:
                    agent_result = await self.memory_manager.register_agent(
                        agent_id=agent_id,
                        agent_role=agent_role,
                        memory_layers=memory_layers,
                    )
                    if agent_result.get("success"):
                        initialization_messages.append(
                            f"✅ Agent '{agent_id}' registered successfully"
                        )
                    else:
                        errors.append(
                            f"❌ Agent registration failed: "
                            f"{agent_result.get('error', 'Unknown error')}"
                        )
                else:
                    errors.append("❌ Memory manager not available")
            except Exception as e:
                errors.append(f"❌ Agent registration error: {str(e)}")

            # Step 2: Load and bind policies (if requested).
            if load_policies:
                try:
                    # Imported lazily so an unavailable policy module is
                    # reported as a policy-loading error instead of breaking
                    # import of this module.
                    from ..policy_processor import PolicyProcessor

                    policy_processor = PolicyProcessor()
                    policy_result = (
                        await policy_processor.build_canonical_policy(
                            directory=None,  # Use default policy directory
                            policy_version=policy_version,
                        )
                    )
                    if policy_result.get("success"):
                        initialization_messages.append(
                            f"✅ Policy version '{policy_version}' loaded"
                        )
                        initialization_messages.append(
                            f"📁 Files processed: "
                            f"{policy_result.get('files_processed', 0)}"
                        )
                        initialization_messages.append(
                            f"📝 Rules loaded: "
                            f"{policy_result.get('unique_rules', 0)}"
                        )
                        # A caller-supplied hash wins over the loaded one.
                        if policy_result.get("policy_hash") and not policy_hash:
                            policy_hash = policy_result["policy_hash"]
                    else:
                        errors.append(
                            f"❌ Policy loading failed: "
                            f"{policy_result.get('error', 'Unknown error')}"
                        )
                except Exception as e:
                    errors.append(f"❌ Policy loading error: {str(e)}")

            # Step 3: Get system info. Best effort: a failure only changes
            # the status line and does not mark the initialization as failed.
            system_info = ""
            try:
                if self.memory_manager:
                    agents_result = await self.memory_manager.list_agents()
                    agent_count = len(agents_result) if agents_result else 0
                    system_info = (
                        f"\n📊 System Status: {agent_count} agents active"
                    )
            except Exception as e:
                system_info = (
                    f"\n⚠️ System Status: Unable to fetch ({str(e)})"
                )

            # Determine overall status.
            if errors:
                status_icon = "❌"
                status_text = "FAILED"
            else:
                status_icon = "✅"
                status_text = "SUCCESS"

            # Build response content (same format as agent_startup).
            response_lines = [
                f"# Agent Startup {status_text}",
                "",
                "## Agent Identity",
                f"- **Agent ID:** `{agent_id}`",
                f"- **Role:** `{agent_role}`",
                f"- **Initialization Time:** {datetime.now().isoformat()}",
                "",
                "## Initialization Results",
            ]
            response_lines.extend(initialization_messages)

            if errors:
                response_lines.append("")
                response_lines.append("### Errors:")
                response_lines.extend(errors)

            # Only the first 12 characters of the hash are displayed.
            policy_hash_display = (
                policy_hash[:12] + '...' if policy_hash else 'Not available'
            )

            response_lines.extend([
                "",
                "## Configuration",
                f"- **Memory Layers:** {', '.join(memory_layers)}",
                f"- **Policy Version:** `{policy_version}`",
                f"- **Policy Hash:** `{policy_hash_display}`",
                "",
                f"{status_icon} Agent initialization {status_text.lower()}",
                system_info,
            ])

            return {
                "content": [
                    {"type": "text", "text": "\n".join(response_lines)}
                ],
                "metadata": {
                    "agent_id": agent_id,
                    "agent_role": agent_role,
                    "memory_layers": memory_layers,
                    "policy_version": policy_version,
                    "policy_hash": policy_hash,
                    "initialization_success": not errors,
                    "errors": errors,
                },
                "isError": bool(errors),
            }

        except Exception as e:
            logger.error("Error in enhanced initialize_new_agent: %s", e)
            return _error_response(f"Error initializing agent: {str(e)}")

    async def handle_initialize_development_agent(
        self, arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Initialize a development agent with developer-optimized defaults.

        Only ``agent_id`` is taken from ``arguments``; role, memory layers
        and policy settings are fixed here and forwarded to
        ``handle_initialize_new_agent``.
        """
        dev_arguments = {
            "agent_id": arguments.get("agent_id"),
            "agent_role": "developer",
            "memory_layers": ["global", "learned", "agent"],
            "policy_version": "latest",
            "load_policies": True,
        }
        return await self.handle_initialize_new_agent(dev_arguments)

    async def handle_initialize_testing_agent(
        self, arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Initialize a testing agent with testing-optimized defaults.

        Only ``agent_id`` is taken from ``arguments``; role, memory layers
        and policy settings are fixed here and forwarded to
        ``handle_initialize_new_agent``.
        """
        test_arguments = {
            "agent_id": arguments.get("agent_id"),
            "agent_role": "tester",
            "memory_layers": ["global", "learned"],
            "policy_version": "latest",
            "load_policies": True,
        }
        return await self.handle_initialize_new_agent(test_arguments)

    async def handle_configure_agent_permissions(
        self, arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Configure agent permissions for memory layer access.

        Args:
            arguments: Requires ``agent_id``; ``permissions`` defaults to an
                empty dict. The ``can_read`` / ``can_write`` / ``can_admin``
                entries are echoed back in the success message.

        Returns:
            A text response confirming the update, or an error response when
            ``agent_id`` is missing, the manager reports failure, or an
            exception occurs.
        """
        try:
            agent_id = arguments.get("agent_id")
            permissions = arguments.get("permissions", {})

            if not agent_id:
                return _error_response("agent_id is required")

            result = await self.memory_manager.update_agent_permissions(
                agent_id=agent_id,
                permissions=permissions,
            )

            if not result.get("success"):
                # .get keeps a clean message even if the manager omits 'error'.
                return _error_response(
                    f"Failed: {result.get('error', 'Unknown error')}"
                )

            return _text_response(
                f"✅ Permissions updated for agent '{agent_id}'"
                f"\nRead access: {permissions.get('can_read', [])}"
                f"\nWrite access: {permissions.get('can_write', [])}"
                f"\nAdmin access: {permissions.get('can_admin', [])}"
            )

        except Exception as e:
            logger.error("Error configuring agent permissions: %s", e)
            return _error_response(
                f"Error configuring permissions: {str(e)}"
            )

    async def handle_query_memory_for_agent(
        self, arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Query memory for an agent with permission-based access control.

        Each requested layer is checked for ``read`` permission and only the
        permitted layers are queried.

        Args:
            arguments: Requires ``agent_id`` and ``query``. ``memory_layers``
                (list, or comma-separated string; defaults to
                ``["global", "learned"]``) and ``limit`` (default ``10``) are
                optional.

        Returns:
            A text response listing the results, or an error response when
            required arguments are missing, no layer is readable, the query
            fails, or an exception occurs.
        """
        try:
            agent_id = arguments.get("agent_id")
            query = arguments.get("query")
            # Accept the same comma-separated form as initialize_new_agent;
            # iterating a raw string would check one character at a time.
            memory_layers = _normalize_layers(
                arguments.get("memory_layers", ["global", "learned"])
            )
            limit = arguments.get("limit", 10)

            if not agent_id or not query:
                return _error_response("agent_id and query are required")

            # Check agent permissions for each memory layer.
            allowed_layers = []
            for memory_type in memory_layers:
                has_permission = (
                    await self.memory_manager.check_agent_permission(
                        agent_id=agent_id,
                        action="read",
                        memory_type=memory_type,
                    )
                )
                if has_permission:
                    allowed_layers.append(memory_type)

            if not allowed_layers:
                return _error_response(
                    f"Agent {agent_id} has no read permissions "
                    f"for requested memory layers"
                )

            # Query memory with allowed layers only.
            result = await self.memory_manager.query_memory(
                query=query,
                memory_types=allowed_layers,
                limit=limit,
                agent_id=agent_id,
            )

            if not result.get("success"):
                return _error_response(
                    f"Query failed: {result.get('error', 'Unknown error')}"
                )

            results_text = [
                f"**Result {i}** (Score: {memory_result['score']:.3f}, "
                f"Type: {memory_result['memory_type']})\n"
                f"{memory_result['content']}\n"
                for i, memory_result in enumerate(result["results"], 1)
            ]
            return _text_response(
                f"🔍 Found {len(result['results'])} results "
                f"for agent '{agent_id}'"
                f"\nAllowed memory layers: "
                f"{', '.join(allowed_layers)}\n\n"
                + "\n".join(results_text)
            )

        except Exception as e:
            logger.error("Error querying memory for agent: %s", e)
            return _error_response(f"Error querying memory: {str(e)}")

    async def handle_store_agent_action(
        self, arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Store an agent action with optional learned memory integration.

        Args:
            arguments: Requires ``agent_id``, ``action`` and ``outcome``.
                ``context`` defaults to an empty dict and ``learn`` (default
                ``False``) is forwarded as ``store_as_learned``.

        Returns:
            A text response confirming the logged action, or an error
            response when required arguments are missing, the manager reports
            failure, or an exception occurs.
        """
        try:
            agent_id = arguments.get("agent_id")
            action = arguments.get("action")
            context = arguments.get("context", {})
            outcome = arguments.get("outcome")
            learn = arguments.get("learn", False)

            if not agent_id or not action or not outcome:
                return _error_response(
                    "agent_id, action, and outcome are required"
                )

            result = await self.memory_manager.log_agent_action(
                agent_id=agent_id,
                action=action,
                context=context,
                outcome=outcome,
                store_as_learned=learn,
            )

            if not result.get("success"):
                return _error_response(
                    f"Failed: {result.get('error', 'Unknown error')}"
                )

            response_text = (
                f"✅ Action logged for agent '{agent_id}'"
                f"\nAction: {action}"
                f"\nOutcome: {outcome}"
            )
            # .get: a manager that omits this flag must not turn a
            # successfully logged action into an error response.
            if result.get("stored_as_learned"):
                response_text += (
                    "\n📚 Stored as learned memory for future reference"
                )
            return _text_response(response_text)

        except Exception as e:
            logger.error("Error storing agent action: %s", e)
            return _error_response(f"Error storing action: {str(e)}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hannesnortje/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.