Skip to main content
Glama
base.py60.7 kB
"""Base tool classes and decorators for Ultimate MCP Server.""" import asyncio import functools import inspect import time from typing import Any, Callable, Dict, List, Optional, Type, Union try: from fastmcp import Tool except ImportError: # Handle case where mcp might be available via different import try: from fastmcp import Tool except ImportError: Tool = None # Tool will be provided by the mcp_server from ultimate_mcp_server.exceptions import ( ResourceError, ToolError, ToolExecutionError, ToolInputError, format_error_response, ) # from ultimate_mcp_server.services.cache import with_cache from ultimate_mcp_server.utils import get_logger logger = get_logger("ultimate_mcp_server.tools.base") def tool(name=None, description=None): """ Decorator that marks a BaseTool class method as an MCP tool. This decorator adds metadata to a method, identifying it as a tool that should be registered with the MCP server when the containing BaseTool class is initialized. It allows customizing the tool's name and description, which are used in tool discoverability and documentation. Unlike the register_tool function which directly registers standalone functions, this decorator only marks methods for later registration, allowing BaseTool subclasses to organize multiple related tools together in a single class. The decorator adds three attributes to the method: - _tool: A boolean flag indicating this is a tool method - _tool_name: The name to use when registering the tool (or original method name) - _tool_description: The description to use for the tool (or method docstring) These attributes are used during the tool registration process, typically in the _register_tools method of BaseTool subclasses. Args: name: Custom name for the tool (defaults to the method name if not provided) description: Custom description for the tool (defaults to the method's docstring) Returns: A decorator function that adds tool metadata attributes to the decorated method Example: ```python class MyToolSet(BaseTool): tool_name = "my_toolset" @tool(name="custom_operation", description="Performs a customized operation") async def perform_operation(self, param1: str, param2: int) -> Dict[str, Any]: # Implementation return {"result": "success"} ``` Notes: - This decorator should be used on methods of classes that inherit from BaseTool - Decorated methods should be async - The decorated method must take self as its first parameter - This decorator does not apply error handling or other middleware automatically """ def decorator(func): @functools.wraps(func) async def wrapper(self, *args, **kwargs): return await func(self, *args, **kwargs) wrapper._tool = True wrapper._tool_name = name wrapper._tool_description = description return wrapper return decorator def with_resource(resource_type, allow_creation=False, require_existence=True): """ Decorator for standardizing resource access and validation in tool methods. This decorator provides consistent resource handling for tool methods that access or create persistent resources in the MCP ecosystem. It enforces resource validation rules, handles resource registration, and provides unified error handling for resource-related operations. Core functionalities: 1. Resource existence validation - Ensures resources exist before allowing access 2. Resource creation tracking - Registers newly created resources with the system 3. Resource type validation - Confirms resources match expected types 4. Standardized error handling - Produces consistent error responses for resource issues The decorator identifies resource IDs by looking for common parameter names like '{resource_type}_id', 'id', or 'resource_id' in the function's keyword arguments. When a resource ID is found, it performs the configured validation checks before allowing the function to execute. After execution, it can optionally register newly created resources. Args: resource_type: Type category for the resource (e.g., "document", "embedding", "database"). Used for validation and registration. allow_creation: Whether the tool is allowed to create new resources of this type. When True, the decorator will register any created resources. require_existence: Whether the resource must exist before the tool is called. When True, the decorator will verify resource existence. Returns: A decorator function that applies resource handling to tool methods. Raises: ResourceError: When resource validation fails (e.g., resource not found, resource type mismatch, or unauthorized resource access). Example: ```python class DocumentTools(BaseTool): @tool() @with_resource("document", require_existence=True, allow_creation=False) async def get_document_summary(self, document_id: str): # This method will fail with ResourceError if document_id doesn't exist # Resource existence is checked before this code runs ... @tool() @with_resource("document", require_existence=False, allow_creation=True) async def create_document(self, content: str, metadata: Dict[str, Any] = None): # Process content and create document doc_id = str(uuid.uuid4()) # ... processing logic ... # Return created resource with resource_id key to trigger registration return { "resource_id": doc_id, # This triggers resource registration "status": "created", "metadata": {"content_length": len(content), "created_at": time.time()} } # The resource is automatically registered with the returned metadata ``` Notes: - This decorator should be applied after @tool but before other decorators like @with_error_handling to ensure proper execution order - Resources created with allow_creation=True must include a "resource_id" key in their result dictionary to trigger registration - The resource registry must be accessible via the tool's mcp server instance """ def decorator(func): @functools.wraps(func) async def wrapper(self, *args, **kwargs): # Get resource ID from kwargs (common parameter names) resource_id = None for param_name in [f"{resource_type}_id", "id", "resource_id"]: if param_name in kwargs: resource_id = kwargs[param_name] break # Check if resource exists if required if require_existence and resource_id: # Get resource registry from MCP server resource_registry = getattr(self.mcp, "resources", None) if resource_registry is None: logger.warning( f"Resource registry not available, skipping existence check for {resource_type}/{resource_id}", emoji_key="warning" ) else: # Check if resource exists exists = await resource_registry.exists(resource_type, resource_id) if not exists: raise ResourceError( f"{resource_type.capitalize()} not found: {resource_id}", resource_type=resource_type, resource_id=resource_id ) # Call function result = await func(self, *args, **kwargs) # If the function returns a new resource ID, register it if allow_creation and isinstance(result, dict) and "resource_id" in result: new_resource_id = result["resource_id"] # Get resource registry from MCP server resource_registry = getattr(self.mcp, "resources", None) if resource_registry is not None: # Register new resource metadata = { "created_at": time.time(), "creator": kwargs.get("ctx", {}).get("user_id", "unknown"), "resource_type": resource_type } # Add other metadata from result if available if "metadata" in result: metadata.update(result["metadata"]) await resource_registry.register( resource_type, new_resource_id, metadata=metadata ) logger.info( f"Registered new {resource_type}: {new_resource_id}", emoji_key="resource", resource_type=resource_type, resource_id=new_resource_id ) return result # Add resource metadata to function wrapper._resource_type = resource_type wrapper._allow_creation = allow_creation wrapper._require_existence = require_existence return wrapper return decorator class ResourceRegistry: """ Registry that tracks and manages resources used by MCP tools. The ResourceRegistry provides a centralized system for tracking resources created or accessed by tools within the MCP ecosystem. It maintains resource metadata, handles persistence of resource information, and provides methods for registering, looking up, and deleting resources. Resources in the MCP ecosystem represent persistent or semi-persistent objects that may be accessed across multiple tool calls or sessions. Examples include documents, knowledge bases, embeddings, file paths, and database connections. The registry helps manage the lifecycle of these resources and prevents issues like resource leaks or unauthorized access. Key features: - In-memory caching of resource metadata for fast lookups - Optional persistent storage via pluggable storage backends - Resource type categorization (documents, embeddings, etc.) - Resource existence checking for access control - Simple CRUD operations for resource metadata Resources are organized by type and identified by unique IDs within those types. Each resource has associated metadata that can include creation time, owner information, and resource-specific attributes. The registry is typically initialized by the MCP server and made available to all tools. Tools that create resources should register them, and tools that access resources should verify their existence before proceeding. """ def __init__(self, storage_backend=None): """Initialize the resource registry. Args: storage_backend: Backend for persistent storage (if None, in-memory only) """ self.resources = {} self.storage = storage_backend self.logger = get_logger("ultimate_mcp_server.resources") async def register(self, resource_type, resource_id, metadata=None): """Register a resource in the registry. Args: resource_type: Type of resource (e.g., "document", "embedding") resource_id: Resource identifier metadata: Additional metadata about the resource Returns: True if registration was successful """ # Initialize resource type if not exists if resource_type not in self.resources: self.resources[resource_type] = {} # Register resource self.resources[resource_type][resource_id] = { "id": resource_id, "type": resource_type, "metadata": metadata or {}, "registered_at": time.time() } # Persist to storage backend if available if self.storage: try: await self.storage.save_resource( resource_type, resource_id, self.resources[resource_type][resource_id] ) except Exception as e: self.logger.error( f"Failed to persist resource {resource_type}/{resource_id}: {str(e)}", emoji_key="error", exc_info=True ) return True async def exists(self, resource_type, resource_id): """Check if a resource exists in the registry. Args: resource_type: Type of resource resource_id: Resource identifier Returns: True if the resource exists """ # Check in-memory registry first if resource_type in self.resources and resource_id in self.resources[resource_type]: return True # Check storage backend if available if self.storage: try: return await self.storage.resource_exists(resource_type, resource_id) except Exception as e: self.logger.error( f"Failed to check resource existence {resource_type}/{resource_id}: {str(e)}", emoji_key="error", exc_info=True ) return False async def get(self, resource_type, resource_id): """Get resource metadata from the registry. Args: resource_type: Type of resource resource_id: Resource identifier Returns: Resource metadata or None if not found """ # Check in-memory registry first if resource_type in self.resources and resource_id in self.resources[resource_type]: return self.resources[resource_type][resource_id] # Check storage backend if available if self.storage: try: resource = await self.storage.get_resource(resource_type, resource_id) if resource: # Cache in memory for future access if resource_type not in self.resources: self.resources[resource_type] = {} self.resources[resource_type][resource_id] = resource return resource except Exception as e: self.logger.error( f"Failed to get resource {resource_type}/{resource_id}: {str(e)}", emoji_key="error", exc_info=True ) return None async def list(self, resource_type, limit=100, offset=0, filters=None): """List resources of a specific type. Args: resource_type: Type of resource to list limit: Maximum number of resources to return offset: Offset for pagination filters: Dictionary of filters to apply Returns: List of resource metadata """ result = [] # Get from storage backend first if available if self.storage: try: resources = await self.storage.list_resources( resource_type, limit=limit, offset=offset, filters=filters ) # Cache in memory for future access if resources: if resource_type not in self.resources: self.resources[resource_type] = {} for resource in resources: resource_id = resource.get("id") if resource_id: self.resources[resource_type][resource_id] = resource return resources except Exception as e: self.logger.error( f"Failed to list resources of type {resource_type}: {str(e)}", emoji_key="error", exc_info=True ) # Fallback to in-memory registry if resource_type in self.resources: # Apply filters if provided filtered_resources = self.resources[resource_type].values() if filters: for key, value in filters.items(): filtered_resources = [ r for r in filtered_resources if r.get("metadata", {}).get(key) == value ] # Apply pagination result = list(filtered_resources)[offset:offset+limit] return result async def delete(self, resource_type, resource_id): """Delete a resource from the registry. Args: resource_type: Type of resource resource_id: Resource identifier Returns: True if deletion was successful """ # Delete from in-memory registry if resource_type in self.resources and resource_id in self.resources[resource_type]: del self.resources[resource_type][resource_id] # Delete from storage backend if available if self.storage: try: return await self.storage.delete_resource(resource_type, resource_id) except Exception as e: self.logger.error( f"Failed to delete resource {resource_type}/{resource_id}: {str(e)}", emoji_key="error", exc_info=True ) return True class BaseToolMetrics: """ Metrics collection and aggregation system for tool execution statistics. The BaseToolMetrics class provides a standardized way to track and aggregate performance metrics for tool executions. It maintains cumulative statistics about calls to a tool, including execution counts, success rates, timing information, and optional token usage and cost data when available. This class is used both internally by BaseTool instances and by the with_tool_metrics decorator to provide consistent metrics tracking across the entire MCP ecosystem. The collected metrics enable monitoring, debugging, and optimization of tool performance and usage patterns. Metrics tracked: - Total number of calls - Number of successful and failed calls - Success rate - Total, minimum, and maximum execution duration - Total token usage (for LLM-based tools) - Total cost (for tools with cost accounting) The metrics are aggregated in memory and can be retrieved at any time via the get_stats() method. They represent the lifetime statistics of the tool since the metrics object was created. Example: ```python # Accessing metrics from a tool my_tool = MyToolClass(mcp_server) metrics = my_tool.metrics.get_stats() print(f"Success rate: {metrics['success_rate']:.2%}") print(f"Average duration: {metrics['average_duration']:.2f}s") ``` """ def __init__(self): """Initialize metrics tracking.""" self.total_calls = 0 self.successful_calls = 0 self.failed_calls = 0 self.total_duration = 0.0 self.min_duration = float('inf') self.max_duration = 0.0 self.total_tokens = 0 self.total_cost = 0.0 def record_call( self, success: bool, duration: float, tokens: Optional[int] = None, cost: Optional[float] = None ) -> None: """Record metrics for a tool call. Args: success: Whether the call was successful duration: Duration of the call in seconds tokens: Number of tokens used (if applicable) cost: Cost of the call (if applicable) """ self.total_calls += 1 if success: self.successful_calls += 1 else: self.failed_calls += 1 self.total_duration += duration self.min_duration = min(self.min_duration, duration) self.max_duration = max(self.max_duration, duration) if tokens is not None: self.total_tokens += tokens if cost is not None: self.total_cost += cost def get_stats(self) -> Dict[str, Any]: """Get current metrics. Returns: Dictionary of metrics """ if self.total_calls == 0: return { "total_calls": 0, "success_rate": 0.0, "average_duration": 0.0, "min_duration": 0.0, "max_duration": 0.0, "total_tokens": 0, "total_cost": 0.0, } return { "total_calls": self.total_calls, "successful_calls": self.successful_calls, "failed_calls": self.failed_calls, "success_rate": self.successful_calls / self.total_calls, "average_duration": self.total_duration / self.total_calls, "min_duration": self.min_duration if self.min_duration != float('inf') else 0.0, "max_duration": self.max_duration, "total_tokens": self.total_tokens, "total_cost": self.total_cost, } class BaseTool: """ Foundation class for all tool implementations in the Ultimate MCP Server. The BaseTool class serves as the fundamental building block for creating tools that can be registered with and executed by the MCP server. It provides core functionality for metrics tracking, logging, resource management, and tool execution. Tools in the Ultimate MCP Server ecosystem are designed to provide specific capabilities that can be invoked by clients (typically LLMs) to perform various operations like document processing, vector search, file operations, etc. The BaseTool architecture ensures all tools have consistent behavior for error handling, metrics collection, and server integration. Key features: - Standardized tool registration via decorators - Consistent metrics tracking for all tool executions - Unified error handling and response formatting - Integration with the server's resource registry - Logger setup with tool-specific naming Tool classes should inherit from BaseTool and define their tools using the @tool decorator. Each tool method should be async and follow the standard pattern of accepting parameters, performing operations, and returning results in a structured format. Example: ```python class MyCustomTools(BaseTool): tool_name = "my_custom_tools" description = "Provides custom tools for specific operations" @tool(name="custom_operation") @with_tool_metrics @with_error_handling async def perform_operation(self, param1: str, param2: int) -> Dict[str, Any]: # Implementation return {"result": "success", "data": some_data} ``` """ tool_name: str = "base_tool" description: str = "Base tool class for Ultimate MCP Server." def __init__(self, mcp_server): """Initialize the tool. Args: mcp_server: MCP server instance """ # If mcp_server is a Gateway instance, get the MCP object self.mcp = mcp_server.mcp if hasattr(mcp_server, 'mcp') else mcp_server self.logger = get_logger(f"tool.{self.tool_name}") self.metrics = BaseToolMetrics() # Initialize resource registry if not already available if not hasattr(self.mcp, "resources"): self.mcp.resources = ResourceRegistry() def _register_tools(self): """Register tools with MCP server. Override this method in subclasses to register specific tools. This method is no longer called by the base class constructor. Registration is now handled externally, e.g., in register_all_tools. """ pass async def execute(self, tool_name: str, params: Dict[str, Any]) -> Any: """ Execute a tool method by name with the given parameters. This method provides the core execution mechanism for BaseTool subclasses, dynamically dispatching calls to the appropriate tool method based on the tool_name parameter. It handles parameter validation, metrics collection, and error standardization to ensure consistent behavior across all tools. Execution flow: 1. Looks up the requested tool method in the class 2. Validates that the method is properly marked as a tool 3. Applies metrics tracking via _wrap_with_metrics 4. Executes the tool with the provided parameters 5. Returns the tool's response or a standardized error Args: tool_name: Name of the specific tool method to execute params: Dictionary of parameters to pass to the tool method (These parameters will be unpacked as kwargs) Returns: The result returned by the tool method, or a standardized error response if execution fails Raises: ToolError: If the specified tool_name is not found or not properly marked as a tool method Example: ```python # Direct execution of a tool method result = await my_tool_instance.execute( "analyze_document", {"document_id": "doc123", "analysis_type": "sentiment"} ) # Error handling if "isError" in result and result["isError"]: print(f"Tool execution failed: {result['error']['message']}") else: print(f"Analysis result: {result['analysis_score']}") ``` """ # Find method with tool name method_name = tool_name.split(".")[-1] # Handle namespaced tools method = getattr(self, method_name, None) if not method or not hasattr(method, "_tool"): raise ToolError( f"Tool not found: {tool_name}", error_code="tool_not_found" ) # Execute tool with metrics wrapper return await self._wrap_with_metrics(method, **params) async def _wrap_with_metrics( self, func: Callable, *args, **kwargs ) -> Any: """ Internal method that wraps a function call with metrics tracking. This method provides a standardized way to execute tool functions while capturing performance metrics such as execution duration, success/failure status, token usage, and cost. These metrics are stored in the BaseTool instance's metrics object for later analysis and reporting. The method performs the following steps: 1. Records the start time of the operation 2. Executes the provided function with the supplied arguments 3. If successful, extracts metrics data from the result (if available) 4. Records the execution metrics in the BaseTool's metrics object 5. Returns the original result or propagates any exceptions that occurred Metrics extraction: - If the result is a dictionary, it will attempt to extract: - Token usage from either result["tokens"]["total"] or result["total_tokens"] - Cost information from result["cost"] Args: func: Async function to execute with metrics tracking *args: Positional arguments to pass to the function **kwargs: Keyword arguments to pass to the function Returns: The result of the wrapped function call Raises: Any exception raised by the wrapped function (after logging it) Notes: - This method is typically called internally by BaseTool subclasses - Related to but different from the standalone with_tool_metrics decorator - Exceptions are logged but not caught (to allow proper error handling) """ start_time = time.time() success = False tokens = None cost = None try: # Call function result = await func(*args, **kwargs) # Extract metrics if available if isinstance(result, dict): if "tokens" in result and isinstance(result["tokens"], dict): tokens = result["tokens"].get("total") elif "total_tokens" in result: tokens = result["total_tokens"] cost = result.get("cost") success = True return result except Exception as e: self.logger.error( f"Tool execution failed: {func.__name__}: {str(e)}", emoji_key="error", tool=func.__name__, exc_info=True ) raise finally: # Record metrics duration = time.time() - start_time self.metrics.record_call( success=success, duration=duration, tokens=tokens, cost=cost ) def with_tool_metrics(func): """ Decorator that automatically tracks performance metrics for tool functions. This decorator captures and records execution metrics for both class methods and standalone functions. It adapts its behavior based on whether the decorated function is a method on a BaseTool instance or a standalone function. Metrics captured include: - Execution time (duration in seconds) - Success/failure state - Token usage (extracted from result if available) - Cost information (extracted from result if available) The decorator performs several functions: 1. Captures start time before execution 2. Executes the wrapped function, preserving all args/kwargs 3. Extracts metrics from the result dictionary if available 4. Logs execution statistics 5. Updates metrics in the BaseTool.metrics object if available When used with other decorators: - Should be applied before with_error_handling to ensure metrics are captured even when errors occur - Works well with with_cache, tracking metrics for both cache hits and misses - Compatible with with_retry, recording each attempt separately Args: func: The async function to decorate (can be a method or standalone function) Returns: Wrapped async function that captures and records metrics Example: ```python @with_tool_metrics @with_error_handling async def my_tool_function(param1, param2): # Function implementation ``` """ @functools.wraps(func) async def wrapper(*args, **kwargs): # Check if the first arg looks like a BaseTool instance self_obj = args[0] if args and isinstance(args[0], BaseTool) else None tool_name = getattr(self_obj, 'tool_name', func.__name__) start_time = time.time() success = False tokens = None cost = None result = None try: # Call original function, passing self_obj if it exists if self_obj: # Assumes if self_obj exists, it's the first positional arg expected by func result = func(self_obj, *args[1:], **kwargs) else: # Pass only the args/kwargs received, assuming func is standalone result = func(*args, **kwargs) # Only await when necessary if inspect.isawaitable(result): result = await result # result is now either a ToolResult _or_ an async iterator # Extract metrics if available from result if isinstance(result, dict): if "tokens" in result and isinstance(result["tokens"], dict): tokens = result["tokens"].get("total") elif "total_tokens" in result: tokens = result["total_tokens"] cost = result.get("cost") success = True return result except Exception as e: logger.error( f"Tool execution failed: {tool_name}: {str(e)}", emoji_key="error", tool=tool_name, exc_info=True ) raise # Re-raise exception for other handlers (like with_error_handling) finally: # Record metrics duration = time.time() - start_time # Log execution stats logger.debug( f"Tool execution: {tool_name} ({'success' if success else 'failed'})", emoji_key="tool" if success else "error", tool=tool_name, time=duration, cost=cost ) # Update metrics if we found a self object with a metrics attribute if self_obj and hasattr(self_obj, 'metrics'): self_obj.metrics.record_call( success=success, duration=duration, tokens=tokens, cost=cost ) return wrapper def with_retry( max_retries: int = 3, retry_delay: float = 1.0, backoff_factor: float = 2.0, retry_exceptions: List[Type[Exception]] = None ): """ Decorator that adds exponential backoff retry logic to async tool functions. This decorator wraps an async function with retry logic that will automatically re-execute the function if it fails with certain exceptions. It implements an exponential backoff strategy to progressively increase the wait time between retry attempts, reducing load during transient failures. Retry behavior: 1. When the decorated function raises an exception, the decorator checks if it's a retriable exception type (based on the retry_exceptions parameter) 2. If retriable, it waits for a delay period (which increases with each attempt) 3. After waiting, it retries the function with the same arguments 4. This process repeats until either the function succeeds or max_retries is reached Args: max_retries: Maximum number of retry attempts before giving up (default: 3) retry_delay: Initial delay in seconds before first retry (default: 1.0) backoff_factor: Multiplier for delay between retries (default: 2.0) Each retry's delay is calculated as: retry_delay * (backoff_factor ^ attempt) retry_exceptions: List of exception types that should trigger retries. If None, all exceptions will trigger retries. Returns: A decorator function that wraps the given async function with retry logic. Example: ```python @with_retry(max_retries=3, retry_delay=2.0, backoff_factor=3.0, retry_exceptions=[ConnectionError, TimeoutError]) async def fetch_data(url): # This function will retry up to 3 times if it raises ConnectionError or TimeoutError # Delays between retries: 2s, 6s, 18s # For other exceptions, it will fail immediately return await some_api_call(url) ``` Notes: - This decorator only works with async functions - The decorated function must be idempotent (safe to call multiple times) - Retries are logged at WARNING level, final failures at ERROR level - The final exception is re-raised after all retries are exhausted """ def decorator(func): @functools.wraps(func) async def wrapper(*args, **kwargs): last_exception = None delay = retry_delay for attempt in range(max_retries + 1): try: # Call original function return await func(*args, **kwargs) except Exception as e: # Only retry on specified exceptions if retry_exceptions and not any( isinstance(e, exc_type) for exc_type in retry_exceptions ): raise last_exception = e # Log retry attempt if attempt < max_retries: logger.warning( f"Tool execution failed, retrying ({attempt+1}/{max_retries}): {str(e)}", emoji_key="warning", tool=func.__name__, attempt=attempt+1, max_retries=max_retries, delay=delay ) # Wait before retrying await asyncio.sleep(delay) # Increase delay for next retry delay *= backoff_factor else: # Log final failure logger.error( f"Tool execution failed after {max_retries} retries: {str(e)}", emoji_key="error", tool=func.__name__, exc_info=True ) # If we get here, all retries failed raise last_exception return wrapper return decorator def with_error_handling(func): """ Decorator that transforms tool function exceptions into standardized error responses. This decorator intercepts any exceptions raised during tool execution and converts them into a structured error response format following the MCP protocol standards. It ensures that clients receive consistent, actionable error information regardless of how or where the error occurred. The decorator performs several key functions: 1. Detects if it's decorating a BaseTool method or standalone function and adapts accordingly 2. Reconstructs function call arguments appropriately based on function signature 3. Catches exceptions raised during execution and transforms them into structured responses 4. Maps different exception types to corresponding MCP error types with appropriate metadata 5. Logs detailed error information while providing a clean, standardized response to clients Exception handling: - ToolError: Passed through with logging (assumes already formatted correctly) - ValueError: Converted to ToolInputError with detailed context - Other exceptions: Converted to ToolExecutionError with execution context All error responses have the same structure: ``` { "success": False, "isError": True, "error": { "type": "<error_type>", "message": "<human-readable message>", "details": {<context-specific details>}, "retriable": <boolean>, "suggestions": [<optional recovery suggestions>], "timestamp": <current_time> } } ``` Args: func: The async function to decorate (can be a method or standalone function) Returns: Decorated async function that catches exceptions and returns structured error responses Example: ```python @with_error_handling async def my_tool_function(param1, param2): # If this raises an exception, it will be transformed into a structured response # rather than propagating up to the caller # ... ``` """ @functools.wraps(func) async def wrapper(*args, **kwargs): # Check if the first arg looks like a BaseTool instance self_obj = args[0] if args and isinstance(args[0], BaseTool) else None # Determine tool_name based on instance or func name tool_name = getattr(self_obj, 'tool_name', func.__name__) sig = inspect.signature(func) func_params = set(sig.parameters.keys()) # noqa: F841 call_args = [] call_kwargs = {} if self_obj: expected_params = list(sig.parameters.values()) if expected_params and expected_params[0].name == 'self': call_args.append(self_obj) start_index = 1 if self_obj and call_args else 0 call_args.extend(args[start_index:]) # Pass all original kwargs through call_kwargs.update(kwargs) try: # Call original function with reconstructed args/kwargs # This version passes *all* kwargs received by the wrapper, # trusting FastMCP to pass the correct ones including 'ctx'. result = func(*call_args, **call_kwargs) # Only await when necessary if inspect.isawaitable(result): result = await result # result is now either a ToolResult _or_ an async iterator return result except ToolError as e: # Already a tool error, log and return logger.error( f"Tool error in {tool_name}: {str(e)} ({e.error_code})", emoji_key="error", tool=tool_name, error_code=e.error_code, details=e.details ) # Debug log the formatted error response error_response = format_error_response(e) logger.debug(f"Formatted error response for {tool_name}: {error_response}") # Return standardized error response return error_response except ValueError as e: # Convert ValueError to ToolInputError with more detailed information error = ToolInputError( f"Invalid input to {tool_name}: {str(e)}", details={ "tool_name": tool_name, "exception_type": "ValueError", "original_error": str(e) } ) logger.error( f"Invalid input to {tool_name}: {str(e)}", emoji_key="error", tool=tool_name, error_code=error.error_code ) # Return standardized error response return format_error_response(error) except Exception as e: # Create a more specific error message that includes the tool name specific_message = f"Execution error in {tool_name}: {str(e)}" # Convert to ToolExecutionError for other exceptions error = ToolExecutionError( specific_message, cause=e, details={ "tool_name": tool_name, "exception_type": type(e).__name__, "original_message": str(e) } ) logger.error( specific_message, emoji_key="error", tool=tool_name, exc_info=True ) # Return standardized error response return format_error_response(error) return wrapper def register_tool(mcp_server, name=None, description=None, cache_ttl=None): """ Register a standalone function as an MCP tool with optional caching and error handling. This function creates a decorator that registers the decorated function with the MCP server, automatically applying error handling and optional result caching. It provides a simpler alternative to class-based tool registration via the BaseTool class, allowing standalone functions to be exposed as MCP tools without creating a full tool class. The decorator handles: 1. Tool registration with the MCP server using the provided name (or function name) 2. Documentation via the provided description (or function docstring) 3. Optional result caching with the specified TTL 4. Standardized error handling via the with_error_handling decorator Args: mcp_server: MCP server instance to register the tool with name: Tool name used for registration (defaults to the function name if not provided) description: Tool description for documentation (defaults to function docstring if not provided) cache_ttl: Optional time-to-live in seconds for caching tool results. If provided, the tool results will be cached for this duration to improve performance for identical calls. Returns: Decorator function that transforms the decorated function into a registered MCP tool Example: ```python # Initialize MCP server mcp_server = FastMCP() # Register a function as a tool @register_tool(mcp_server, name="get_weather", cache_ttl=300) async def get_weather_data(location: str, units: str = "metric"): '''Get current weather data for a location.''' # Implementation return {"temperature": 22, "conditions": "sunny"} # The function is now registered as an MCP tool named "get_weather" # with 5-minute result caching and standardized error handling ``` Notes: - The decorated function must be async - If cache_ttl is provided, identical calls will return cached results rather than re-executing the function - Function signature is preserved, making it transparent to callers - For more complex tools with multiple methods, use the BaseTool class instead """ def decorator(func): # Get function name and docstring tool_name = name or func.__name__ tool_description = description or func.__doc__ or f"Tool: {tool_name}" # noqa: F841 # Apply caching if specified # if cache_ttl is not None: # func = with_cache(ttl=cache_ttl)(func) # Apply error handling func = with_error_handling(func) # Register with MCP server mcp_server.tool(name=tool_name)(func) return func return decorator def _get_json_schema_type(type_annotation): """ Convert Python type annotations to JSON Schema type definitions. This utility function translates Python's typing annotations into equivalent JSON Schema type definitions, enabling automatic generation of API documentation and client interfaces from Python function signatures. It handles basic types, Optional types, Lists, and provides reasonable defaults for complex types. The function is primarily used internally by the MCP framework to generate JSON Schema definitions for tool parameters, allowing clients to understand the expected input types and structures for each tool. Type mappings: - str -> {"type": "string"} - int -> {"type": "integer"} - float -> {"type": "number"} - bool -> {"type": "boolean"} - Optional[T] -> Same as T, but adds "null" to "type" array - List[T] -> {"type": "array", "items": <schema for T>} - Dict -> {"type": "object"} - Other complex types -> {"type": "object"} Args: type_annotation: A Python type annotation (from typing module or built-in types) Returns: A dictionary containing the equivalent JSON Schema type definition Notes: - This function provides only type information, not complete JSON Schema validation rules like minimum/maximum values, string patterns, etc. - Complex nested types (e.g., List[Dict[str, List[int]]]) are handled, but deeply nested structures may be simplified in the output schema - This function is meant for internal use by the tool registration system Examples: ```python # Basic types _get_json_schema_type(str) # -> {"type": "string"} _get_json_schema_type(int) # -> {"type": "integer"} # Optional types from typing import Optional _get_json_schema_type(Optional[str]) # -> {"type": ["string", "null"]} # List types from typing import List _get_json_schema_type(List[int]) # -> {"type": "array", "items": {"type": "integer"}} # Complex types from typing import Dict, List _get_json_schema_type(Dict[str, List[int]]) # -> {"type": "object"} ``` """ import typing # Handle basic types if type_annotation is str: return {"type": "string"} elif type_annotation is int: return {"type": "integer"} elif type_annotation is float: return {"type": "number"} elif type_annotation is bool: return {"type": "boolean"} # Handle Optional types origin = typing.get_origin(type_annotation) args = typing.get_args(type_annotation) if origin is Union and type(None) in args: # Optional type - get the non-None type non_none_args = [arg for arg in args if arg is not type(None)] if len(non_none_args) == 1: inner_type = _get_json_schema_type(non_none_args[0]) return inner_type # Handle lists if origin is list or origin is List: if args: item_type = _get_json_schema_type(args[0]) return { "type": "array", "items": item_type } return {"type": "array"} # Handle dictionaries if origin is dict or origin is Dict: return {"type": "object"} # Default to object for complex types return {"type": "object"} def with_state_management(namespace: str): """ Decorator that provides persistent state management capabilities to tool functions. This decorator enables stateful behavior in otherwise stateless tool functions by injecting state access methods that allow reading, writing, and deleting values from a persistent, namespace-based state store. This is essential for tools that need to maintain context across multiple invocations, manage session data, or build features with memory capabilities. The state management system provides: - Namespace isolation: Each tool can use its own namespace to prevent key collisions - Thread-safe concurrency: Built-in locks ensure safe parallel access to the same state - Optional persistence: State can be backed by disk storage for durability across restarts - Lazy loading: State is loaded from disk only when accessed, improving performance State accessibility functions injected into the decorated function: - get_state(key, default=None) → Any: Retrieve a value by key, with optional default - set_state(key, value) → None: Store a value under the specified key - delete_state(key) → None: Remove a value from the state store All state operations are async, allowing the tool to continue processing while state operations are pending. Args: namespace: A unique string identifying this tool's state namespace. This should be chosen carefully to avoid collisions with other tools. Recommended format: "<tool_category>.<specific_feature>" Examples: "conversation.history", "user.preferences", "document.cache" Returns: A decorator function that wraps the original tool function, adding state management capabilities via injected parameters. Examples: Basic usage with conversation history: ```python @with_state_management("conversation.history") async def chat_with_memory(message: str, ctx=None, get_state=None, set_state=None, delete_state=None): # Get previous messages from persistent store history = await get_state("messages", []) # Add new message history.append({"role": "user", "content": message}) # Generate response based on all previous conversation context response = generate_response(message, history) # Add AI response to history history.append({"role": "assistant", "content": response}) # Store updated history for future calls await set_state("messages", history) return {"response": response} ``` Advanced pattern with conversational memory and user customization: ```python @with_state_management("assistant.settings") async def personalized_assistant( query: str, update_preferences: bool = False, preferences: Dict[str, Any] = None, ctx=None, get_state=None, set_state=None, delete_state=None ): # Get user ID from context user_id = ctx.get("user_id", "default_user") # Retrieve user-specific preferences user_prefs = await get_state(f"prefs:{user_id}", { "tone": "professional", "verbosity": "concise", "expertise_level": "intermediate" }) # Update preferences if requested if update_preferences and preferences: user_prefs.update(preferences) await set_state(f"prefs:{user_id}", user_prefs) # Get conversation history history = await get_state(f"history:{user_id}", []) # Process query using preferences and history response = process_personalized_query( query, user_preferences=user_prefs, conversation_history=history ) # Update conversation history history.append({"query": query, "response": response}) if len(history) > 20: # Keep only recent history history = history[-20:] await set_state(f"history:{user_id}", history) return { "response": response, "preferences": user_prefs } ``` State persistence across server restarts: ```python # First call to the tool @with_state_management("task.progress") async def long_running_task(task_id: str, step: int = None, ctx=None, get_state=None, set_state=None, delete_state=None): # Get current progress progress = await get_state(task_id, {"completed_steps": [], "current_step": 0}) # Update progress if a new step is provided if step is not None: progress["current_step"] = step progress["completed_steps"].append({ "step": step, "timestamp": time.time() }) await set_state(task_id, progress) # Even if the server restarts, the next call will retrieve the saved progress return { "task_id": task_id, "progress": progress, "completed": len(progress["completed_steps"]), "current_step": progress["current_step"] } ``` Implementation Pattern: The decorator works by injecting three async state management functions into the decorated function's keyword arguments: 1. `get_state(key, default=None)`: - Retrieves state values from the persistent store - If key doesn't exist, returns the provided default value - Example: `user_data = await get_state("user:12345", {})` 2. `set_state(key, value)`: - Stores a value in the persistent state store - Automatically serializes complex Python objects (dicts, lists, etc.) - Example: `await set_state("session:abc", {"authenticated": True})` 3. `delete_state(key)`: - Removes a key and its associated value from the store - Example: `await delete_state("temporary_data")` Notes: - The decorated function must accept get_state, set_state, delete_state, and ctx parameters, either explicitly or via **kwargs. - State persistence depends on the MCP server configuration. If persistence is enabled, state will survive server restarts. - For large objects, consider storing only references or identifiers in the state and using a separate storage system for the actual data. - The state store is shared across all server instances, so state keys should be chosen to avoid collisions between different tools and features. """ def decorator(func): @functools.wraps(func) async def wrapper(*args, **kwargs): # Get context from kwargs context = kwargs.get('ctx') if not context or not hasattr(context, 'fastmcp'): raise ValueError("Context with FastMCP server required") # Access StateStore via the FastMCP 2.0+ pattern if not hasattr(context.fastmcp, '_state_store'): raise ValueError("FastMCP server does not have a state store attached") state_store = context.fastmcp._state_store # Add state accessors to kwargs kwargs['get_state'] = lambda key, default=None: state_store.get(namespace, key, default) kwargs['set_state'] = lambda key, value: state_store.set(namespace, key, value) kwargs['delete_state'] = lambda key: state_store.delete(namespace, key) return await func(*args, **kwargs) # Update signature to include context parameter if not already present sig = inspect.signature(func) if 'ctx' not in sig.parameters: wrapped_params = list(sig.parameters.values()) wrapped_params.append( inspect.Parameter('ctx', inspect.Parameter.KEYWORD_ONLY, annotation='Optional[Dict[str, Any]]', default=None) ) wrapper.__signature__ = sig.replace(parameters=wrapped_params) return wrapper return decorator

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kappasig920/Ultimate-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server