MCP Server
by GobinFan
import asyncio
import json
import os
from typing import Optional
from contextlib import AsyncExitStack
import time
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import AsyncOpenAI
from dotenv import load_dotenv
load_dotenv() # load environment variables from .env
class MCPClient:
def __init__(self):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"))
async def connect_to_sse_server(self, server_url: str):
"""Connect to an MCP server running with SSE transport"""
# Store the context managers so they stay alive
self._streams_context = sse_client(url=server_url)
streams = await self._streams_context.__aenter__()
self._session_context = ClientSession(*streams)
self.session: ClientSession = await self._session_context.__aenter__()
# Initialize
await self.session.initialize()
# List available tools to verify connection
print("Initialized SSE client...")
print("Listing tools...")
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
async def cleanup(self):
"""Properly clean up the session and streams"""
if self._session_context:
await self._session_context.__aexit__(None, None, None)
if self._streams_context:
await self._streams_context.__aexit__(None, None, None)
async def process_query(self, query: str) -> str:
"""Process a query using OpenAI API and available tools"""
messages = [
{
"role": "user",
"content": query
}
]
response = await self.session.list_tools()
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
} for tool in response.tools]
# Initial OpenAI API call
completion = await self.openai.chat.completions.create(
model=os.getenv("OPENAI_MODEL"),
max_tokens=1000,
messages=messages,
tools=available_tools
)
# Process response and handle tool calls
tool_results = []
final_text = []
assistant_message = completion.choices[0].message
if assistant_message.tool_calls:
for tool_call in assistant_message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# Execute tool call
result = await self.session.call_tool(tool_name, tool_args)
tool_results.append({"call": tool_name, "result": result})
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
# Continue conversation with tool results
messages.extend([
{
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
},
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": result.content[0].text
}
])
# print(f"Tool {tool_name} returned: {result.content[0].text}")
# print("messages", messages)
# Get next response from OpenAI
completion = await self.openai.chat.completions.create(
model=os.getenv("OPENAI_MODEL"),
max_tokens=1000,
messages=messages,
)
if isinstance(completion.choices[0].message.content, (dict, list)):
final_text.append(str(completion.choices[0].message.content))
else:
final_text.append(completion.choices[0].message.content)
else:
if isinstance(assistant_message.content, (dict, list)):
final_text.append(str(assistant_message.content))
else:
final_text.append(assistant_message.content)
return "\n".join(final_text)
async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n" + response)
except Exception as e:
print(f"\nError: {str(e)}")
async def main():
if len(sys.argv) < 2:
print("Usage: uv run client.py <URL of SSE MCP server (i.e. http://localhost:8080/sse)>")
sys.exit(1)
client = MCPClient()
try:
await client.connect_to_sse_server(server_url=sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
import sys
asyncio.run(main())
#uv run client.py http://0.0.0.0:8080/sse