Fused MCP Agents

Official
"""Interactive command-line client for an MCP server reached over SSE.

The client connects to the SSE endpoint given on the command line, lists the
tools the server exposes, and then forwards each typed query to Claude.  When
Claude asks for a tool, the tool is executed through the MCP session and its
result is sent back to Claude until Claude answers without requesting tools.
"""

import asyncio
import json
import sys
import traceback

from anthropic import Anthropic
from anthropic.types import MessageParam, ToolParam
from dotenv import load_dotenv
from mcp import ClientSession
from mcp.client.sse import sse_client

# NOTE: this needs an ANTHROPIC_API_KEY in .env to work
load_dotenv()  # load environment variables from .env

CLAUDE_MODEL = "claude-3-7-sonnet-20250219"
# Upper bound on the length of each Claude reply.
MAX_TOKENS = 1000
# Safety cap so a model that keeps requesting tools cannot loop forever.
MAX_TOOL_ROUNDS = 10


def _tool_result_block(tool_use_id: str, result) -> dict:
    """Convert an MCP tool call result into an Anthropic ``tool_result`` block.

    Args:
        tool_use_id: The ``id`` of the ``tool_use`` block being answered.
        result: The object returned by ``ClientSession.call_tool``.

    Returns:
        A dict usable as one content block of a ``user`` message.
    """
    blocks = []
    for item in getattr(result, "content", None) or []:
        item_type = getattr(item, "type", None)
        if item_type == "text":
            if item.text:  # an empty text block carries no information
                blocks.append({"type": "text", "text": item.text})
        elif item_type == "image":
            blocks.append(
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": item.mimeType,
                        "data": item.data,
                    },
                }
            )
        else:
            # No direct Anthropic equivalent (e.g. embedded resources):
            # fall back to a textual rendering rather than dropping it.
            blocks.append({"type": "text", "text": str(item)})

    block: dict = {"type": "tool_result", "tool_use_id": tool_use_id}
    if blocks:
        block["content"] = blocks
    if getattr(result, "isError", False):
        block["is_error"] = True
    return block


