Fused MCP Agents

Official
by fusedio
client.py (7.76 kB)
import asyncio
import sys
import traceback
from typing import Optional

from mcp import ClientSession
from mcp.client.sse import sse_client
from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv()  # load environment variables from .env


class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self._session_context = None
        self._streams_context = None
        self.anthropic = Anthropic()

    async def connect_to_sse_server(self, server_url: str):
        """
        Connect to an MCP server running with SSE transport

        Args:
            server_url: The URL of the SSE endpoint
        """
        print(f"Connecting to SSE endpoint: {server_url}")
        try:
            # Create the SSE client
            self._streams_context = sse_client(url=server_url)
            streams = await self._streams_context.__aenter__()

            self._session_context = ClientSession(*streams)
            self.session = await self._session_context.__aenter__()

            # Initialize
            print("Initializing SSE client...")
            await self.session.initialize()

            # List available tools to verify connection
            print("Connection established, listing tools...")
            response = await self.session.list_tools()
            tools = response.tools
            print("\nConnected to server with tools:", [tool.name for tool in tools])
            return True
        except Exception as e:
            print(f"Error during initialization: {str(e)}")
            await self.cleanup()
            return False

    async def cleanup(self):
        """Properly clean up the session and streams"""
        try:
            if self._session_context:
                await self._session_context.__aexit__(None, None, None)
            if self._streams_context:
                await self._streams_context.__aexit__(None, None, None)
            print("Cleaned up connection resources")
        except Exception as e:
            print(f"Error during cleanup: {str(e)}")

    async def process_query(self, query: str) -> str:
        """Process a query using Claude and available tools"""
        messages = [{"role": "user", "content": query}]

        response = await self.session.list_tools()
        available_tools = [
            {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema,
            }
            for tool in response.tools
        ]
        print(f"Available tools: {[t['name'] for t in available_tools]}")

        # Initial Claude API call
        print("Sending query to Claude...")
        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            messages=messages,
            tools=available_tools,
        )

        # Track complete conversation
        final_text = []

        # Continue conversation until no more tool calls
        while True:
            # Process the most recent response
            contains_tool_calls = False
            assistant_message_content = []

            # First, collect all tool calls without executing them
            pending_tool_calls = []

            # Process all content blocks from this response
            for content in response.content:
                if content.type == "text":
                    print("Received text response from Claude")
                    final_text.append(content.text)
                    assistant_message_content.append(content)
                elif content.type == "tool_use":
                    contains_tool_calls = True
                    # Store the tool call info for parallel execution
                    pending_tool_calls.append(
                        {
                            "tool_name": content.name,
                            "tool_args": content.input,
                            "tool_id": content.id,
                        }
                    )
                    final_text.append(
                        f"[Queueing tool call: {content.name} with args {content.input}]"
                    )
                    assistant_message_content.append(content)

            # If no tool calls, we're done
            if not contains_tool_calls:
                break

            # Execute all tool calls in parallel
            print(f"Executing {len(pending_tool_calls)} tool calls in parallel...")
            tool_tasks = [
                self.session.call_tool(call["tool_name"], call["tool_args"])
                for call in pending_tool_calls
            ]

            # Wait for all tool calls to complete
            tool_results = await asyncio.gather(*tool_tasks)

            # Process results and build the response message
            tool_result_content = []
            for i, result in enumerate(tool_results):
                tool_call = pending_tool_calls[i]
                tool_result_content.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": tool_call["tool_id"],
                        "content": result.content,
                    }
                )
                # Add to final text for display
                final_text.append(
                    f"[Tool result for {tool_call['tool_name']}: {result.content}]"
                )

            # Append assistant message with all tool calls
            messages.append({"role": "assistant", "content": assistant_message_content})

            # Add a single user message with all tool results
            messages.append({"role": "user", "content": tool_result_content})

            # Get next response from Claude with all tool results
            print(f"Sending {len(tool_results)} tool results to Claude for follow-up...")
            response = self.anthropic.messages.create(
                model="claude-3-7-sonnet-20250219",
                max_tokens=1000,
                messages=messages,
                tools=available_tools,
            )

        return "\n".join(final_text)

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == "quit":
                    break
                print("Processing query...")
                response = await self.process_query(query)
                print("\n" + response)
            except Exception as e:
                print(f"\nError: {str(e)}")
                traceback.print_exc()


async def main():
    if len(sys.argv) < 2:
        print("Usage: python client.py <URL of SSE MCP server>")
        print(
            "Example: python client.py https://dev.udf.ai/chat/4a693c58-dbd3-4f08-a07e-2ec305a8bf29/sse"
        )
        sys.exit(1)

    server_url = sys.argv[1]
    client = MCPClient()
    try:
        success = await client.connect_to_sse_server(server_url=server_url)
        if success:
            await client.chat_loop()
        else:
            print("Failed to connect. Exiting.")
    except Exception as e:
        print(f"Error in main: {str(e)}")
        traceback.print_exc()
    finally:
        print("Cleaning up client...")
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
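The chat_loop above is interactive; the same class can also be driven from another script. Below is a minimal non-interactive sketch (not part of the repo) that runs a single query; the server URL is a placeholder you must replace with a real SSE endpoint:

# one_shot.py -- drive MCPClient programmatically (illustrative sketch)
import asyncio

from client import MCPClient  # the file shown above


async def run_one_query(server_url: str, query: str) -> None:
    client = MCPClient()
    try:
        # connect_to_sse_server returns True on success, False on failure
        if await client.connect_to_sse_server(server_url=server_url):
            answer = await client.process_query(query)
            print(answer)
    finally:
        # Always tear down the SSE streams and session
        await client.cleanup()


if __name__ == "__main__":
    # Placeholder endpoint -- substitute your own SSE MCP server URL
    asyncio.run(run_one_query("https://example.com/sse", "List the available tools."))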

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fusedio/fused-mcp'
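The same endpoint can be queried from Python. A minimal sketch using only the standard library, assuming the endpoint returns a JSON document (the response schema is not shown here):

# Fetch metadata for the fused-mcp server from the Glama MCP directory API.
import json
import urllib.request

url = "https://glama.ai/api/mcp/v1/servers/fusedio/fused-mcp"
with urllib.request.urlopen(url) as resp:
    data = json.load(resp)  # assumes a JSON response body

print(json.dumps(data, indent=2))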

If you have feedback or need assistance with the MCP directory API, please join our Discord server.