import asyncio
import json
import os
from typing import Optional
from fastmcp import Client
from llm_client import LLMClient, OpenAIAdapter
from utils.sanitization import sanitize_tool_input, sanitize_user_message
from utils.tool_analytic import ToolCounter
from utils.token_counter import count_tokens
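# Connection settings are read from the environment, for example (illustrative
# values only; adjust to your own deployment):
#   export MCP_SERVER_URL="http://localhost:8000/mcp"
#   export OPENAI_MODEL="gpt-4o-mini"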
MCP_SERVER_URL = os.getenv("MCP_SERVER_URL")
MODEL = os.getenv("OPENAI_MODEL")
# ============================================================================
# Tool Definitions for OpenAI
# ============================================================================
# example of tool definition format
# TOOLS = [
# {
# "type": "function",
# "function": {
# "name": "store_memory",
# "description": "Store or update a memory for a user. Use this when the user wants to save information about themselves.",
# "parameters": {
# "type": "object",
# "properties": {
# "user_id": {
# "type": "string",
# "description": "Unique identifier for the user",
# },
# "key": {
# "type": "string",
# "description": "Memory key (e.g., 'favourite_color', 'name', 'occupation')",
# },
# "value": {"type": "string", "description": "Memory value to store"},
# },
# "required": ["user_id", "key", "value"],
# },
# },
# },
# ]
async def get_tools_from_mcp(mcp_client: Client) -> list:
    """Fetch tool definitions from the MCP server and convert them to the OpenAI tool format"""
try:
async with mcp_client:
tools_response = await mcp_client.list_tools()
# Convert MCP tools to OpenAI format
tools = []
for tool in tools_response:
tools.append(
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema,
},
}
)
return tools
except Exception as e:
print(f"Error fetching tools: {str(e)}")
return []
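# Usage sketch: fetch the tool list once at startup (as MemoryConversation does
# in __init__) and pass it to the LLM on every chat call. Each entry has the
# same shape as the commented TOOLS example above.
#
#   tools = asyncio.run(get_tools_from_mcp(Client(MCP_SERVER_URL)))
#   # -> [{"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}}, ...]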
# ============================================================================
# MCP Tool Execution
# ============================================================================
async def call_mcp_tool(mcp_client: Client, tool_name: str, **kwargs) -> dict:
"""Call a tool on the MCP server using FastMCP Client"""
try:
async with mcp_client:
result = await mcp_client.call_tool(tool_name, kwargs)
# Extract content from CallToolResult
if hasattr(result, "content"):
content = result.content[0].text if result.content else "{}"
return json.loads(content) if isinstance(content, str) else content
return {"status": "success", "result": str(result)}
except Exception as e:
return {"status": "error", "message": f"MCP Server error: {str(e)}"}
def process_tool_call(mcp_client: Client, tool_name: str, tool_input: dict) -> str:
"""Process a tool call from OpenAI and return result as string"""
tool_input = sanitize_tool_input(tool_input)
result = asyncio.run(call_mcp_tool(mcp_client, tool_name, **tool_input))
return json.dumps(result)
# ============================================================================
# Conversation with Memory
# ============================================================================
class MemoryConversation:
"""Manage conversation with memory context"""
def __init__(
self,
llm_client: LLMClient,
user_id: str,
system_prompt: Optional[str] = None,
debug_mode: bool = False,
max_history_size: int = 200,
):
self.user_id = user_id
        self.conversation_history = []  # trimmed to at most max_history_size messages after each turn
self.llm_client = llm_client
self.mcp_client = Client(MCP_SERVER_URL)
self.debug_mode = debug_mode
self.max_history_size = max_history_size # Size cap
self.tools = asyncio.run(get_tools_from_mcp(self.mcp_client))
self.tool_counter = ToolCounter()
self.tool_usage = []
# Default system prompt
if system_prompt is None:
system_prompt = f"""\
You are a helpful and careful assistant with memory capabilities. Always use the appropriate tools to retrieve or store user data instead of guessing.
You can store and retrieve information about the user (ID: {user_id}).
When the user wants to find flights or hotel rooms, use lookup_flights or lookup_hotels to find available services.
If the user chooses any items, use the book_a_flight or book_a_hotel tools to book them.
Once a booking is completed, you MUST store the booking information (e.g., flight number, hotel reservation) as travel preferences.
When the user tells you about a general travel preference (e.g., a dream destination or favorite trip), use the store_travel_preference tool to save it.
When you need to recall the user's travel preferences, use the retrieve_travel_preferences tool.
When the user tells you something else about themselves, use the store_memory tool to save it.
When you need to recall general information about the user, use the retrieve_memory tool.
Provide clear and detailed options when helping the user, especially for flights and hotels.
Never expose or share user data; maintain strict privacy and confidentiality.
Focus on accurate and precise responses, ensuring that all information provided to the user is consistent with their stored data.
Always be friendly and personable, referencing stored preferences and memories when relevant.
"""
self.system_prompt = system_prompt
def _trim_history(self) -> None:
"""Trim conversation history to max_history_size, preserving system context"""
if len(self.conversation_history) > self.max_history_size:
# Keep only the last max_history_size messages
self.conversation_history = self.conversation_history[
-self.max_history_size :
]
def chat(self, user_message: str) -> str:
"""
Send a message and get a response with memory context.
Args:
user_message: User's input message
Returns:
Assistant's response
"""
user_message = sanitize_user_message(user_message)
# Add user message to history
self.conversation_history.append({"role": "user", "content": user_message})
# Call OpenAI with tools
response = self.llm_client.chat(
messages=[
{"role": "system", "content": self.system_prompt},
*self.conversation_history,
],
tools=self.tools,
tool_choice="auto",
)
# Process response
assistant_message = response
        # Cap the number of tool calls per turn to prevent runaway tool loops
        allowed_tool_calls = 20
# Handle tool calls
while assistant_message.tool_calls:
            # Record the assistant's text; the tool calls themselves are echoed
            # back below as user messages carrying each tool's result
            self.conversation_history.append(
                {"role": "assistant", "content": assistant_message.content or ""}
            )
            # Process each tool call, enforcing the per-turn budget
            for tool_call in assistant_message.tool_calls:
                if allowed_tool_calls <= 0:
                    raise RuntimeError("Reached tool allowance limit")
                allowed_tool_calls -= 1
                tool_name = tool_call.function.name
                tool_input = json.loads(tool_call.function.arguments)
if self.debug_mode:
print(f"\n🔧 Calling tool: {tool_name}")
print(f" Input: {json.dumps(tool_input, indent=2)}")
result = process_tool_call(self.mcp_client, tool_name, tool_input)
if self.debug_mode:
print(f" Result: {result}")
                # Feed the tool result back as a plain user message (a simplified
                # alternative to the native role="tool" message format)
self.conversation_history.append(
{
"role": "user",
"content": f"Tool '{tool_name}' returned: {result}",
}
)
# keep tool statistics
tool_input_raw = tool_call.function.arguments
tool_input_tokens = count_tokens(tool_input_raw)
                # result is already a JSON string, so count its tokens directly
                tool_output_tokens = count_tokens(result)
self.tool_counter.increment_tool(
tool_name,
calls=1,
tokens_in=tool_input_tokens,
tokens_out=tool_output_tokens,
)
self.tool_usage.append(tool_name)
# Get next response from OpenAI
response = self.llm_client.chat(
messages=[
{"role": "system", "content": self.system_prompt},
*self.conversation_history,
],
tools=self.tools,
tool_choice="auto",
)
assistant_message = response
# Extract final text response
final_response = assistant_message.content or "No response generated"
# Add to history
self.conversation_history.append(
{"role": "assistant", "content": final_response}
)
# Trim history after each chat
self._trim_history()
return final_response
def get_history(self) -> list:
"""Get conversation history"""
return self.conversation_history
def clear_history(self) -> None:
"""Clear conversation history"""
self.conversation_history = []
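

# ============================================================================
# Example Usage (sketch)
# ============================================================================
# Minimal, illustrative entry point; it assumes MCP_SERVER_URL and OPENAI_MODEL
# are set and that an MCP server is reachable. The OpenAIAdapter/LLMClient
# constructor calls below are assumptions about the local llm_client module;
# adjust them to its actual API before running.
if __name__ == "__main__":
    llm = LLMClient(OpenAIAdapter(model=MODEL))  # assumed constructor signatures
    conversation = MemoryConversation(
        llm_client=llm,
        user_id="demo-user",
        debug_mode=True,
    )
    print(conversation.chat("Hi! Please remember that my favourite colour is teal."))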