class MCPClient:
    """Bridge between an SSE-based MCP server and the Claude messages API."""

    def __init__(self):
        # Initialize session and client objects.  ``session`` must be given a
        # value here so that ``process_query`` can test it before connecting.
        self.session: ClientSession | None = None
        self._session_context = None
        self._streams_context = None
        self.anthropic = Anthropic()

    async def connect_to_sse_server(self, server_url: str) -> bool:
        """Connect to an MCP server running with SSE transport.

        Args:
            server_url: The URL of the SSE endpoint.

        Returns:
            True when the session was initialized and tools were listed,
            False when any step failed (resources are cleaned up first).
        """
        print(f"Connecting to SSE endpoint: {server_url}")

        try:
            # Create the SSE client.  Each context is recorded on ``self``
            # only after it has been entered, so cleanup never exits a
            # context that was not successfully opened.
            streams_context = sse_client(url=server_url)
            streams = await streams_context.__aenter__()
            self._streams_context = streams_context

            session_context = ClientSession(*streams)
            self.session = await session_context.__aenter__()
            self._session_context = session_context

            # Initialize
            print("Initializing SSE client...")
            await self.session.initialize()

            # List available tools to verify connection
            print("Connection established, listing tools...")
            response = await self.session.list_tools()
            tools = response.tools
            print(
                "\nConnected to server with tools:",
                [tool.name for tool in tools],
            )
            return True
        except Exception as e:
            print(f"Error during initialization: {str(e)}")
            await self.cleanup()
            return False

    async def cleanup(self):
        """Close the session and the SSE streams; safe to call repeatedly."""
        # Detach the contexts first so that a second call is a no-op instead
        # of exiting the same context twice.
        session_context, self._session_context = self._session_context, None
        streams_context, self._streams_context = self._streams_context, None
        self.session = None

        failed = False
        # Exit in reverse order of entry; a failure closing the session must
        # not prevent the transport from being closed.
        for context in (session_context, streams_context):
            if context is None:
                continue
            try:
                await context.__aexit__(None, None, None)
            except Exception as e:
                failed = True
                print(f"Error during cleanup: {str(e)}")

        if not failed:
            print("Cleaned up connection resources")

    async def _ask_claude(self, messages, tools):
        """Send the conversation to Claude and return its response.

        The Anthropic client used here is synchronous, so the request runs in
        a worker thread to keep the event loop (which also services the SSE
        connection) responsive while waiting.
        """
        return await asyncio.to_thread(
            self.anthropic.messages.create,
            model=CLAUDE_MODEL,
            max_tokens=MAX_TOKENS,
            messages=messages,
            tools=tools,
        )

    async def process_query(self, query: str) -> str:
        """Process a query using Claude and available tools.

        Args:
            query: The user's question.

        Returns:
            Claude's text output joined by newlines, interleaved with a
            ``[Calling tool ...]`` marker for every tool that was executed.

        Raises:
            ValueError: If no session has been established.
        """
        if not self.session:
            raise ValueError("Session not initialized")

        messages: list[MessageParam] = [MessageParam(role="user", content=query)]

        response = await self.session.list_tools()
        available_tools = [
            ToolParam(
                name=tool.name,
                description=tool.description or "",
                input_schema=tool.inputSchema,
            )
            for tool in response.tools
        ]
        for t in available_tools:
            print(f"Available tool: {t['name']}")
            print(
                f"Available tool description: {t.get('description', 'No description')}"
            )
            print(f"Available input_schema: {t['input_schema']}")

        final_text: list[str] = []

        print("Sending query to Claude...")
        for round_index in range(MAX_TOOL_ROUNDS):
            if round_index:
                print("Sending tool results to Claude for follow-up...")
            response = await self._ask_claude(messages, available_tools)

            # Process response and handle tool calls
            tool_results = []
            for content in response.content:
                if content.type == "text":
                    print("Received text response from Claude")
                    final_text.append(content.text)
                elif content.type == "tool_use":
                    tool_name = content.name
                    tool_args = content.input
                    print(
                        f"Claude is calling tool: {tool_name} with args: {tool_args}"
                    )

                    # Execute tool call
                    result = await self.session.call_tool(tool_name, tool_args)
                    final_text.append(
                        f"[Calling tool {tool_name} with args {tool_args}]"
                    )
                    tool_results.append(_tool_result_block(content.id, result))

            if not tool_results:
                # Claude answered without requesting a tool: we are done.
                break

            # Continue conversation with tool results: one assistant message
            # echoing Claude's full reply, then one user message answering
            # every tool_use block it contained.
            messages.append({"role": "assistant", "content": response.content})
            messages.append(MessageParam(role="user", content=tool_results))
        else:
            final_text.append(f"[Stopped after {MAX_TOOL_ROUNDS} tool rounds]")

        return "\n".join(final_text)

    async def chat_loop(self):
        """Run an interactive chat loop until the user types 'quit' or EOF."""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()
            except EOFError:
                # stdin was closed; retrying would loop forever.
                break

            if query.lower() == "quit":
                break
            if not query:
                continue

            try:
                print("Processing query...")
                response = await self.process_query(query)
                print("\n" + response)
            except Exception as e:
                # Keep the session alive after a failed query.
                print(f"\nError: {str(e)}")
                traceback.print_exc()


async def main():
    """Parse the server URL from argv, connect, and run the chat loop."""
    if len(sys.argv) < 2:
        print("Usage: python client.py <URL of SSE MCP server>")
        print(
            "Example: python client.py https://dev.udf.ai/chat/4a693c58-dbd3-4f08-a07e-2ec305a8bf29/sse"
        )
        sys.exit(1)

    server_url = sys.argv[1]
    client = MCPClient()
    try:
        success = await client.connect_to_sse_server(server_url=server_url)
        if success:
            await client.chat_loop()
        else:
            print("Failed to connect. Exiting.")
    except Exception as e:
        print(f"Error in main: {str(e)}")
        traceback.print_exc()
    finally:
        print("Cleaning up client...")
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())