tool_manager.pyโข8.17 kB
"""
Tool Manager Module
Author: Yobie Benjamin
Version: 0.9
Date: August 1, 2025
This module manages all available tools in the MCP server.
It handles tool registration, discovery, validation, and execution.
"""
import asyncio
import importlib
import inspect
from pathlib import Path
from typing import Any, Dict, List, Optional
from mcp.types import Tool
from structlog import get_logger
from ..config import Config
from .base import BaseTool, ToolResult
from .web_search import WebSearchTool
from .file_ops import FileReadTool, FileWriteTool, ListFilesTool
from .calculator import CalculatorTool
from .datetime_tool import DateTimeTool
from .code_executor import CodeExecutorTool
from .http_client import HttpRequestTool
from .json_tool import JsonTool
logger = get_logger(__name__)
class ToolManager:
"""
Central manager for all MCP tools.
This class:
- Registers and manages available tools
- Validates tool parameters
- Executes tools safely
- Provides tool discovery for MCP protocol
"""
def __init__(self, config: Config):
"""
Initialize the tool manager.
Args:
config: Configuration object
"""
self.config = config
self.logger = logger.bind(component="tool_manager")
self.tools: Dict[str, BaseTool] = {}
self.logger.info("Tool manager initialized")
async def initialize(self):
"""
Initialize all tools.
This method:
1. Registers built-in tools
2. Loads custom tools
3. Validates tool configurations
"""
self.logger.info("Initializing tools...")
# Register built-in tools
await self._register_builtin_tools()
# Load custom tools if configured
await self._load_custom_tools()
self.logger.info(
f"Tools initialized",
tool_count=len(self.tools),
tools=list(self.tools.keys())
)
async def _register_builtin_tools(self):
"""
Register all built-in tools.
Tools are conditionally registered based on configuration.
"""
# Always available tools
tools_to_register = [
CalculatorTool(),
DateTimeTool(),
JsonTool(),
]
# Conditionally available tools
if self.config.enable_web_search:
tools_to_register.append(WebSearchTool())
if self.config.filesystem_base_path:
tools_to_register.extend([
FileReadTool(self.config),
ListFilesTool(self.config),
])
if self.config.allow_file_writes:
tools_to_register.append(FileWriteTool(self.config))
if self.config.enable_code_execution:
tools_to_register.append(CodeExecutorTool(self.config))
tools_to_register.append(HttpRequestTool())
# Register each tool
for tool in tools_to_register:
self.register_tool(tool)
self.logger.debug(f"Registered tool: {tool.name}")
async def _load_custom_tools(self):
"""
Load custom tools from plugins directory.
Users can add custom tools by placing them in the tools/custom directory.
"""
custom_dir = Path(__file__).parent / "custom"
if not custom_dir.exists():
return
self.logger.debug(f"Loading custom tools from {custom_dir}")
for tool_file in custom_dir.glob("*.py"):
if tool_file.name.startswith("_"):
continue
try:
# Import the module
module_name = f".custom.{tool_file.stem}"
module = importlib.import_module(module_name, package="llama4_maverick_mcp.tools")
# Find tool classes
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, BaseTool) and
obj != BaseTool):
tool = obj()
self.register_tool(tool)
self.logger.info(f"Loaded custom tool: {tool.name}")
except Exception as e:
self.logger.error(
f"Failed to load custom tool from {tool_file}",
error=str(e)
)
def register_tool(self, tool: BaseTool):
"""
Register a tool.
Args:
tool: Tool instance to register
"""
if tool.name in self.tools:
self.logger.warning(
f"Tool {tool.name} already registered, overwriting"
)
self.tools[tool.name] = tool
self.logger.debug(
f"Registered tool",
name=tool.name,
description=tool.description
)
async def get_tools(self) -> List[Tool]:
"""
Get all available tools in MCP format.
Returns:
List of Tool objects for MCP protocol
"""
tools = []
for name, tool in self.tools.items():
try:
schema = tool.get_schema()
# Convert to MCP Tool type
mcp_tool = Tool(
name=schema["name"],
description=schema["description"],
inputSchema=schema["inputSchema"]
)
tools.append(mcp_tool)
except Exception as e:
self.logger.error(
f"Failed to get schema for tool {name}",
error=str(e)
)
return tools
async def execute_tool(
self,
name: str,
arguments: Dict[str, Any]
) -> Dict[str, Any]:
"""
Execute a tool by name.
Args:
name: Tool name
arguments: Tool arguments
Returns:
Tool execution result
Raises:
ValueError: If tool not found
ValidationError: If arguments are invalid
"""
# Find the tool
if name not in self.tools:
raise ValueError(f"Tool '{name}' not found")
tool = self.tools[name]
self.logger.debug(
f"Executing tool",
name=name,
arguments=arguments
)
try:
# Validate parameters
validated_args = tool.validate_params(arguments)
# Execute the tool
result = await tool.execute(**validated_args)
# Log result
self.logger.info(
f"Tool executed successfully",
name=name,
success=result.success
)
return result.to_dict()
except Exception as e:
self.logger.error(
f"Tool execution failed",
name=name,
error=str(e),
exc_info=True
)
# Return error result
return ToolResult(
success=False,
error=str(e),
metadata={"tool": name, "error_type": type(e).__name__}
).to_dict()
async def cleanup(self):
"""
Clean up tool resources.
Calls cleanup on all tools that have it.
"""
self.logger.debug("Cleaning up tools...")
cleanup_tasks = []
for name, tool in self.tools.items():
if hasattr(tool, 'cleanup'):
cleanup_tasks.append(tool.cleanup())
if cleanup_tasks:
await asyncio.gather(*cleanup_tasks, return_exceptions=True)
self.logger.debug("Tool cleanup complete")