Skip to main content
Glama
mcp_connector.py9.95 kB
import asyncio import json from typing import List, Dict, Tuple, Optional from openai import OpenAI from fastmcp import Client import httpx import functools class MCPConnector: """ MCP Client implementation for connecting to MCP servers via SSE. Handles tool discovery, execution, and LLM integration. """ def __init__(self, server_url: str, openai_api_key: str): self.server_url = server_url.rstrip('/') self.sse_url = f"{self.server_url}/sse" self.openai_client = OpenAI(api_key=openai_api_key) self.client: Optional[Client] = None self.available_tools = [] self.conversation_history = [] self._connected = False async def connect(self) -> bool: try: print(f"Attempting to connect to: {self.sse_url}") self.client = Client(self.sse_url) await asyncio.wait_for(self.client.__aenter__(), timeout=10.0) print("Successfully connected to MCP server!") self._connected = True return True except asyncio.TimeoutError: print("Connection timeout after 10 seconds") self._connected = False return False except Exception as e: print(f"Error connecting to MCP server: {e}") import traceback; traceback.print_exc() self._connected = False return False def check_connection(self) -> bool: try: print(f"Testing connection to: {self.sse_url}") with httpx.Client(timeout=5.0) as client: with client.stream('GET', self.sse_url) as response: print(f"Response status: {response.status_code}") return response.status_code == 200 except Exception as e: print(f"Connection error: {e}") import traceback; traceback.print_exc() return False async def fetch_tools(self) -> Optional[List[Dict]]: try: if not self._connected or not self.client: print("Connecting to MCP server...") await self.connect() if not self.client: return None print("Listing tools from MCP server...") tools_response = await asyncio.wait_for(self.client.list_tools(), timeout=10.0) self.available_tools = [] for tool in tools_response: input_schema = getattr(tool, 'inputSchema', getattr(tool, 'input_schema', {})) tool_dict = { "name": tool.name, "description": getattr(tool, 'description', '') or "", "parameters": { "type": "object", "properties": input_schema.get("properties", {}) if isinstance(input_schema, dict) else {}, "required": input_schema.get("required", []) if isinstance(input_schema, dict) else [] } } self.available_tools.append(tool_dict) print(f" - {tool.name}") return self.available_tools except Exception as e: print(f"Error fetching tools: {e}") import traceback; traceback.print_exc() return None async def execute_tool(self, tool_name: str, arguments: Dict) -> Dict: """ Execute a tool on the MCP server asynchronously using a fresh connection. Returns: Tool execution result as dict. """ try: print(f"[DEBUG] Starting execute_tool for {tool_name}") print(f"[DEBUG] Calling tool '{tool_name}' with arguments: {arguments}") # Create a fresh client for this tool call to avoid SSE re-entrancy issues temp_client = None try: print("[DEBUG] Creating temporary client for tool execution...") temp_client = Client(self.sse_url) await asyncio.wait_for(temp_client.__aenter__(), timeout=5.0) print("[DEBUG] About to call tool with temporary client...") result = await asyncio.wait_for( temp_client.call_tool(tool_name, arguments), timeout=30.0 ) print(f"[DEBUG] Tool call completed: {result}") if hasattr(result, "content") and result.content: print(f"[DEBUG] Has content attribute: {result.content[0].text[:100] if len(str(result.content[0].text)) > 100 else result.content[0].text}") return {"result": result.content[0].text} else: print(f"[DEBUG] No content attribute, returning string: {str(result)[:100]}") return {"result": str(result)} except asyncio.TimeoutError: print(f"[ERROR] Tool execution timed out after 30 seconds: {tool_name}") return {"error": f"Tool execution timed out after 30 seconds"} finally: # Clean up temporary client if temp_client: try: await temp_client.__aexit__(None, None, None) except: pass except Exception as e: print(f"[ERROR] Tool execution error: {e}") import traceback traceback.print_exc() return {"error": str(e)} def convert_tools_to_openai_format(self) -> List[Dict]: openai_tools = [] for tool in self.available_tools: openai_tools.append({ "type": "function", "function": { "name": tool["name"], "description": tool.get("description", ""), "parameters": { "type": "object", "properties": tool.get("parameters", {}).get("properties", {}), "required": tool.get("parameters", {}).get("required", []) } } }) return openai_tools async def chat_async(self, user_message: str, model: str = "gpt-4-turbo-preview") -> Tuple[str, List[Dict]]: self.conversation_history.append({"role": "user", "content": user_message}) if not self.available_tools: print("Fetching tools before chat...") await self.fetch_tools() openai_tools = self.convert_tools_to_openai_format() try: # Use asyncio.to_thread to call synchronous create() safely response = await asyncio.to_thread( self.openai_client.chat.completions.create, model=model, messages=self.conversation_history, tools=openai_tools, tool_choice="auto" ) assistant_message = response.choices[0].message tool_calls_made = [] if getattr(assistant_message, "tool_calls", None): print(f"AI wants to call {len(assistant_message.tool_calls)} tools") self.conversation_history.append({ "role": "assistant", "content": getattr(assistant_message, "content", ""), "tool_calls": [ {"id": tc.id, "type": "function", "function": {"name": tc.function.name, "arguments": tc.function.arguments}} for tc in assistant_message.tool_calls ] }) for tool_call in assistant_message.tool_calls: tool_name = tool_call.function.name tool_args = json.loads(tool_call.function.arguments) print(f"Executing tool: {tool_name} with args: {tool_args}") tool_calls_made.append({"name": tool_name, "arguments": tool_args}) result = await self.execute_tool(tool_name, tool_args) self.conversation_history.append({ "role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(result) }) # Get final AI response with tool results final_response = await asyncio.to_thread( self.openai_client.chat.completions.create, model=model, messages=self.conversation_history ) final_message = final_response.choices[0].message.content self.conversation_history.append({"role": "assistant", "content": final_message}) return final_message, tool_calls_made else: self.conversation_history.append({"role": "assistant", "content": assistant_message.content}) return assistant_message.content, [] except Exception as e: error_msg = f"Error communicating with AI: {e}" print(error_msg) return error_msg, [] def chat(self, user_message: str, model: str = "gpt-4-turbo-preview") -> Tuple[str, List[Dict]]: try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(self.chat_async(user_message, model)) def clear_history(self): self.conversation_history = [] def get_tool_count(self) -> int: return len(self.available_tools) def get_tool_names(self) -> List[str]: return [tool["name"] for tool in self.available_tools] async def disconnect(self): if self.client: await self.client.__aexit__(None, None, None) self._connected = False self.client = None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chakorabdellatif/filesystem-mcp-with-FastMCP-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server