privateGPT MCP Server
by Fujitsu-AI
- agents
- MCP-Client
- Python
import json
import logging
import re
from typing import Any, Dict, Optional
from .messages.send_call_tool import send_call_tool
from .messages.send_tools_list import send_tools_list
def parse_tool_response(response: str) -> Optional[Dict[str, Any]]:
"""Parse tool call from Llama's XML-style format."""
function_regex = r"<function=(\w+)>(.*?)</function>"
match = re.search(function_regex, response)
if match:
function_name, args_string = match.groups()
try:
args = json.loads(args_string)
return {
"id": f"call_{function_name}",
"function": function_name,
"arguments": args,
}
except json.JSONDecodeError as error:
logging.debug(f"Error parsing function arguments: {error}")
return None
async def handle_tool_call(tool_call, conversation_history, server_streams):
"""
Handle a single tool call for both OpenAI and Llama formats.
This function no longer prints directly to stdout. It updates the conversation_history
with the tool call and its response. The calling function can then display the results.
"""
tool_call_id = None
tool_name = "unknown_tool"
raw_arguments = {}
try:
# Handle object-style tool calls from both OpenAI and Ollama
if hasattr(tool_call, "function") or (
isinstance(tool_call, dict) and "function" in tool_call
):
# Get tool name and arguments based on format
if hasattr(tool_call, "function"):
tool_call_id = tool_call.id
tool_name = tool_call.function.name
raw_arguments = tool_call.function.arguments
else:
tool_call_id = tool_call["id"]
tool_name = tool_call["function"]["name"]
raw_arguments = tool_call["function"]["arguments"]
else:
# Parse Llama's XML format from the last message
last_message = conversation_history[-1]["content"]
parsed_tool = parse_tool_response(last_message)
if not parsed_tool:
logging.debug("Unable to parse tool call from message")
return
tool_call_id = parsed_tool["id"]
tool_name = parsed_tool["function"]
raw_arguments = parsed_tool["arguments"]
# Parse the tool arguments
tool_args = (
json.loads(raw_arguments)
if isinstance(raw_arguments, str)
else raw_arguments
)
# Call the tool (no direct print here)
tool_response = {}
for read_stream, write_stream in server_streams:
tools = await fetch_tools(read_stream, write_stream)
server_has_tool = False
for tool in tools:
if tool["name"] == tool_name:
server_has_tool = True
if server_has_tool:
tool_response = await send_call_tool(
tool_name, tool_args, read_stream, write_stream
)
if not tool_response.get("isError"):
break
else:
continue
if tool_response.get("isError"):
logging.debug(
f"Error calling tool '{tool_name}': {tool_response.get('content')}"
)
# Format the tool response
formatted_response = format_tool_response(tool_response.get("content", []))
logging.debug(f"Tool '{tool_name}' Response: {formatted_response}")
if formatted_response == "":
print(f"Warning Tool '{tool_name}' Response: {formatted_response}")
#print(f"Tool '{tool_name}' Response: {formatted_response}")
# Update the conversation history with the tool call
# Add the tool call itself (for OpenAI tracking)
conversation_history.append(
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": tool_call_id,
"type": "function",
"function": {
"name": tool_name,
"arguments": json.dumps(tool_args)
if isinstance(tool_args, dict)
else tool_args,
},
}
],
}
)
# Add the tool response to conversation history
conversation_history.append(
{
"role": "tool",
"name": tool_name,
"content": str(formatted_response),
"tool_call_id": tool_call_id,
}
)
except json.JSONDecodeError:
logging.debug(
f"Error decoding arguments for tool '{tool_name}': {raw_arguments}"
)
except Exception as e:
logging.debug(f"Error handling tool call '{tool_name}': {str(e)}")
def format_tool_response(response_content):
"""Format the response content from a tool."""
if isinstance(response_content, list):
return "\n".join(
item.get("text", "No content")
for item in response_content
if item.get("type") == "text"
)
return str(response_content)
async def fetch_tools(read_stream, write_stream):
"""Fetch tools from the server."""
logging.debug("\nFetching tools for chat mode...")
# get the tools list
tools_response = await send_tools_list(read_stream, write_stream)
tools = tools_response.get("tools", [])
# check if tools are valid
if not isinstance(tools, list) or not all(isinstance(tool, dict) for tool in tools):
logging.debug("Invalid tools format received.")
return None
return tools
def convert_to_openai_tools(tools):
"""Convert tools into OpenAI-compatible function definitions."""
openai_tools = []
for tool in tools:
# TODO. This works around the mcp-remote library putting the input scheme into another objet
if tool.get("inputSchema", {}).get("properties").get("type") == "object":
inputScheme = tool.get("inputSchema", {}).get("properties")
else:
inputScheme = tool.get("inputSchema", {})
entry = {
"type": "function",
"function": {
"name": tool["name"],
"description": tool.get("description", ""),
"parameters": inputScheme
},
}
openai_tools.append(entry)
return openai_tools