client.py•5.13 kB
import asyncio
import json
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optional
import nest_asyncio
from dotenv import load_dotenv
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import AsyncOpenAI
# Apply nest_asyncio to allow nested event loops (needed for Jupyter/IPython)
nest_asyncio.apply()
# Load environment variables
load_dotenv("../.env")
class MCPOpenAIClient:
"""Client for interacting with OpenAI models using MCP tools."""
def __init__(self, model: str = "gpt-4o"):
"""Initialize the OpenAI MCP client.
Args:
model: The OpenAI model to use.
"""
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.openai_client = AsyncOpenAI()
self.model = model
self.stdio: Optional[Any] = None
self.write: Optional[Any] = None
async def connect_to_server(self, url: str = "http://localhost:8050/sse"):
"""Connect to an MCP server using SSE.
Args:
url: URL of the SSE endpoint.
"""
# Connect to the server
sse_transport = await self.exit_stack.enter_async_context(
sse_client(url)
)
self.stdio, self.write = sse_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)
# Initialize the connection
await self.session.initialize()
# List available tools
tools_result = await self.session.list_tools()
print("\nConnected to server with tools:")
for tool in tools_result.tools:
print(f" - {tool.name}: {tool.description}")
async def get_mcp_tools(self) -> List[Dict[str, Any]]:
"""Get available tools from the MCP server in OpenAI format.
Returns:
A list of tools in OpenAI format.
"""
tools_result = await self.session.list_tools()
return [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
}
for tool in tools_result.tools
]
async def process_query(self, query: str) -> str:
"""Process a query using OpenAI and available MCP tools.
Args:
query: The user query.
Returns:
The response from OpenAI.
"""
# Get available tools
tools = await self.get_mcp_tools()
# Initial OpenAI API call
response = await self.openai_client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": query}],
tools=tools,
tool_choice="auto",
)
# Get assistant's response
assistant_message = response.choices[0].message
# Initialize conversation with user query and assistant response
messages = [
{"role": "user", "content": query},
assistant_message,
]
# Handle tool calls if present
if assistant_message.tool_calls:
# Process each tool call
for tool_call in assistant_message.tool_calls:
# Execute tool call
result = await self.session.call_tool(
tool_call.function.name,
arguments=json.loads(tool_call.function.arguments),
)
# Add tool response to conversation
messages.append(
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": result.content[0].text,
}
)
# Get final response from OpenAI with tool results
final_response = await self.openai_client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
tool_choice="none", # Don't allow more tool calls
)
return final_response.choices[0].message.content
# No tool calls, just return the direct response
return assistant_message.content
async def cleanup(self):
"""Clean up resources."""
await self.exit_stack.aclose()
await self.openai_client.close()
async def main():
"""Main entry point for the client."""
client = MCPOpenAIClient()
try:
await client.connect_to_server("http://localhost:8050/sse")
# Example: Ask about company vacation policy
query = "What is our company's vacation policy?"
print(f"\nQuery: {query}")
response = await client.process_query(query)
print(f"\nResponse: {response}")
finally:
# Ensure cleanup happens even if there's an error
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())