# tools_handler.py
import json
import logging
import re
from typing import Any, Dict, Optional
from .messages.send_call_tool import send_call_tool
from .messages.send_tools_list import send_tools_list


def parse_tool_response(response: str) -> Optional[Dict[str, Any]]:
    """Parse a tool call from Llama's XML-style <function=name>{...}</function> format."""
    function_regex = r"<function=(\w+)>(.*?)</function>"
    # re.DOTALL lets the JSON arguments span multiple lines
    match = re.search(function_regex, response, re.DOTALL)
    if match:
        function_name, args_string = match.groups()
        try:
            args = json.loads(args_string)
            return {
                "id": f"call_{function_name}",
                "function": function_name,
                "arguments": args,
            }
        except json.JSONDecodeError as error:
            logging.debug(f"Error parsing function arguments: {error}")
    return None
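

# Example (illustrative tool name and arguments): a model response such as
#   '<function=get_weather>{"city": "Paris"}</function>'
# parses to
#   {"id": "call_get_weather", "function": "get_weather", "arguments": {"city": "Paris"}}
# while a response without a <function=...> block, or with malformed JSON
# arguments, returns None.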


async def handle_tool_call(tool_call, conversation_history, server_streams):
    """
    Handle a single tool call in either the OpenAI or the Llama format.

    This function does not print to stdout; it appends the tool call and its
    response to conversation_history so the caller can display the results.
    """
    tool_call_id = None
    tool_name = "unknown_tool"
    raw_arguments = {}
    try:
        # Handle object-style tool calls from both OpenAI and Ollama
        if hasattr(tool_call, "function") or (
            isinstance(tool_call, dict) and "function" in tool_call
        ):
            # Get tool name and arguments based on format
            if hasattr(tool_call, "function"):
                tool_call_id = tool_call.id
                tool_name = tool_call.function.name
                raw_arguments = tool_call.function.arguments
            else:
                tool_call_id = tool_call["id"]
                tool_name = tool_call["function"]["name"]
                raw_arguments = tool_call["function"]["arguments"]
        else:
            # Parse Llama's XML format from the last message
            last_message = conversation_history[-1]["content"]
            parsed_tool = parse_tool_response(last_message)
            if not parsed_tool:
                logging.debug("Unable to parse tool call from message")
                return
            tool_call_id = parsed_tool["id"]
            tool_name = parsed_tool["function"]
            raw_arguments = parsed_tool["arguments"]
        # Arguments may arrive as a JSON string (OpenAI) or already as a dict
        tool_args = (
            json.loads(raw_arguments)
            if isinstance(raw_arguments, str)
            else raw_arguments
        )
        # Try each connected server until one that offers the tool succeeds
        tool_response = {}
        for read_stream, write_stream in server_streams:
            tools = await fetch_tools(read_stream, write_stream)
            if not any(tool["name"] == tool_name for tool in tools):
                continue
            tool_response = await send_call_tool(
                tool_name, tool_args, read_stream, write_stream
            )
            if not tool_response.get("isError"):
                break
        if tool_response.get("isError"):
            logging.debug(
                f"Error calling tool '{tool_name}': {tool_response.get('content')}"
            )
        # Format the tool response for logging and the conversation history
        formatted_response = format_tool_response(tool_response.get("content", []))
        logging.debug(f"Tool '{tool_name}' Response: {formatted_response}")
        if formatted_response == "":
            logging.warning(f"Tool '{tool_name}' returned an empty response")
        # Record the tool call itself as an assistant message (OpenAI-style tracking)
        conversation_history.append(
            {
                "role": "assistant",
                "content": None,
                "tool_calls": [
                    {
                        "id": tool_call_id,
                        "type": "function",
                        "function": {
                            "name": tool_name,
                            "arguments": json.dumps(tool_args)
                            if isinstance(tool_args, dict)
                            else tool_args,
                        },
                    }
                ],
            }
        )
        # Add the tool response to conversation history
        conversation_history.append(
            {
                "role": "tool",
                "name": tool_name,
                "content": str(formatted_response),
                "tool_call_id": tool_call_id,
            }
        )
    except json.JSONDecodeError:
        logging.debug(
            f"Error decoding arguments for tool '{tool_name}': {raw_arguments}"
        )
    except Exception as e:
        logging.debug(f"Error handling tool call '{tool_name}': {e}")


def format_tool_response(response_content):
    """Join the text items of a tool's content list into a single string."""
    if isinstance(response_content, list):
        return "\n".join(
            item.get("text", "No content")
            for item in response_content
            if isinstance(item, dict) and item.get("type") == "text"
        )
    return str(response_content)
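

# Example (illustrative MCP-style content list):
#   format_tool_response([{"type": "text", "text": "72F and sunny"},
#                         {"type": "image", "data": "..."}])
# returns "72F and sunny" -- non-text items are skipped, and non-list content
# is stringified as-is.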


async def fetch_tools(read_stream, write_stream):
    """Fetch the tools list from a server; return [] if the response is invalid."""
    logging.debug("Fetching tools for chat mode...")
    tools_response = await send_tools_list(read_stream, write_stream)
    tools = tools_response.get("tools", [])
    # Validate the shape so callers can iterate without extra None checks
    if not isinstance(tools, list) or not all(isinstance(tool, dict) for tool in tools):
        logging.debug("Invalid tools format received.")
        return []
    return tools
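

# Sketch (assumes live MCP streams from this project's transport setup):
#   tools = await fetch_tools(read_stream, write_stream)
#   # e.g. [{"name": "get_weather", "description": "...", "inputSchema": {...}}]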


def convert_to_openai_tools(tools):
    """Convert MCP tools into OpenAI-compatible function definitions."""
    openai_tools = []
    for tool in tools:
        # TODO: works around the mcp-remote library nesting the input schema
        # inside another object; unwrap "properties" when it is itself a schema.
        properties = tool.get("inputSchema", {}).get("properties", {})
        if isinstance(properties, dict) and properties.get("type") == "object":
            input_schema = properties
        else:
            input_schema = tool.get("inputSchema", {})
        openai_tools.append(
            {
                "type": "function",
                "function": {
                    "name": tool["name"],
                    "description": tool.get("description", ""),
                    "parameters": input_schema,
                },
            }
        )
    return openai_tools
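

# Example (hypothetical MCP tool entry): given
#   [{"name": "get_weather", "description": "Look up weather",
#     "inputSchema": {"type": "object",
#                     "properties": {"city": {"type": "string"}}}}]
# convert_to_openai_tools returns
#   [{"type": "function",
#     "function": {"name": "get_weather", "description": "Look up weather",
#                  "parameters": {"type": "object",
#                                 "properties": {"city": {"type": "string"}}}}}]
# suitable for the `tools` parameter of an OpenAI chat completion request.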