#!/usr/bin/env python3
"""
client.py: Azure OpenAI GPT-4o Client with MCP Server Integration
"""
import json
import asyncio
import aiohttp
import uuid
from typing import Dict, Any, List, Optional
from openai import AsyncAzureOpenAI
import os
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()

class MCPClient:
    """Client for communicating with MCP server"""

    def __init__(self, base_url: str = "http://localhost:8000"):
        self.base_url = base_url
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def initialize(self) -> Dict[str, Any]:
        """Initialize MCP connection"""
        request = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {
                    "name": "azure-openai-client",
                    "version": "1.0.0"
                }
            }
        }
        return await self._send_request(request)

    async def list_tools(self) -> Dict[str, Any]:
        """List available tools from MCP server"""
        request = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tools/list",
            "params": {}
        }
        return await self._send_request(request)

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Call a tool on the MCP server"""
        request = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments
            }
        }
        return await self._send_request(request)

    async def _send_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Send request to MCP server via SSE"""
        async with self.session.post(
            f"{self.base_url}/sse",
            json=request,
            headers={"Content-Type": "application/json"}
        ) as response:
            async for line in response.content:
                line = line.decode('utf-8').strip()
                if line.startswith('data: '):
                    data = line[6:]  # Remove 'data: ' prefix
                    try:
                        result = json.loads(data)
                        return result
                    except json.JSONDecodeError:
                        continue
        return {"error": "No response received"}

class GPT4OMCPClient:
    """GPT-4o client with MCP integration"""

    def __init__(self):
        # Initialize Azure OpenAI client
        self.azure_client = AsyncAzureOpenAI(
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
        )
        self.mcp_client = MCPClient()
        self.available_tools = []

    async def initialize(self):
        """Initialize MCP connection and get available tools"""
        async with self.mcp_client:
            await self.mcp_client.initialize()
            tools_response = await self.mcp_client.list_tools()
            self.available_tools = tools_response.get("result", {}).get("tools", [])
            print(f"Available tools: {[tool['name'] for tool in self.available_tools]}")

    def _convert_mcp_tools_to_openai_format(self) -> List[Dict[str, Any]]:
        """Convert MCP tools to OpenAI function format"""
        openai_tools = []
        for tool in self.available_tools:
            openai_tool = {
                "type": "function",
                "function": {
                    "name": tool["name"],
                    "description": tool["description"],
                    "parameters": tool["inputSchema"]
                }
            }
            openai_tools.append(openai_tool)
        return openai_tools
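
    # Illustrative example of the mapping above (the tool name and schema are
    # made up; only the field names are what this code relies on):
    #   MCP:    {"name": "get_weather", "description": "...", "inputSchema": {...JSON Schema...}}
    #   OpenAI: {"type": "function", "function": {"name": "get_weather",
    #            "description": "...", "parameters": {...JSON Schema...}}}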

    async def call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Call MCP tool and return result"""
        async with self.mcp_client:
            result = await self.mcp_client.call_tool(tool_name, arguments)
            if "result" in result and "content" in result["result"]:
                content = result["result"]["content"]
                if content and len(content) > 0:
                    return content[0].get("text", "No result")
            return json.dumps(result)
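
    # call_mcp_tool() expects a tools/call result shaped roughly like
    # (illustrative, based only on how the dict is unpacked above):
    #   {"result": {"content": [{"text": "..."}]}}
    # Anything else is passed back to the model as raw JSON.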

    async def chat_with_tools(self, message: str, max_iterations: int = 5) -> str:
        """Chat with GPT-4o using MCP tools"""
        messages = [
            {"role": "system", "content": "You are a helpful assistant with access to various tools. Use them when appropriate to answer user questions."},
            {"role": "user", "content": message}
        ]
        openai_tools = self._convert_mcp_tools_to_openai_format()

        for iteration in range(max_iterations):
            try:
                response = await self.azure_client.chat.completions.create(
                    model=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o"),
                    messages=messages,
                    tools=openai_tools,
                    tool_choice="auto"
                )
                message_response = response.choices[0].message

                # Check if the model wants to call a tool
                if message_response.tool_calls:
                    # Add the assistant's response to messages
                    messages.append({
                        "role": "assistant",
                        "content": message_response.content,
                        "tool_calls": [
                            {
                                "id": tc.id,
                                "type": tc.type,
                                "function": {
                                    "name": tc.function.name,
                                    "arguments": tc.function.arguments
                                }
                            }
                            for tc in message_response.tool_calls
                        ]
                    })

                    # Process each tool call
                    for tool_call in message_response.tool_calls:
                        tool_name = tool_call.function.name
                        arguments = json.loads(tool_call.function.arguments)
                        print(f"Calling tool: {tool_name} with arguments: {arguments}")

                        # Call the MCP tool
                        tool_result = await self.call_mcp_tool(tool_name, arguments)

                        # Add tool result to messages
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": tool_result
                        })

                    # Continue to next iteration to get final response
                    continue
                else:
                    # No tool calls, return the response
                    return message_response.content
            except Exception as e:
                return f"Error: {str(e)}"

        return "Maximum iterations reached"

async def main():
    """Main example usage"""
    # Check environment variables
    required_vars = ["AZURE_OPENAI_API_KEY", "AZURE_OPENAI_ENDPOINT"]
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        print(f"Missing required environment variables: {missing_vars}")
        print("Please set the following environment variables:")
        print("- AZURE_OPENAI_API_KEY")
        print("- AZURE_OPENAI_ENDPOINT")
        print("- AZURE_OPENAI_DEPLOYMENT_NAME (optional, defaults to 'gpt-4o')")
        print("- AZURE_OPENAI_API_VERSION (optional, defaults to '2024-02-01')")
        return

    client = GPT4OMCPClient()
    try:
        # Initialize the client
        await client.initialize()

        # Example queries
        test_queries = [
            "What's the current time?",
            "What's the weather like in New York?",
            "Calculate 15 * 42 + 33",
            "Can you get the weather for London and then calculate the percentage if the temperature was 20 degrees and now it's 25 degrees?"
        ]
        for query in test_queries:
            print(f"\n{'='*50}")
            print(f"Query: {query}")
            print(f"{'='*50}")
            response = await client.chat_with_tools(query)
            print(f"Response: {response}")
            print()
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    asyncio.run(main())
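
# A minimal .env sketch for local testing (values are placeholders; the
# variable names are the ones read via os.getenv above):
#   AZURE_OPENAI_API_KEY=<your-key>
#   AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com/
#   AZURE_OPENAI_DEPLOYMENT_NAME=gpt-4o
#   AZURE_OPENAI_API_VERSION=2024-02-01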