mcp_agent.py

try:
    import yaml  # type: ignore
except Exception:  # pragma: no cover - optional dependency
    yaml = None

# Import requests at module load so tests can patch MCPAgent.requests
import requests  # type: ignore
import openai

import json
import os
from typing import List, Dict, Any, Optional


class MCPAgent:
    """Sophisticated agent for interacting with the MCP API with dynamic capability discovery."""

    def __init__(
        self,
        base_url: str,
        context_path: str = "model_context.yaml",
        auto_discover: bool = True,
        api_prefix: str = "/api/v1",
        timeout: float = 30.0,
        openai_api_key: Optional[str] = None,
    ):
        self.base_url = base_url.rstrip("/")
        self.api_prefix = api_prefix
        self.timeout = timeout
        self.capabilities = {}

        # Initialize OpenAI client
        self.openai_client = None
        if openai_api_key or os.getenv("OPENAI_API_KEY"):
            try:
                self.openai_client = openai.OpenAI(
                    api_key=openai_api_key or os.getenv("OPENAI_API_KEY")
                )
                print("✓ OpenAI client initialized successfully")
            except Exception as e:
                print(f"⚠ Failed to initialize OpenAI client: {e}")

        # Always load the static context so helpers that read self.context
        # (e.g. list_herd) keep working even when dynamic discovery succeeds.
        self.context = self._parse_context(context_path)

        # Try dynamic discovery first, fall back to the static context
        if auto_discover:
            try:
                self.capabilities = self._discover_capabilities()
                print(
                    f"✓ Discovered {len(self.capabilities.get('paths', {}))} "
                    "API endpoints dynamically"
                )
            except Exception as e:
                print(f"⚠ Dynamic discovery failed ({e}), falling back to static context")

    @staticmethod
    def _parse_context(path: str) -> dict:
        """Parse the model context YAML file."""
        if yaml is not None:
            try:
                with open(path, "r", encoding="utf-8") as f:
                    data = yaml.safe_load(f) or {}
                if not isinstance(data, dict):
                    raise ValueError("Model context must be a mapping")
                return data
            except FileNotFoundError:
                return {"api": {"tools": []}}
            except yaml.YAMLError as exc:  # pragma: no cover - difficult to trigger
                raise ValueError(f"Error parsing {path}: {exc}") from exc
        # PyYAML not available, use fallback parser
        return MCPAgent._parse_context_fallback(path)

    @staticmethod
    def _parse_context_fallback(path: str) -> dict:
        """Parse the model context YAML file using a fallback line-based parser."""
        context = {"api": {"tools": []}}
        try:
            with open(path, "r", encoding="utf-8") as f:
                tool = None
                for raw_line in f:
                    line = raw_line.strip()
                    if line.startswith("schema_version:"):
                        # Top-level schema version
                        value = line.split(":", 1)[1].strip()
                        try:
                            context["schema_version"] = int(value)
                        except ValueError:
                            context["schema_version"] = value
                    if line.startswith("version:"):
                        context.setdefault("api", {})["version"] = line.split(":", 1)[1].strip()
                    elif line.startswith("- name:"):
                        name = line.split(":", 1)[1].strip()
                        tool = {"name": name}
                        context["api"]["tools"].append(tool)
                    elif line.startswith("method:") and tool is not None:
                        tool["method"] = line.split(":", 1)[1].strip()
                    elif line.startswith("path:") and tool is not None:
                        tool["path"] = line.split(":", 1)[1].strip()
                    elif line.startswith("description:") and tool is not None:
                        tool["description"] = line.split(":", 1)[1].strip()
                    elif line.startswith("scopes:") and tool is not None:
                        scopes_part = line.split(":", 1)[1].strip()
                        scopes_part = scopes_part.strip("[]")
                        scopes = [s.strip() for s in scopes_part.split(",") if s.strip()]
                        tool["scopes"] = scopes if scopes_part else []
        except FileNotFoundError:
            # If the file is not found, return the default context,
            # which is an empty list of tools. This matches the behavior
            # of the original PyYAML-based parser.
            pass
        return context

    def _discover_capabilities(self) -> dict:
        """Discover API capabilities through OpenAPI metadata."""
        openapi_url = f"{self.base_url}{self.api_prefix}/openapi.json"
        try:
            response = requests.get(openapi_url, timeout=self.timeout)
            response.raise_for_status()
            openapi_spec = response.json()

            # Parse OpenAPI spec into our internal format
            capabilities = {
                "info": openapi_spec.get("info", {}),
                "servers": openapi_spec.get("servers", []),
                "paths": {},
                "tools": [],
            }

            # Convert OpenAPI paths to our tool format
            for path, methods in openapi_spec.get("paths", {}).items():
                capabilities["paths"][path] = methods
                for method, details in methods.items():
                    if method.upper() in ["GET", "POST", "PUT", "DELETE", "PATCH"]:
                        tool = {
                            "name": self._generate_tool_name(method, path, details),
                            "method": method.upper(),
                            "path": path,
                            "description": details.get("summary", details.get("description", "")),
                            "parameters": self._extract_parameters(details),
                            "responses": details.get("responses", {}),
                            "tags": details.get("tags", []),
                        }
                        capabilities["tools"].append(tool)

            return capabilities
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Failed to fetch OpenAPI spec from {openapi_url}: {e}")
        except (KeyError, ValueError) as e:
            raise RuntimeError(f"Invalid OpenAPI specification: {e}")

    def _generate_tool_name(self, method: str, path: str, details: dict) -> str:
        """Generate a descriptive tool name from an OpenAPI operation."""
        # Use operationId if available
        if "operationId" in details:
            return details["operationId"]

        # Generate name from method and path
        path_parts = [part for part in path.split("/") if part and not part.startswith("{")]
        resource = path_parts[-1] if path_parts else "unknown"

        method_map = {
            "GET": "list" if "{" not in path else "get",
            "POST": "create",
            "PUT": "update",
            "DELETE": "delete",
            "PATCH": "patch",
        }
        action = method_map.get(method.upper(), method.lower())
        return f"{action}_{resource}"

    def _extract_parameters(self, operation_details: dict) -> list:
        """Extract parameter information from an OpenAPI operation."""
        parameters = []

        # Path and query parameters
        for param in operation_details.get("parameters", []):
            parameters.append(
                {
                    "name": param.get("name"),
                    "in": param.get("in"),
                    "required": param.get("required", False),
                    "type": param.get("schema", {}).get("type", "string"),
                    "description": param.get("description", ""),
                }
            )

        # Request body (for POST/PUT operations)
        if "requestBody" in operation_details:
            req_body = operation_details["requestBody"]
            content = req_body.get("content", {})
            if "application/json" in content:
                schema = content["application/json"].get("schema", {})
                parameters.append(
                    {
                        "name": "body",
                        "in": "body",
                        "required": req_body.get("required", False),
                        "schema": schema,
                        "description": req_body.get("description", "Request body"),
                    }
                )

        return parameters

    def get_available_tools(self) -> list:
        """Get list of all available tools/operations."""
        if hasattr(self, "capabilities") and self.capabilities:
            return self.capabilities.get("tools", [])
        # Fallback to static context
        return self.context.get("api", {}).get("tools", [])

    def find_tool(self, tool_name: str) -> Optional[dict]:
        """Find a specific tool by name, or return None if it is unknown."""
        for tool in self.get_available_tools():
            if tool.get("name") == tool_name:
                return tool
        return None

    def execute_operation(self, tool_name: str, token: str, **kwargs) -> dict:
        """Execute any discovered operation by tool name."""
        tool = self.find_tool(tool_name)
        if not tool:
            raise ValueError(f"Tool '{tool_name}' not found in discovered capabilities")

        method = tool.get("method", "GET")
tool.get("method", "GET") path = tool.get("path", "") # Replace path parameters for key, value in kwargs.items(): if f"{{{key}}}" in path: path = path.replace(f"{{{key}}}", str(value)) url = self.base_url + path headers = {"Authorization": f"Bearer {token}"} # Prepare request based on method request_kwargs = {"headers": headers} if method in ["POST", "PUT", "PATCH"]: # Extract body data from kwargs body_data = {k: v for k, v in kwargs.items() if f"{{{k}}}" not in tool.get("path", "")} if body_data: request_kwargs["json"] = body_data elif method == "GET": # Extract query parameters query_params = {k: v for k, v in kwargs.items() if f"{{{k}}}" not in tool.get("path", "")} if query_params: request_kwargs["params"] = query_params # Add timeout to request request_kwargs["timeout"] = self.timeout try: response = requests.request(method, url, **request_kwargs) response.raise_for_status() return response.json() except requests.exceptions.Timeout as exc: raise RuntimeError(f"Request timeout after {self.timeout}s: {exc}") except requests.exceptions.HTTPError as exc: raise RuntimeError(f"HTTP error {exc.response.status_code}: {exc.response.reason}") except requests.exceptions.RequestException as exc: raise RuntimeError(f"Request error: {exc}") def list_herd(self, token: str) -> list: """Call the listHerd endpoint and return JSON data.""" tool = next( ( t for t in self.context.get("api", {}).get("tools", []) if t.get("name") == "listHerd" ), None, ) if tool is None: raise ValueError("listHerd tool not found in model context") path = tool.get("path") if not path: raise ValueError("listHerd tool path not found in model context") url = self.base_url + path headers = { "Authorization": f"Bearer {token}", } try: response = requests.get(url, headers=headers, timeout=self.timeout) response.raise_for_status() # Raises HTTPError for 4XX/5XX responses return response.json() except requests.exceptions.Timeout as exc: raise RuntimeError(f"Request timeout after {self.timeout}s: {exc}") except requests.exceptions.HTTPError as exc: raise RuntimeError( f"HTTP error {exc.response.status_code}: {exc.response.reason}" ) except requests.exceptions.RequestException as exc: # This catches other exceptions like ConnectionError, Timeout, etc. raise RuntimeError(f"Request error: {exc}") def chat_with_openai( self, user_message: str, conversation_history: List[Dict[str, str]] = None, system_prompt: str = None, model: str = "gpt-4o-mini", stream: bool = False ) -> Dict[str, Any]: """ Send a message to OpenAI and get a response. Args: user_message: The user's message conversation_history: Previous conversation messages system_prompt: System prompt to guide the AI behavior model: OpenAI model to use Returns: Dict containing the response and updated conversation history """ if not self.openai_client: raise RuntimeError("OpenAI client not initialized. 
        if conversation_history is None:
            conversation_history = []

        # Prepare messages
        messages = []

        # Add system prompt if provided
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})

        # Add conversation history
        messages.extend(conversation_history)

        # Add current user message
        messages.append({"role": "user", "content": user_message})

        try:
            response = self.openai_client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0.7,
                max_tokens=1500,
                stream=stream,
            )

            if stream:
                # Return a generator for streaming. The shared history list is
                # appended to in place once the stream is exhausted, so the
                # "conversation_history" entry below is updated after streaming.
                def stream_generator():
                    full_response = ""
                    for chunk in response:
                        if chunk.choices[0].delta.content:
                            content = chunk.choices[0].delta.content
                            full_response += content
                            yield content
                    conversation_history.append({"role": "user", "content": user_message})
                    conversation_history.append({"role": "assistant", "content": full_response})

                return {
                    "stream": stream_generator(),
                    "conversation_history": conversation_history,  # updated after streaming
                    "is_streaming": True,
                }
            else:
                assistant_response = response.choices[0].message.content

                # Update conversation history
                updated_history = conversation_history.copy()
                updated_history.append({"role": "user", "content": user_message})
                updated_history.append({"role": "assistant", "content": assistant_response})

                return {
                    "response": assistant_response,
                    "conversation_history": updated_history,
                    "usage": response.usage.model_dump() if response.usage else None,
                    "is_streaming": False,
                }
        except Exception as e:
            raise RuntimeError(f"OpenAI API error: {e}")

    def intelligent_mcp_query(
        self,
        user_request: str,
        token: str,
        conversation_history: Optional[List[Dict[str, str]]] = None,
    ) -> Dict[str, Any]:
        """
        Use OpenAI to understand user intent and execute MCP operations accordingly.

        Args:
            user_request: Natural language request from user
            token: Authentication token for MCP API
            conversation_history: Previous conversation context

        Returns:
            Dict containing the response and any MCP operation results
        """
        if not self.openai_client:
            raise RuntimeError("OpenAI client not initialized. Please provide an API key.")

        # Get available tools for context
        available_tools = self.get_available_tools()
        tools_description = "\n".join(
            f"- {tool.get('name', 'Unknown')}: {tool.get('description', 'No description')}"
            for tool in available_tools
        )

        system_prompt = f"""You are an intelligent MCP (Model Context Protocol) agent assistant.

You have access to the following MCP API tools:
{tools_description}

Your job is to:
1. Understand the user's natural language request
2. Determine which MCP API tool(s) to use
3. Extract any necessary parameters
4. Provide a helpful response

If the user asks for something that requires an MCP API call, respond with a JSON object containing:
- "action": "mcp_call"
- "tool_name": the name of the tool to use
- "parameters": object with the required parameters
- "explanation": brief explanation of what you're doing

If the user asks a general question or needs clarification, respond normally as a helpful assistant.

Current conversation context: The user is interacting with an MCP system that manages herds and other resources."""

        # Get OpenAI's interpretation of the request
        chat_response = self.chat_with_openai(
            user_message=user_request,
            conversation_history=conversation_history,
            system_prompt=system_prompt,
            model="gpt-4o-mini",
        )

        assistant_response = chat_response["response"]

        # Try to parse if this is an MCP action request
        try:
            # Look for JSON in the response
            if "action" in assistant_response and "mcp_call" in assistant_response:
                # Extract JSON from response
                start = assistant_response.find("{")
                end = assistant_response.rfind("}") + 1
                if start != -1 and end != 0:
                    action_data = json.loads(assistant_response[start:end])

                    if action_data.get("action") == "mcp_call":
                        tool_name = action_data.get("tool_name")
                        parameters = action_data.get("parameters", {})
                        explanation = action_data.get("explanation", "")

                        # Execute the MCP operation
                        try:
                            mcp_result = self.execute_operation(tool_name, token, **parameters)

                            # Format the response
                            formatted_response = (
                                f"{explanation}\n\nResults:\n{json.dumps(mcp_result, indent=2)}"
                            )

                            return {
                                "response": formatted_response,
                                "mcp_result": mcp_result,
                                "conversation_history": chat_response["conversation_history"],
                                "action_taken": {
                                    "tool": tool_name,
                                    "parameters": parameters,
                                },
                            }
                        except Exception as e:
                            error_response = f"{explanation}\n\nError executing MCP operation: {e}"
                            return {
                                "response": error_response,
                                "error": str(e),
                                "conversation_history": chat_response["conversation_history"],
                            }
        except (json.JSONDecodeError, KeyError):
            # If JSON parsing fails, treat the reply as normal conversation
            pass

        # Return normal chat response
        return {
            "response": assistant_response,
            "conversation_history": chat_response["conversation_history"],
        }
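
For orientation, a minimal usage sketch follows. The base URL, bearer token, and the list_herd tool name are hypothetical placeholders; the sketch assumes a server that publishes its OpenAPI spec at <base_url>/api/v1/openapi.json, as the discovery code above expects.

# Hypothetical usage sketch -- base URL, token, and tool name are placeholders.
from mcp_agent import MCPAgent

agent = MCPAgent(base_url="http://localhost:8000")  # discovery runs in __init__

# Inspect the operations discovered from the OpenAPI spec
for tool in agent.get_available_tools():
    print(f"{tool['method']:6} {tool['path']}  ->  {tool['name']}")

# Invoke a discovered operation by name; extra keyword arguments become
# path, query, or body parameters depending on the HTTP method.
herd = agent.execute_operation("list_herd", token="dev-token")

# Or let OpenAI choose the tool from a natural-language request
# (requires OPENAI_API_KEY to be set in the environment).
reply = agent.intelligent_mcp_query("Show me the herd", token="dev-token")
print(reply["response"])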

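The static model_context.yaml itself is not shown on this page. As a sketch, a minimal file that both the PyYAML path and the line-based fallback parser would accept could look like the following; the listHerd entry matches the name that list_herd() looks up, while the path and scopes are illustrative.

# model_context.yaml (illustrative)
schema_version: 1
api:
  version: v1
  tools:
    - name: listHerd
      method: GET
      path: /api/v1/herd
      description: List all animals in the herd
      scopes: [read:herd]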