Crawlab MCP Server

Official
import json import logging import re import time from typing import Any, Dict, List, Optional, Tuple from mcp import Tool from crawlab_mcp.utils.constants import ( MODELS_WITH_TOOL_SUPPORT, MODEL_TOOL_SUPPORT_PATTERNS, PYTHON_KEYWORDS, ) from crawlab_mcp.utils.http import api_request tools_logger = logging.getLogger("crawlab_mcp.utils.tools") # Simple type conversion from OpenAPI to Python types OPENAPI_TO_PYTHON_TYPES = { "string": str, "integer": int, "number": float, "boolean": bool, "array": list, "object": dict, } # Define mapping from Python types to OpenAPI types PYTHON_TO_OPENAPI_TYPES = { str: "string", int: "integer", float: "number", bool: "boolean", list: "array", dict: "object", Dict: "object", List: "array", Any: "string", # Default to string for Any } def extract_openapi_parameters(operation: Dict[str, Any]) -> Dict[str, Tuple]: """ Extract parameter information from an OpenAPI operation. Args: operation: The operation object from the OpenAPI spec Returns: Dictionary mapping parameter names to tuples of: (type, default_value, description, is_path_param, additional_schema) where additional_schema contains other OpenAPI schema properties like enum, format, etc. """ param_dict = {} def get_default_value(param_type: str, is_required: bool) -> Any: """Helper to get the default value for a parameter based on its type""" if is_required: return None # Default values for optional parameters by type defaults = {"string": "", "array": [], "object": {}, "boolean": False} # Default to 0 for number types if param_type in ["integer", "number"]: return 0 return defaults.get(param_type, None) # Process path parameters and query parameters for param in operation.get("parameters", []): param_name = param.get("name") param_required = param.get("required", False) param_schema = param.get("schema", {}) param_description = param.get("description", "") param_type = param_schema.get("type", "string") param_in = param.get("in", "") # Flag whether this is a path parameter is_path_param = param_in == "path" python_type = OPENAPI_TO_PYTHON_TYPES.get(param_type, str) default_val = get_default_value(param_type, param_required) # Ensure path parameters are required if is_path_param: default_val = None # Extract additional schema properties (enum, format, minimum, maximum, pattern, etc.) additional_schema = {} for key in [ "enum", "format", "minimum", "maximum", "pattern", "exclusiveMinimum", "exclusiveMaximum", "minLength", "maxLength", "multipleOf", ]: if key in param_schema: additional_schema[key] = param_schema[key] # Add parameter to the dictionary with path parameter flag and additional schema param_dict[param_name] = ( python_type, default_val, param_description, is_path_param, additional_schema, ) # Process request body if present request_body = operation.get("requestBody", {}) if request_body: content = request_body.get("content", {}) json_content = content.get("application/json", {}) if json_content: schema = json_content.get("schema", {}) properties = schema.get("properties", {}) required = schema.get("required", []) for prop_name, prop_schema in properties.items(): prop_type = prop_schema.get("type", "string") prop_description = prop_schema.get("description", "") prop_required = prop_name in required python_type = OPENAPI_TO_PYTHON_TYPES.get(prop_type, str) default_val = get_default_value(prop_type, prop_required) # Extract additional schema properties for body parameters additional_schema = {} for key in [ "enum", "format", "minimum", "maximum", "pattern", "exclusiveMinimum", "exclusiveMaximum", "minLength", "maxLength", "multipleOf", ]: if key in prop_schema: additional_schema[key] = prop_schema[key] # Add parameter to the dictionary (not a path parameter) with additional schema param_dict[prop_name] = ( python_type, default_val, prop_description, False, additional_schema, ) return param_dict def create_input_schema_from_openapi( operation_id: str, operation: Dict[str, Any], method: str, path: str ) -> Dict[str, Any]: """Create a standardized input schema from an OpenAPI operation. Args: operation_id: The operation ID in the OpenAPI spec operation: The operation object from the OpenAPI spec method: The HTTP method (GET, POST, etc.) path: The path for the operation Returns: A dictionary with the input schema information """ # Extract path parameters from the path path_param_names = re.findall(r"{([^{}]+)}", path) # Create a basic schema structure input_schema = { "type": "object", "properties": {}, "required": [], } # Helper function to create a property schema from a parameter schema def create_property_schema(param_schema: Dict[str, Any], description: str) -> Dict[str, Any]: """Create a property schema for the input schema.""" property_schema = {} # Copy relevant fields for field in ["type", "format", "enum", "minimum", "maximum", "pattern"]: if field in param_schema: property_schema[field] = param_schema[field] # Enhance enum descriptions to make them more explicit for AI agents if field == "enum" and param_schema[field]: enum_values = param_schema[field] enum_str = ", ".join([f"'{v}'" for v in enum_values]) property_schema["description"] = ( f"{description or ''}. Valid values: [{enum_str}]" ) # Handle schema reference if "$ref" in param_schema: # For simplicity, we'll just extract the type from the reference # In a real implementation, you might want to resolve the reference ref_parts = param_schema["$ref"].split("/") type_name = ref_parts[-1] # Convert CamelCase to snake_case for readability type_name = re.sub(r"(?<!^)(?=[A-Z])", "_", type_name).lower() # Remove common suffixes type_name = type_name.replace("_schema", "").replace("_type", "") property_schema["type"] = type_name # Add description if not empty and we haven't already added an enum-enhanced description if description and "description" not in property_schema: property_schema["description"] = description return property_schema # Process path and query parameters for param in operation.get("parameters", []): param_name = param.get("name") param_schema = param.get("schema", {}) param_description = param.get("description", "") param_required = param.get("required", False) param_in = param.get("in", "") # Force path parameters to be required if param_name in path_param_names or param_in == "path": param_required = True # Create and add the property schema property_schema = create_property_schema(param_schema, param_description) # Mark path parameters clearly in the schema if param_name in path_param_names or param_in == "path": property_schema["x-path-parameter"] = True property_schema["description"] = ( f"[Path Parameter] {property_schema.get('description', '')}" ) input_schema["properties"][param_name] = property_schema # Add to required list if needed if param_required and param_name not in input_schema["required"]: input_schema["required"].append(param_name) # Process request body if present request_body = operation.get("requestBody", {}) if request_body: content = request_body.get("content", {}) json_content = content.get("application/json", {}) if json_content: schema = json_content.get("schema", {}) properties = schema.get("properties", {}) required = schema.get("required", []) for prop_name, prop_schema in properties.items(): prop_description = prop_schema.get("description", "") property_schema = create_property_schema(prop_schema, prop_description) input_schema["properties"][prop_name] = property_schema # Add to required list if needed if prop_name in required and prop_name not in input_schema["required"]: input_schema["required"].append(prop_name) # If there are no required parameters, remove the required list if not input_schema["required"]: input_schema.pop("required") # Final schema structure tool_schema = { "name": operation_id, "inputSchema": input_schema, } return tool_schema def create_tool_function(tool_name, method, path, param_dict, enable_logging=True): """Create a tool function that calls the Crawlab API based on OpenAPI parameters. Args: tool_name: The name of the tool/operation method: HTTP method (GET, POST, etc.) path: API endpoint path param_dict: Dictionary of parameters with their types, defaults, descriptions, and schema information Format: { "param_name": ( param_type, # Python type (str, int, etc.) default_value, # Default value or None if required description, # Parameter description is_path_param, # Whether this is a path parameter additional_schema # Dictionary with additional schema information (enum, format, etc.) ) } enable_logging: Whether to enable execution logging for this tool (default: True) Returns: A callable function with proper type annotations to be registered as a tool """ import functools import inspect from typing import ( Any, Dict, Literal, ) # Extract path parameters from the path path_param_names = re.findall(r"{([^{}]+)}", path) # Validate inputs to prevent code injection if not isinstance(tool_name, str) or not tool_name.isidentifier(): raise ValueError(f"Tool name '{tool_name}' is not a valid Python identifier") if not isinstance(method, str) or method.lower() not in [ "get", "post", "put", "delete", "patch", ]: raise ValueError(f"Invalid HTTP method: {method}") if not isinstance(path, str): raise ValueError("Path must be a string") # Separate required and optional parameters required_params = [] optional_params = [] param_mapping = {} # Map safe parameter names to original names used_param_names = set() # Track used parameter names to avoid duplicates # Process parameters to handle Python keywords and reserved names for param_name, ( param_type, default_val, description, is_path_param, additional_schema, ) in param_dict.items(): # Validate parameter name to prevent code injection if not isinstance(param_name, str): raise ValueError(f"Parameter name must be a string, got {type(param_name)}") # Generate a safe parameter name if needed safe_param_name = param_name if ( param_name in PYTHON_KEYWORDS or param_name == "id" or param_name.startswith("_") or not param_name.isidentifier() ): clean_name = "".join(c for c in param_name if c.isalnum() or c == "_") clean_name = clean_name.lstrip("_") if not clean_name: clean_name = "param" safe_param_name = f"param_{clean_name}" # Ensure the parameter name is unique suffix = 1 original_safe_name = safe_param_name while safe_param_name in used_param_names: safe_param_name = f"{original_safe_name}_{suffix}" suffix += 1 param_mapping[safe_param_name] = param_name # Add to used parameters set used_param_names.add(safe_param_name) # Ensure path parameters are required if is_path_param: tools_logger.warning( f"Path parameter '{param_name}' in {path} should be required. Forcing as required." ) default_val = None # Separate required and optional parameters if default_val is None: required_params.append((safe_param_name, param_type, description, is_path_param)) else: optional_params.append( (safe_param_name, param_type, default_val, description, is_path_param) ) # Helper function to create type annotation based on parameter type and additional schema def create_type_annotation(param_type, additional_schema): # Check if enum values are provided if additional_schema and "enum" in additional_schema: enum_values = additional_schema["enum"] # For string enums, use Literal if param_type == str and all(isinstance(v, str) for v in enum_values): return f"Literal[{', '.join(repr(v) for v in enum_values)}]" # For numeric enums, use Literal elif param_type in (int, float) and all( isinstance(v, (int, float)) for v in enum_values ): return f"Literal[{', '.join(repr(v) for v in enum_values)}]" # For other common types, use their type annotations if param_type == str: return "str" elif param_type == int: return "int" elif param_type == float: return "float" elif param_type == bool: return "bool" elif param_type == list: return "List[Any]" elif param_type == dict: return "Dict[str, Any]" else: return "Any" # Helper function to create actual type objects for runtime validation def create_actual_type(param_type, additional_schema): # Check if enum values are provided if additional_schema and "enum" in additional_schema: enum_values = additional_schema["enum"] # For enums, use Literal if all(isinstance(v, (str, int, float, bool)) for v in enum_values): return Literal[tuple(enum_values)] # For other types, return the type directly return param_type # Generate function signature as string for eval params_str = [] type_annotations = {} param_validators = {} # Add required parameters for p_name, p_type, p_desc, _ in required_params: orig_name = param_mapping.get(p_name, p_name) p_schema = param_dict.get(orig_name, (None, None, None, None, {}))[4] type_anno = create_type_annotation(p_type, p_schema) params_str.append(f"{p_name}: {type_anno}") type_annotations[p_name] = create_actual_type(p_type, p_schema) # Create validator for schema constraints if p_schema: validator = {} if "enum" in p_schema: validator["enum"] = p_schema["enum"] if "minimum" in p_schema and p_type in (int, float): validator["minimum"] = p_schema["minimum"] if "maximum" in p_schema and p_type in (int, float): validator["maximum"] = p_schema["maximum"] if "pattern" in p_schema and p_type == str: validator["pattern"] = p_schema["pattern"] if validator: param_validators[p_name] = validator # Add optional parameters with their default values for p_name, p_type, default, p_desc, _ in optional_params: orig_name = param_mapping.get(p_name, p_name) p_schema = param_dict.get(orig_name, (None, None, None, None, {}))[4] type_anno = create_type_annotation(p_type, p_schema) # Format default value correctly if default is None: default_str = "None" elif isinstance(default, str): default_str = f'"{default}"' elif isinstance(default, bool): default_str = str(default).lower() else: default_str = str(default) params_str.append(f"{p_name}: {type_anno} = {default_str}") type_annotations[p_name] = create_actual_type(p_type, p_schema) # Create validator for schema constraints if p_schema: validator = {} if "enum" in p_schema: validator["enum"] = p_schema["enum"] if "minimum" in p_schema and p_type in (int, float): validator["minimum"] = p_schema["minimum"] if "maximum" in p_schema and p_type in (int, float): validator["maximum"] = p_schema["maximum"] if "pattern" in p_schema and p_type == str: validator["pattern"] = p_schema["pattern"] if validator: param_validators[p_name] = validator # Define the function dynamically using a factory approach and safer methods def create_wrapper(): # Create function documentation doc_lines = [f"Call {method.upper()} {path}"] if required_params: doc_lines.append("\nRequired Parameters:") for p_name, p_type, p_desc, is_path in required_params: orig_name = param_mapping.get(p_name, p_name) path_indicator = " (path parameter)" if is_path else "" doc_lines.append(f" {p_name}: {p_desc or 'No description'}{path_indicator}") if optional_params: doc_lines.append("\nOptional Parameters:") for p_name, p_type, p_default, p_desc, is_path in optional_params: doc_lines.append(f" {p_name}: {p_desc or 'No description'} (default: {p_default})") function_doc = "\n".join(doc_lines) # Create parameter list for the signature parameters = [] # Add required parameters for p_name, _, _, _ in required_params: parameters.append( inspect.Parameter( p_name, inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=type_annotations.get(p_name, inspect.Parameter.empty), ) ) # Add optional parameters with their default values for p_name, _, default, _, _ in optional_params: parameters.append( inspect.Parameter( p_name, inspect.Parameter.POSITIONAL_OR_KEYWORD, default=default, annotation=type_annotations.get(p_name, inspect.Parameter.empty), ) ) # Create the function signature sig = inspect.Signature(parameters, return_annotation=Dict[str, Any]) # Create the actual function that will be called def actual_function(*args, **kwargs): if enable_logging: tools_logger.info(f"Executing tool: {tool_name} ({method.upper()} {path})") tools_logger.debug(f"Tool parameters: {kwargs}") start_time = time.time() try: # Bind the arguments to the signature to get a mapping of parameter names to values bound_args = sig.bind(*args, **kwargs) bound_args.apply_defaults() # Apply default values for missing optional parameters # Get the parameter values param_values = bound_args.arguments # Validate parameters against validators for param_name, value in param_values.items(): if param_name in param_validators and value is not None: validator = param_validators[param_name] # Check enum values if "enum" in validator: allowed_values = validator["enum"] if value not in allowed_values: allowed_str = ", ".join([repr(v) for v in allowed_values]) error_msg = f"Parameter '{param_name}' must be one of [{allowed_str}], got {repr(value)}" if enable_logging: tools_logger.error(error_msg) raise ValueError(error_msg) # Check minimum constraint if "minimum" in validator and isinstance(value, (int, float)): minimum = validator["minimum"] if value < minimum: error_msg = ( f"Parameter '{param_name}' must be >= {minimum}, got {value}" ) if enable_logging: tools_logger.error(error_msg) raise ValueError(error_msg) # Check maximum constraint if "maximum" in validator and isinstance(value, (int, float)): maximum = validator["maximum"] if value > maximum: error_msg = ( f"Parameter '{param_name}' must be <= {maximum}, got {value}" ) if enable_logging: tools_logger.error(error_msg) raise ValueError(error_msg) # Check pattern constraint if "pattern" in validator and isinstance(value, str): pattern = validator["pattern"] if not re.match(pattern, value): error_msg = f"Parameter '{param_name}' must match pattern '{pattern}', got {repr(value)}" if enable_logging: tools_logger.error(error_msg) raise ValueError(error_msg) # Check for missing path parameters missing_path_params = [] for path_param in path_param_names: found = False # Check if using original parameter name if path_param in param_values: found = True else: # Check if using safe parameter name for safe_name, orig_name in param_mapping.items(): if orig_name == path_param and safe_name in param_values: found = True break if not found: missing_path_params.append(path_param) if missing_path_params: error_msg = f"Missing required path parameter(s) for {path}: {', '.join(missing_path_params)}" tools_logger.error(error_msg) raise ValueError(error_msg) # Validate and transform parameters transformed_params = {} for key, value in param_values.items(): # Get original parameter name if it was renamed orig_key = param_mapping.get(key, key) # Get parameter type info (if available) param_info = param_dict.get(orig_key) if param_info: # Extract parameter type param_type = param_info[0] is_path_param = orig_key in path_param_names # Check if this is a path parameter and validate it's not None if is_path_param and value is None: error_msg = f"Path parameter '{orig_key}' cannot be None for {path}" tools_logger.error(error_msg) raise ValueError(error_msg) # Apply type conversion if needed try: # Only convert if value is not None and not already the correct type if value is not None and not isinstance(value, param_type): transformed_params[key] = param_type(value) if enable_logging: tools_logger.debug( f"Converted parameter {key} from {type(value).__name__} to {param_type.__name__}" ) else: transformed_params[key] = value except (ValueError, TypeError) as e: # Special handling for path parameters - they must be valid if is_path_param: error_msg = ( f"Invalid value for path parameter '{orig_key}': {str(e)}" ) tools_logger.error(error_msg) raise ValueError(error_msg) if enable_logging: tools_logger.warning( f"Failed to convert parameter {key} to {param_type.__name__}: {str(e)}. Using original value." ) transformed_params[key] = value else: # If no type info, just pass through transformed_params[key] = value # Replace path parameters and build request data endpoint = path query_params = {} body_data = {} # Process all parameters for key, value in transformed_params.items(): # Skip None values for optional parameters that aren't required if value is None and key not in [p[0] for p in required_params]: continue # Get original parameter name if it was renamed orig_key = param_mapping.get(key, key) # Replace path parameters if "{" + orig_key + "}" in endpoint: # Convert value to string and ensure it's properly URL encoded str_value = str(value) endpoint = endpoint.replace("{" + orig_key + "}", str_value) # Add to appropriate dictionary based on HTTP method elif method.lower() in ["get", "delete"]: query_params[orig_key] = value else: body_data[orig_key] = value # Make the API request api_response = api_request( method=method.upper(), endpoint=endpoint.lstrip("/"), params=query_params if query_params else None, data=body_data if body_data else None, ) result = api_response.get("data", {}) if enable_logging: execution_time = time.time() - start_time tools_logger.info( f"Tool {tool_name} executed successfully in {execution_time:.2f} seconds" ) # Log result summary (truncate if too large) result_str = str(result) if len(result_str) > 200: tools_logger.debug(f"Result (truncated): {result_str[:197]}...") else: tools_logger.debug(f"Result: {result_str}") return result except Exception as e: if enable_logging: execution_time = time.time() - start_time tools_logger.error( f"Tool {tool_name} failed after {execution_time:.2f} seconds: {str(e)}", exc_info=True, ) raise # Create the wrapper function with proper signature and docstring @functools.wraps(actual_function) def wrapper(*args, **kwargs) -> Dict[str, Any]: return actual_function(*args, **kwargs) # Set the signature and docstring wrapper.__signature__ = sig wrapper.__doc__ = function_doc wrapper.__name__ = tool_name # Set function annotations with complete type information wrapper.__annotations__ = {k: v for k, v in sig.parameters.items()} wrapper.__annotations__["return"] = Dict[str, Any] # Create input schema that includes default values for optional parameters input_schema = { "type": "object", "properties": {}, "required": [p[0] for p in required_params], } # Add all parameters to the properties for p_name, p_type, p_desc, _ in required_params: orig_name = param_mapping.get(p_name, p_name) p_schema = param_dict.get(orig_name, (None, None, None, None, {}))[4] property_schema = {"type": PYTHON_TO_OPENAPI_TYPES.get(p_type, "string")} # Add description if available if p_desc: property_schema["description"] = p_desc # Add additional schema properties if available if p_schema: for key, value in p_schema.items(): property_schema[key] = value input_schema["properties"][p_name] = property_schema for p_name, p_type, default, p_desc, _ in optional_params: orig_name = param_mapping.get(p_name, p_name) p_schema = param_dict.get(orig_name, (None, None, None, None, {}))[4] property_schema = {"type": PYTHON_TO_OPENAPI_TYPES.get(p_type, "string")} # Add default value property_schema["default"] = default # Add description if available if p_desc: property_schema["description"] = p_desc # Add additional schema properties if available if p_schema: for key, value in p_schema.items(): property_schema[key] = value input_schema["properties"][p_name] = property_schema # Attach the input schema to the function wrapper.input_schema = input_schema return wrapper # Create the final function final_function = create_wrapper() return final_function def create_tool( tool_name: str, method: str, path: str, param_dict: Dict[str, Any], enable_logging=True ) -> Tool: """Create a Tool object with schema support for required fields, enums, and other schema features. Args: tool_name: The name of the tool/operation method: HTTP method (GET, POST, etc.) path: API endpoint path param_dict: Dictionary of parameters with their types, defaults, descriptions, and metadata enable_logging: Whether to enable execution logging for this tool (default: True) Returns: A Tool object ready to be registered with a ToolRegistry The param_dict should follow this format: { "param_name": ( param_type, # Python type (str, int, etc.) default_value, # Default value or None if required description, # Parameter description is_path_param, # Whether this is a path parameter { # Optional dictionary with additional schema information "enum": [...], # List of allowed values "minimum": n, # Minimum value for numbers "maximum": n, # Maximum value for numbers "pattern": "...", # Regex pattern for strings "format": "..." # Format for specific types (date, email, etc.) } ) } """ # Create the function for the tool func = create_tool_function(tool_name, method, path, param_dict, enable_logging) # Create the schema for the tool using the input_schema attribute schema = { "name": tool_name, "description": f"Call {method.upper()} {path}", "parameters": func.input_schema, # Use the input_schema directly } # Create and return the Tool object return Tool(name=tool_name, description=schema["description"], function=func, schema=schema) def list_tags(resolved_spec): """List all available tags/endpoint groups in the API.""" def wrapper(): tags_dict = {} # Extract tags from the top-level OpenAPI spec for tag_info in resolved_spec.get("tags", []): tag_name = tag_info.get("name", "") tags_dict[tag_name] = {"description": tag_info.get("description", ""), "tools": []} # If no tags are defined in the spec, initialize from operations if not tags_dict: for path, path_item in resolved_spec.get("paths", {}).items(): for method, operation in path_item.items(): if method.lower() not in ["get", "post", "put", "delete", "patch"]: continue operation_tags = operation.get("tags", []) for tag in operation_tags: if tag not in tags_dict: tags_dict[tag] = { "description": f"Operations tagged with {tag}", "tools": [], } # Populate tools under each tag for path, path_item in resolved_spec.get("paths", {}).items(): for method, operation in path_item.items(): if method.lower() not in ["get", "post", "put", "delete", "patch"]: continue operation_tags = operation.get("tags", []) operation_id = operation.get("operationId") summary = operation.get("summary", "") if operation_id: tool_info = { "name": operation_id, "method": method.upper(), "summary": summary, } # Add tool to each tag it belongs to for tag in operation_tags: if tag in tags_dict: tags_dict[tag]["tools"].append(tool_info) # Convert to list format for return tags_list = [ {"name": name, "description": info["description"], "tools": info["tools"]} for name, info in tags_dict.items() ] return {"tags": tags_list} return wrapper def model_supports_tools(model_name: str) -> bool: """ Check if a model supports tools/function calling based on regex patterns. Args: model_name: Name of the model to check. Returns: True if the model supports tools, False otherwise. """ # First check the legacy hard-coded dictionary if model_name in MODELS_WITH_TOOL_SUPPORT: return MODELS_WITH_TOOL_SUPPORT[model_name] # Then check regex patterns for pattern in MODEL_TOOL_SUPPORT_PATTERNS: if re.match(pattern, model_name): return True return False def export_tool_schemas(tool_schemas, output_file=None): """ Export tool schemas to a JSON file or return them as a string. Args: tool_schemas: Dictionary of tool schemas to export output_file: Optional path to export JSON file. If None, returns JSON string. Returns: If output_file is provided, writes to file and returns None. Otherwise, returns the JSON string. """ # Format the schemas as properly indented JSON json_str = json.dumps(tool_schemas, indent=2) if output_file: with open(output_file, "w") as f: f.write(json_str) tools_logger.info(f"Tool schemas exported to {output_file}") return None return json_str def get_tool_schemas_function(registered_tools): """Create a function that returns the schema definitions for tools. Args: registered_tools: Dictionary of tool definitions Returns: A callable function to be registered as a tool """ # Pre-process tools to create full schema information for each tool tool_schemas = {} for tool_name, tool_info in registered_tools.items(): # Extract info from the tool registration method = tool_info.get("method", "") path = tool_info.get("path", "") operation = tool_info.get("operation", {}) # Create full schema with rich type information input_schema = create_input_schema_from_openapi(tool_name, operation, method, path) # Store the enhanced schema tool_schemas[tool_name] = input_schema def get_tool_schemas(tool_name=None): """Get the schema definition for one or all tools. Args: tool_name: Optional name of specific tool to get schema for. If not provided, returns all tool schemas. Returns: Dictionary containing tool schema(s) with detailed parameter information including required parameters and enum values """ if tool_name is not None: if tool_name not in tool_schemas: return {"error": f"Tool '{tool_name}' not found"} return {"tools": [tool_schemas[tool_name]]} # Return all tool schemas return {"tools": list(tool_schemas.values())} return get_tool_schemas def list_parameter_info(registered_tools): """Create a function that returns detailed parameter information for tools. This is specifically designed to help AI agents understand what parameters are required and what enum values are available. Args: registered_tools: Dictionary of tool definitions Returns: A callable function to be registered as a tool """ # Pre-process tools to create parameter information for quick access tool_param_info = {} for tool_name, tool_info in registered_tools.items(): # Extract info from the tool registration method = tool_info.get("method", "") path = tool_info.get("path", "") operation = tool_info.get("operation", {}) # Get parameter details param_info = {"required_params": [], "enum_params": {}, "path_params": []} # Process parameters from the operation for param in operation.get("parameters", []): param_name = param.get("name") param_required = param.get("required", False) param_schema = param.get("schema", {}) param_in = param.get("in", "") # Track required parameters if param_required: param_info["required_params"].append(param_name) # Track path parameters if param_in == "path": param_info["path_params"].append(param_name) # Track enum parameters if "enum" in param_schema: param_info["enum_params"][param_name] = param_schema["enum"] # Process request body if present request_body = operation.get("requestBody", {}) if request_body: content = request_body.get("content", {}) json_content = content.get("application/json", {}) if json_content: schema = json_content.get("schema", {}) properties = schema.get("properties", {}) required = schema.get("required", []) # Add required body parameters if required: param_info["required_params"].extend(required) # Check for enum values in body parameters for prop_name, prop_schema in properties.items(): if "enum" in prop_schema: param_info["enum_params"][prop_name] = prop_schema["enum"] # Store parameter info for this tool tool_param_info[tool_name] = param_info def list_parameters(tool_name=None): """Get detailed parameter information for one or all tools. This function is designed to help AI agents understand what parameters are required and what enum values are available. Args: tool_name: Optional name of specific tool to get parameter info for. If not provided, returns parameter info for all tools. Returns: Dictionary containing parameter information including: - required_params: List of required parameter names - enum_params: Dictionary mapping parameter names to their enum values - path_params: List of parameters that are part of the URL path """ if tool_name is not None: if tool_name not in tool_param_info: return {"error": f"Tool '{tool_name}' not found"} return {"tool": tool_name, "parameters": tool_param_info[tool_name]} # Return all parameter information return {"tools": {name: info for name, info in tool_param_info.items()}} return list_parameters def create_tools_from_openapi( openapi_spec: Dict[str, Any], filter_tags: Optional[List[str]] = None, filter_operations: Optional[List[str]] = None, enable_logging: bool = True, ) -> Dict[str, Tool]: """Create Tool objects from an OpenAPI specification with enhanced schema support. Args: openapi_spec: The resolved OpenAPI specification filter_tags: Optional list of tags to filter operations by filter_operations: Optional list of operation IDs to include enable_logging: Whether to enable logging for the created tools Returns: A dictionary mapping operation IDs to Tool objects """ tools = {} # Get the paths from the OpenAPI spec paths = openapi_spec.get("paths", {}) # Iterate through each path for path, path_item in paths.items(): # Iterate through each HTTP method in the path for method, operation in path_item.items(): # Skip non-HTTP methods if method not in ["get", "post", "put", "delete", "patch"]: continue # Get the operation ID operation_id = operation.get("operationId") if not operation_id: # Generate an operation ID if not provided path_parts = path.strip("/").split("/") operation_id = f"{method}_{('_'.join(path_parts)).replace('-', '_')}" # Check if we should filter by tags if filter_tags: operation_tags = operation.get("tags", []) if not any(tag in filter_tags for tag in operation_tags): continue # Check if we should filter by operation IDs if filter_operations and operation_id not in filter_operations: continue # Extract the parameters param_dict = extract_openapi_parameters(operation) # Create the tool tools[operation_id] = create_tool( tool_name=operation_id, method=method, path=path, param_dict=param_dict, enable_logging=enable_logging, ) return tools