MCP Server

import asyncio
import json
import os
import sys
import time
from contextlib import AsyncExitStack
from typing import Optional

from dotenv import load_dotenv
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import AsyncOpenAI

load_dotenv()  # load environment variables from .env


class MCPClient:
    """Chat client that lets an OpenAI model call tools on an MCP server.

    The connection to the MCP server uses the SSE transport. The OpenAI
    client is configured from the ``OPENAI_API_KEY`` and ``OPENAI_BASE_URL``
    environment variables, and the model name is read from ``OPENAI_MODEL``
    each time a request is sent.
    """

    def __init__(self):
        # Set by connect_to_sse_server(); None while disconnected.
        self.session: Optional[ClientSession] = None
        # Owns the SSE streams and the session so cleanup() releases exactly
        # what was successfully opened, even after a partially failed connect.
        self.exit_stack = AsyncExitStack()
        self.openai = AsyncOpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_BASE_URL"),
        )

    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server running with SSE transport.

        Opens the SSE streams, starts and initializes a client session, and
        prints the names of the tools the server reports.

        Args:
            server_url: URL of the server's SSE endpoint.
        """
        # Register both context managers on the exit stack so they stay alive
        # until cleanup() and are closed in reverse order (session, streams).
        streams = await self.exit_stack.enter_async_context(
            sse_client(url=server_url)
        )
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(*streams)
        )

        await self.session.initialize()

        # List available tools to verify the connection.
        print("Initialized SSE client...")
        print("Listing tools...")
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def cleanup(self):
        """Close the session and the SSE streams.

        Safe to call when the connection was never established or only
        partially established.
        """
        try:
            await self.exit_stack.aclose()
        finally:
            self.session = None

    @staticmethod
    def _content_to_text(content) -> str:
        """Return message content as a string; ``None`` becomes ``""``."""
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        return str(content)

    @staticmethod
    def _tool_result_text(result) -> str:
        """Join the text parts of a tool result; items without text are skipped."""
        texts = [
            item.text
            for item in (result.content or [])
            if getattr(item, "text", None) is not None
        ]
        return "\n".join(texts)

    async def process_query(self, query: str) -> str:
        """Process a query using the OpenAI API and the server's tools.

        Sends the query together with the tool list. If the model requests
        tool calls, every call is executed on the MCP server, the results are
        appended to the conversation, and one follow-up completion is
        requested.

        Args:
            query: The user's message.

        Returns:
            The model's reply. When tools were called, one
            ``[Calling tool ...]`` line per call precedes the final reply.

        Raises:
            RuntimeError: If called before ``connect_to_sse_server()``.
        """
        if self.session is None:
            raise RuntimeError(
                "Not connected: call connect_to_sse_server() first"
            )

        messages = [{"role": "user", "content": query}]

        response = await self.session.list_tools()
        available_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema,
                },
            }
            for tool in response.tools
        ]

        # Initial OpenAI API call. The tools parameter is only sent when the
        # server exposes at least one tool, so an empty list is never passed.
        request = {
            "model": os.getenv("OPENAI_MODEL"),
            "max_tokens": 1000,
            "messages": messages,
        }
        if available_tools:
            request["tools"] = available_tools
        completion = await self.openai.chat.completions.create(**request)

        assistant_message = completion.choices[0].message
        if not assistant_message.tool_calls:
            return self._content_to_text(assistant_message.content)

        final_text = []
        tool_messages = []
        for tool_call in assistant_message.tool_calls:
            tool_name = tool_call.function.name
            # An empty arguments string is treated as "no arguments".
            tool_args = json.loads(tool_call.function.arguments or "{}")

            result = await self.session.call_tool(tool_name, tool_args)
            final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
            tool_messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": self._tool_result_text(result),
                }
            )

        # Continue the conversation: one assistant message carrying all tool
        # calls, followed by one tool message per call.
        messages.append(
            {
                "role": "assistant",
                "content": assistant_message.content,
                "tool_calls": [
                    {
                        "id": tool_call.id,
                        "type": "function",
                        "function": {
                            "name": tool_call.function.name,
                            "arguments": tool_call.function.arguments,
                        },
                    }
                    for tool_call in assistant_message.tool_calls
                ],
            }
        )
        messages.extend(tool_messages)

        # Get the next response from OpenAI now that all results are recorded.
        completion = await self.openai.chat.completions.create(
            model=os.getenv("OPENAI_MODEL"),
            max_tokens=1000,
            messages=messages,
        )
        final_text.append(
            self._content_to_text(completion.choices[0].message.content)
        )
        return "\n".join(final_text)

    async def chat_loop(self):
        """Run an interactive chat loop until 'quit' or end of input.

        Errors raised while handling a query are printed and the loop
        continues. Blank input is ignored.
        """
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                # NOTE: input() blocks the event loop while waiting for the user.
                query = input("\nQuery: ").strip()
                if not query:
                    continue
                if query.lower() == "quit":
                    break

                response = await self.process_query(query)
                print("\n" + response)
            except EOFError:
                # stdin was closed; retrying would fail the same way forever.
                break
            except Exception as e:
                print(f"\nError: {e}")


async def main():
    """Connect to the SSE server named on the command line and start chatting."""
    if len(sys.argv) < 2:
        print("Usage: uv run client.py <URL of SSE MCP server (i.e. http://localhost:8080/sse)>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_sse_server(server_url=sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

# Example: uv run client.py http://0.0.0.0:8080/sse