import asyncio
import os
import json
import sys
from typing import Optional
from contextlib import AsyncExitStack
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Load environment variables from a .env file at import time so secrets
# (API key, base URL, model name) come from configuration, not source code.
load_dotenv()
class MCPClient:
    """Bridge between an OpenAI-compatible chat model and an MCP server.

    Spawns an MCP server script over stdio, advertises the server's tools to
    the model via function calling, executes requested tool calls, and feeds
    results back for a final answer.
    """

    def __init__(self):
        """Read configuration from the environment and create the OpenAI client.

        Raises:
            ValueError: if OPENAI_API_KEY is not set.
        """
        self.openai_api_key = os.getenv("OPENAI_API_KEY")  # required API key
        self.base_url = os.getenv("BASE_URL")              # optional API base URL
        self.model = os.getenv("MODEL")                    # chat model name
        if not self.openai_api_key:
            raise ValueError("❌ OpenAI API Key not found. Please set OPENAI_API_KEY in .env file")
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
        self.session: Optional[ClientSession] = None
        # One exit stack owns all async resources (transport + session).
        # The original constructed AsyncExitStack twice, discarding the first.
        self.exit_stack = AsyncExitStack()

    async def connect_to_server(self, server_script_path: str):
        """Spawn the MCP server script, open a session, and list its tools.

        Args:
            server_script_path: path to a ``.py`` or ``.js`` server script.

        Raises:
            ValueError: if the script is neither ``.py`` nor ``.js``.
        """
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("Server script must be either .py or .js file")
        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None,
        )
        # Start the MCP server subprocess and establish stdio communication.
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        await self.session.initialize()
        # Report the tools the server advertises.
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server. Available tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Send *query* to the LLM, executing at most one MCP tool call.

        Args:
            query: the user's message.

        Returns:
            The model's final text answer.

        Raises:
            RuntimeError: if called before :meth:`connect_to_server`.
        """
        if self.session is None:
            raise RuntimeError("Not connected to an MCP server. Call connect_to_server() first.")
        messages = [{"role": "user", "content": query}]
        response = await self.session.list_tools()
        # Advertise server tools in OpenAI function-calling format.
        # NOTE: the Chat Completions API expects the JSON schema under
        # "parameters"; the original used "input_schema", so the model never
        # saw the tools' argument schemas.
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema,
            },
        } for tool in response.tools]
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=available_tools,
        )
        choice = response.choices[0]
        if choice.finish_reason == "tool_calls":
            # The model requested a tool: execute the first requested call.
            tool_call = choice.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            result = await self.session.call_tool(tool_name, tool_args)
            print(f"\n\n[Calling tool {tool_name} with args {tool_args}]\n\n")
            # Append both the assistant's tool-call message and the tool
            # result so the model can compose the final answer.
            messages.append(choice.message.model_dump())
            messages.append({
                "role": "tool",
                "content": result.content[0].text,
                "tool_call_id": tool_call.id,
            })
            # Second round-trip: let the LLM turn the tool output into prose.
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
            )
            return response.choices[0].message.content
        return choice.message.content

    async def chat_loop(self):
        """Interactive REPL: forward user input to the model until 'quit' or EOF."""
        print("\n🤖 MCP client started! Type 'quit' to exit")
        while True:
            try:
                query = input("\nYou: ").strip()
            except EOFError:
                # stdin closed (e.g. piped input exhausted): exit cleanly
                # instead of looping forever on repeated EOFError.
                break
            if query.lower() == 'quit':
                break
            try:
                response = await self.process_query(query)  # send user input to the model
                print(f"\n🤖 OpenAI: {response}")
            except Exception as e:
                # Keep the REPL alive on per-query failures.
                print(f"\n⚠️ Error occurred: {str(e)}")

    async def cleanup(self):
        """Close the MCP session and stdio transport via the exit stack."""
        await self.exit_stack.aclose()
async def main():
    """CLI entry point: connect to the server script given in argv and chat."""
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)
    mcp_client = MCPClient()
    try:
        await mcp_client.connect_to_server(sys.argv[1])
        await mcp_client.chat_loop()
    finally:
        # Always release the stdio transport and session, even on error.
        await mcp_client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())