import json
from huggingface_hub import get_token
from dotenv import load_dotenv
from openai import AsyncOpenAI
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import Any, List
import asyncio
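# Load environment variables (e.g., OPENAI_API_KEY) from a local .env file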
load_dotenv()
# MODEL_ID = "Qwen/Qwen2.5-VL-7B-Instruct"
MODEL_ID = "gpt-4o"
# System prompt that guides the LLM's behavior and capabilities
# This helps the model understand its role and available tools
SYSTEM_PROMPT = """You are a helpful islamic assistant capable of accessing external functions and engaging in casual chat.
Use the responses only if needed from these function calls to provide accurate and informative answers.
The answers should be natural and hide the fact that you are using tools to access real-time information.
Guide the user about available tools and their capabilities.
Always utilize tools whenever the user asked for islamic information to access real-time information when required.
Engage in a friendly manner to enhance the chat experience.
# Tools
{tools}
# Notes
- Ensure responses are based on the latest information available from function calls.
- Maintain an engaging, supportive, and friendly tone throughout the dialogue.
- Always highlight the potential of available tools to assist users comprehensively."""
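# For illustration only: at runtime the {tools} placeholder above is filled
# with one "name: description" line per MCP tool. With a hypothetical tool the
# rendered section might look roughly like:
#
#   # Tools
#
#   get_prayer_times: Return the prayer times for a given city and date.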
try:
    # First, try to initialize the OpenAI client with an OpenAI API key
    # (read from the OPENAI_API_KEY environment variable)
    client = AsyncOpenAI()
except Exception:
    # Fall back to the Hugging Face Inference API, authenticated with the
    # locally stored Hugging Face token
    client = AsyncOpenAI(
        base_url="https://api-inference.huggingface.co/v1/",
        api_key=get_token(),
    )
class MCPClient:
"""
    A client class for interacting with an MCP (Model Context Protocol) server.
This class manages the connection and communication with the SQLite database through MCP.
"""
def __init__(self, server_params: StdioServerParameters):
"""Initialize the MCP client with server parameters"""
self.server_params = server_params
self.session = None
self._client = None
async def __aenter__(self):
"""Async context manager entry"""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
if self.session:
await self.session.__aexit__(exc_type, exc_val, exc_tb)
if self._client:
await self._client.__aexit__(exc_type, exc_val, exc_tb)
async def connect(self):
"""Establishes connection to MCP server"""
self._client = stdio_client(self.server_params)
self.read, self.write = await self._client.__aenter__()
session = ClientSession(self.read, self.write)
self.session = await session.__aenter__()
await self.session.initialize()
async def get_available_tools(self) -> List[Any]:
"""
Retrieve a list of available tools from the MCP server.
"""
if not self.session:
raise RuntimeError("Not connected to MCP server")
        # list_tools() returns a ListToolsResult; the tool definitions live on
        # its .tools attribute, so read it directly instead of unpacking fields
        tools_result = await self.session.list_tools()
        return tools_result.tools
def call_tool(self, tool_name: str) -> Any:
"""
Create a callable function for a specific tool.
This allows us to execute database operations through the MCP server.
Args:
tool_name: The name of the tool to create a callable for
Returns:
A callable async function that executes the specified tool
"""
if not self.session:
raise RuntimeError("Not connected to MCP server")
        async def call(*args, **kwargs):
            # Forward the invocation to the MCP server over the active session
            response = await self.session.call_tool(tool_name, arguments=kwargs)
            # MCP returns a list of content blocks; return the first text block
            return response.content[0].text

        return call
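# Minimal standalone usage sketch for MCPClient (illustrative, not executed;
# assumes a local server.py exposing MCP tools over stdio, as in main() below):
#
#   async def demo():
#       params = StdioServerParameters(command="mcp", args=["run", "server.py"])
#       async with MCPClient(params) as mcp:
#           for tool in await mcp.get_available_tools():
#               print(tool.name, "-", tool.description)
#
#   asyncio.run(demo())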
async def agent_loop(query: str, tools: dict, messages: List[dict] = None):
"""
Main interaction loop that processes user queries using the LLM and available tools.
This function:
1. Sends the user query to the LLM with context about available tools
2. Processes the LLM's response, including any tool calls
3. Returns the final response to the user
Args:
query: User's input question or command
tools: Dictionary of available database tools and their schemas
messages: List of messages to pass to the LLM, defaults to None
"""
messages = (
[
{
"role": "system",
"content": SYSTEM_PROMPT.format(
tools="\n- ".join(
[
f"{t['name']}: {t['schema']['function']['description']}"
for t in tools.values()
]
)
), # Creates System prompt based on available MCP server tools
},
]
if messages is None
else messages # reuse existing messages if provided
)
# add user query to the messages list
messages.append({"role": "user", "content": query})
# Query LLM with the system prompt, user query, and available tools
first_response = await client.chat.completions.create(
model=MODEL_ID,
messages=messages,
tools=([t["schema"] for t in tools.values()] if len(tools) > 0 else None),
max_tokens=4096,
temperature=0.5,
)
# detect how the LLM call was completed:
# tool_calls: if the LLM used a tool
# stop: If the LLM generated a general response, e.g. "Hello, how can I help you today?"
stop_reason = (
"tool_calls"
if first_response.choices[0].message.tool_calls is not None
else first_response.choices[0].finish_reason
)
    if stop_reason == "tool_calls":
        # Append the assistant message that requested the tool calls, so the
        # follow-up request carries the tool_call ids the API expects to match
        messages.append(first_response.choices[0].message)
        # Extract tool use details from the response
        for tool_call in first_response.choices[0].message.tool_calls:
            arguments = (
                json.loads(tool_call.function.arguments)
                if isinstance(tool_call.function.arguments, str)
                else tool_call.function.arguments
            )
            # Call the tool with the arguments using our callable initialized in the tools dict
            tool_result = await tools[tool_call.function.name]["callable"](**arguments)
            # Add the tool result to the messages list; tool results use the
            # "tool" role and must echo the matching tool_call_id
            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": tool_call.function.name,
                    "content": json.dumps(tool_result, ensure_ascii=False),
                }
            )
        # Query the LLM again, now with the tool results in context
        new_response = await client.chat.completions.create(
            model=MODEL_ID,
            messages=messages,
        )
elif stop_reason == "stop":
# If the LLM stopped on its own, use the first response
new_response = first_response
else:
raise ValueError(f"Unknown stop reason: {stop_reason}")
# Add the LLM response to the messages list
messages.append(
{"role": "assistant", "content": new_response.choices[0].message.content}
)
# Return the LLM response and messages
return new_response.choices[0].message.content, messages
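# Shape of the messages list after one tool round (illustrative; the id and
# contents below are hypothetical):
#
#   [{"role": "system", "content": "<rendered SYSTEM_PROMPT>"},
#    {"role": "user", "content": "When is maghrib today in Cairo?"},
#    <assistant message carrying tool_calls>,
#    {"role": "tool", "tool_call_id": "call_abc123", "name": "...", "content": "..."},
#    {"role": "assistant", "content": "<final answer>"}]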
async def main():
"""
Main function that sets up the MCP server, initializes tools, and runs the interactive loop.
    The server runs locally by default; a Docker-based configuration is kept
    below (commented out) for isolation and consistency.
"""
# Configure Docker-based MCP server for SQLite
# server_params = StdioServerParameters(
# command="docker",
# args=[
# "run",
# "--rm", # Remove container after exit
# "-i", # Interactive mode
# "-v", # Mount volume
# "mcp-test:/mcp", # Map local volume to container path
# "mcp/sqlite", # Use SQLite MCP image
# "--db-path",
# "/mcp/test.db", # Database file path inside container
# ],
# env=None,
# )
# Configure Local-based MCP server
server_params = StdioServerParameters(
command="mcp",
args=[
"run",
"server.py", # Remove container after exit
],
env=None,
)
# Start MCP client and create interactive session
async with MCPClient(server_params) as mcp_client:
# Get available database tools and prepare them for the LLM
mcp_tools = await mcp_client.get_available_tools()
# Convert MCP tools into a format the LLM can understand and use
tools = {
tool.name: {
"name": tool.name,
"callable": mcp_client.call_tool(
tool.name
), # returns a callable function for the rpc call
"schema": {
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
},
}
            for tool in mcp_tools
            if tool.name != "list_tables"  # Excluded: this tool advertises an incorrect schema
}
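        # Each entry's "schema" follows the OpenAI function-calling format,
        # e.g. for a hypothetical tool ("parameters" holds the tool's JSON Schema):
        #
        #   {"type": "function",
        #    "function": {"name": "get_prayer_times",
        #                 "description": "Return prayer times for a city",
        #                 "parameters": {"type": "object", "properties": {...}}}}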
# Start interactive prompt loop for user queries
messages = None
while True:
try:
# Get user input and check for exit commands
user_input = input("\nEnter your prompt (or 'quit' to exit): ")
if user_input.lower() in ["quit", "exit", "q"]:
break
# Process the prompt and run agent loop
response, messages = await agent_loop(user_input, tools, messages)
print("\nResponse:", response)
# print("\nMessages:", messages)
except KeyboardInterrupt:
print("\nExiting...")
break
except Exception as e:
print(f"\nError occurred: {e}")
if __name__ == "__main__":
asyncio.run(main())
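# To try it out (assuming this script is saved as client.py next to the
# server.py it launches):
#
#   $ python client.py
#   Enter your prompt (or 'quit' to exit): ...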