Skip to main content
Glama
agent.py (6.79 kB)
"""
MCP Agent implementation for handling Model Context Protocol operations.
"""

import asyncio
import inspect
import logging
from typing import Any, Dict, List, Optional, Callable
import uuid

from .protocol import MCPProtocol, MCPMessage, MessageType
from .config import Config

logger = logging.getLogger(__name__)


class MCPAgent:
    """
    Main MCP Agent class that handles protocol communications and tool execution.

    The agent keeps two registries: ``message_handlers`` maps a method name to
    the callable that serves it, and ``tools`` maps a tool name to the callable
    executed by the built-in ``call_tool`` handler.
    """

    def __init__(self, config: Config):
        """
        Initialize the MCP Agent.

        Args:
            config: Configuration object
        """
        self.config = config
        self.protocol = MCPProtocol()
        self.logger = logging.getLogger(self.__class__.__name__)
        self.is_running = False
        self.message_handlers: Dict[str, Callable] = {}
        self.tools: Dict[str, Callable] = {}

        # Register default handlers
        self._register_default_handlers()

    def _register_default_handlers(self) -> None:
        """Register the built-in ``ping``, ``list_tools`` and ``call_tool`` handlers."""
        self.register_handler("ping", self._handle_ping)
        self.register_handler("list_tools", self._handle_list_tools)
        self.register_handler("call_tool", self._handle_call_tool)

    def register_handler(self, method: str, handler: Callable) -> None:
        """
        Register a message handler for a specific method.

        A later registration for the same method replaces the earlier one.

        Args:
            method: The method name to handle
            handler: Callable invoked with the message params (a dict); its
                return value becomes the ``result`` of the response. It may be
                a coroutine function or a plain function.
        """
        self.message_handlers[method] = handler
        self.logger.info("Registered handler for method: %s", method)

    def register_tool(self, name: str, tool_func: Callable) -> None:
        """
        Register a tool that can be called by the agent.

        A later registration under the same name replaces the earlier one.

        Args:
            name: The tool name
            tool_func: The tool function to execute; it is called with the
                request's ``arguments`` expanded as keyword arguments. It may
                be a coroutine function or a plain function.
        """
        self.tools[name] = tool_func
        self.logger.info("Registered tool: %s", name)

    async def start(self) -> None:
        """Start the MCP Agent by marking it as running."""
        self.logger.info("Starting MCP Agent...")
        self.is_running = True
        self.logger.info("MCP Agent started successfully")

    async def stop(self) -> None:
        """Stop the MCP Agent by clearing the running flag."""
        self.logger.info("Stopping MCP Agent...")
        self.is_running = False
        self.logger.info("MCP Agent stopped")

    async def run(self) -> None:
        """
        Main run loop for the agent.

        Loops for as long as ``is_running`` is true, so it returns immediately
        unless ``start()`` has been called first. An unexpected error is logged
        and ends the loop.
        """
        while self.is_running:
            try:
                # Simulate message processing
                await asyncio.sleep(1)
                # In a real implementation, this would handle incoming messages
            except Exception as e:
                self.logger.error("Error in agent run loop: %s", e)
                break

    async def handle_message(self, raw_message: str) -> Optional[str]:
        """
        Handle an incoming MCP message.

        Args:
            raw_message: Raw message string

        Returns:
            Optional response message as JSON string. ``None`` is returned for
            notifications, for unhandled message types, and when the message
            cannot be parsed (there is then no request id to answer).
        """
        try:
            message = self.protocol.parse_message(raw_message)
        except Exception as e:
            self.logger.error("Error handling message: %s", e)
            return None

        try:
            if message.type == MessageType.REQUEST and message.method:
                return await self._handle_request(message)
            if message.type == MessageType.NOTIFICATION and message.method:
                await self._handle_notification(message)
                return None
            self.logger.warning("Unhandled message type: %s", message.type)
            return None
        except Exception as e:
            self.logger.error("Error handling message: %s", e)
            # Reuse the already-parsed message rather than parsing a second time.
            return self._error_response_for(message, e)

    def _error_response_for(self, message: MCPMessage, error: Exception) -> Optional[str]:
        """
        Build a serialized error response for a failed request.

        Args:
            message: The parsed message whose processing failed
            error: The exception that was raised

        Returns:
            The serialized error response, or ``None`` when the message is not
            a request with an id or the response itself cannot be built.
        """
        if message.type != MessageType.REQUEST or message.id is None:
            return None
        try:
            error_response = self.protocol.create_response(
                message.id, error={"code": -1, "message": str(error)}
            )
            return self.protocol.serialize_message(error_response)
        except Exception:
            # Best effort: report the failure instead of hiding it, but never
            # let error reporting itself crash the caller.
            self.logger.exception("Failed to build error response")
            return None

    @staticmethod
    async def _invoke(func: Callable, *args: Any, **kwargs: Any) -> Any:
        """Call ``func`` and await its result only when that result is awaitable."""
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result

    async def _handle_request(self, message: MCPMessage) -> str:
        """
        Handle MCP request messages.

        Args:
            message: Parsed request message

        Returns:
            The serialized response: the handler's result on success, an error
            with code -32601 when no handler is registered for the method, or
            an error with code -1 when the handler raises.

        Raises:
            ValueError: If the message has no method or no id.
        """
        # Compare the id against None so that a falsy-but-present id such as 0
        # is still treated as a valid request id.
        if not message.method or message.id is None:
            raise ValueError("Request must have method and id")

        handler = self.message_handlers.get(message.method)
        if not handler:
            error_response = self.protocol.create_response(
                message.id,
                error={"code": -32601, "message": f"Method not found: {message.method}"},
            )
            return self.protocol.serialize_message(error_response)

        try:
            result = await self._invoke(handler, message.params or {})
            response = self.protocol.create_response(message.id, result=result)
            return self.protocol.serialize_message(response)
        except Exception as e:
            error_response = self.protocol.create_response(
                message.id, error={"code": -1, "message": str(e)}
            )
            return self.protocol.serialize_message(error_response)

    async def _handle_notification(self, message: MCPMessage) -> None:
        """
        Handle MCP notification messages.

        Notifications never produce a response; a handler failure is only
        logged, and a notification without a registered handler is ignored.

        Args:
            message: Parsed notification message
        """
        if not message.method:
            return

        handler = self.message_handlers.get(message.method)
        if handler:
            try:
                await self._invoke(handler, message.params or {})
            except Exception as e:
                self.logger.error(
                    "Error handling notification %s: %s", message.method, e
                )

    async def _handle_ping(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle ping requests by returning ``{"pong": True}``."""
        return {"pong": True}

    async def _handle_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle list_tools requests by returning the names of all registered tools."""
        tools_list = [{"name": name} for name in self.tools]
        return {"tools": tools_list}

    async def _handle_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle call_tool requests.

        Args:
            params: Request params; ``name`` selects the tool and the optional
                ``arguments`` dict is expanded into keyword arguments.

        Returns:
            A dict of the form ``{"result": <tool return value>}``.

        Raises:
            ValueError: If the name is missing, the tool is unknown, the
                arguments are not a dict, or the tool itself fails.
        """
        tool_name = params.get("name")
        tool_args = params.get("arguments")

        if not tool_name:
            raise ValueError("Tool name is required")

        tool_func = self.tools.get(tool_name)
        if not tool_func:
            raise ValueError(f"Tool not found: {tool_name}")

        # A missing or explicit-null "arguments" means "call with no arguments".
        if tool_args is None:
            tool_args = {}
        if not isinstance(tool_args, dict):
            raise ValueError("Tool arguments must be an object")

        try:
            result = await self._invoke(tool_func, **tool_args)
            return {"result": result}
        except Exception as e:
            # Chain the original exception so the traceback keeps the root cause.
            raise ValueError(f"Tool execution failed: {e}") from e

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bmaranan75/mcp-shopping-assistant-py'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.