"""OpenAPI tool generator for Isilon MCP Server.
This module parses OpenAPI specifications and generates MCP tool definitions
with enhanced descriptions for LLM context.
Example:
>>> from isilon_mcp.tool_generator import ToolGenerator, load_openapi_spec
>>> spec = load_openapi_spec("powerscale_openapi.json")
>>> generator = ToolGenerator(spec)
>>> tools = generator.generate_tools()
>>> print(f"Generated {len(tools)} tools")
Note:
Only GET methods are generated by default for safe read-only operations.
Each tool includes credential parameters (host, username, password) that
must be provided per-request.
PowerScale API uses path convention:
- Platform API: /platform/{version}/{category}/{resource}
- Namespace API: /namespace/{path}
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
import yaml
from .exceptions import OpenAPILoadError, OpenAPIParseError
from .logging_config import get_logger
logger = get_logger(__name__)
# Size limits applied when schema details are rendered for the LLM.
# NOTE(review): MAX_FIELDS_DISPLAY and MAX_KEY_FIELDS are not referenced in the
# part of this module shown here - presumably they cap how many fields are
# listed in a description; confirm against their users.
MAX_FIELDS_DISPLAY = 20
MAX_KEY_FIELDS = 10
# Presumably the most enum values to list for a single parameter - confirm
# against its users.
MAX_ENUM_VALUES = 5
def load_openapi_spec(spec_path: str) -> dict[str, Any]:
    """Load an OpenAPI specification from a JSON or YAML file.

    Files ending in ``.yaml`` / ``.yml`` (case-insensitive) are parsed as
    YAML with ``yaml.safe_load``; every other suffix is parsed as JSON.

    Args:
        spec_path: Path to the OpenAPI spec file.

    Returns:
        Parsed OpenAPI specification dictionary.

    Raises:
        OpenAPILoadError: If the file does not exist or cannot be read.
        OpenAPIParseError: If the content is not valid JSON/YAML, or the
            top-level value is not an object.
    """
    path = Path(spec_path)
    if not path.exists():
        raise OpenAPILoadError(spec_path, message=f"File not found: {spec_path}")

    try:
        content = path.read_text(encoding="utf-8")
    except (OSError, ValueError) as e:
        # OSError: unreadable path (permissions, directory, ...).
        # ValueError: covers UnicodeDecodeError for non-UTF-8 content.
        raise OpenAPILoadError(spec_path, e) from e

    # Keep the try body down to the parse call so unrelated errors are not
    # mistaken for parse failures.
    try:
        if path.suffix.lower() in (".yaml", ".yml"):
            spec = yaml.safe_load(content)
        else:
            spec = json.loads(content)
    except (json.JSONDecodeError, yaml.YAMLError) as e:
        raise OpenAPIParseError(spec_path, e) from e

    if not isinstance(spec, dict):
        raise OpenAPIParseError(spec_path, message="OpenAPI spec must be an object")

    # "info" / "paths" may be present but null (e.g. an empty YAML key); the
    # log line must not turn a loadable spec into a crash.
    info = spec.get("info")
    title = info.get("title", "Unknown") if isinstance(info, dict) else "Unknown"
    paths = spec.get("paths")
    path_count = len(paths) if isinstance(paths, dict) else 0

    logger.info(
        f"Loaded OpenAPI spec: {title}",
        extra={"path_count": path_count},
    )
    return spec
class ToolGenerator:
"""Generate MCP tools from OpenAPI specification.
This class parses an OpenAPI 3.0 specification and generates
MCP tool definitions for configured HTTP methods. Tools include enhanced
descriptions with schema information for better LLM understanding.
Attributes:
spec: The parsed OpenAPI specification.
tool_names: Dictionary tracking tool name usage for uniqueness.
allowed_methods: List of HTTP methods to generate tools for.
Example:
>>> generator = ToolGenerator(spec, allowed_methods=["GET"])
>>> tools = generator.generate_tools()
>>> for tool in tools:
... print(f"{tool['name']}: {tool['description'][:50]}...")
"""
# Default to GET only for safe read-only operations
DEFAULT_ALLOWED_METHODS = ["GET"]
def __init__(
self,
spec: dict[str, Any],
allowed_methods: list[str] | None = None,
) -> None:
"""Initialize tool generator with OpenAPI spec.
Args:
spec: Parsed OpenAPI specification dictionary.
allowed_methods: List of HTTP methods to generate tools for.
Defaults to ["GET"] for safe read-only operations.
Set to ["GET", "POST", "DELETE", "PATCH", "PUT"]
to enable all operations.
"""
self.spec = spec
self.tool_names: dict[str, int] = {}
self.allowed_methods = [
m.lower() for m in (allowed_methods or self.DEFAULT_ALLOWED_METHODS)
]
def generate_tools(self) -> list[dict[str, Any]]:
"""Generate MCP tools from OpenAPI spec for configured methods.
Returns:
List of MCP tool definitions.
Note:
Only methods specified in allowed_methods are included.
Default is GET only for safe read-only operations.
"""
tools: list[dict[str, Any]] = []
paths = self.spec.get("paths", {})
for path, path_item in paths.items():
# Generate tools for each allowed method
for method in self.allowed_methods:
if method in path_item:
operation = path_item[method]
tool = self._generate_tool_from_operation(path, method, operation)
if tool:
tools.append(tool)
logger.info(
"Generated MCP tools from OpenAPI spec",
extra={
"tool_count": len(tools),
"methods": [m.upper() for m in self.allowed_methods],
},
)
return tools
def _generate_tool_from_operation(
self,
path: str,
method: str,
operation: dict[str, Any],
) -> dict[str, Any] | None:
"""Generate a single MCP tool from an OpenAPI operation.
Args:
path: API endpoint path (e.g., "/platform/1/cluster/config").
method: HTTP method (lowercase).
operation: OpenAPI operation object.
Returns:
MCP tool definition or None if generation fails.
"""
try:
# Generate tool name from operationId or path + method
tool_name = operation.get(
"operationId"
) or self._generate_tool_name_from_path(path, method)
# Make tool name unique if duplicate exists
tool_name = self._make_unique_name(tool_name, path)
# Generate base description
base_description = (
operation.get("summary")
or operation.get("description")
or f"{method.upper()} {path}"
)
# Get category from path
category = self._get_category_from_path(path)
# Build enhanced description
description = self._build_enhanced_description(
base_description, category, operation
)
# Generate input schema from parameters
input_schema = self._generate_input_schema(operation, path)
return {
"name": tool_name,
"description": description,
"inputSchema": input_schema,
"_path": path, # Store path for later use
"_method": method,
}
except Exception as e:
logger.warning(
f"Failed to generate tool for {method.upper()} {path}",
extra={"error": str(e), "path": path, "method": method},
)
return None
def _get_category_from_path(self, path: str) -> str:
"""Extract category from API path.
PowerScale API paths follow patterns:
- /platform/{version}/{category}/{resource}
- /namespace/{path}
Args:
path: API endpoint path.
Returns:
Category name (e.g., "cluster", "protocols", "auth").
"""
parts = path.split("/")
# /platform/{version}/{category}/... -> category is at index 3
if len(parts) >= 4 and parts[1] == "platform":
return parts[3]
# /namespace/... -> category is "namespace"
if len(parts) >= 2 and parts[1] == "namespace":
return "namespace"
return ""
def _build_enhanced_description(
self,
base_description: str,
category: str,
operation: dict[str, Any],
) -> str:
"""Build enhanced description with parameter info for LLM context.
Args:
base_description: Original operation description.
category: API category name.
operation: OpenAPI operation object.
Returns:
Enhanced description with parameter information.
"""
description = base_description
# Add category context
if category:
category_descriptions = {
"auth": "Authentication, users, groups, roles, and access control",
"cluster": "Cluster configuration, nodes, and identity",
"protocols": "Protocol settings (NFS, SMB, HDFS, S3, HTTP, FTP)",
"quota": "Storage quota policies and reports",
"snapshot": "Snapshot management and schedules",
"sync": "SyncIQ replication policies and jobs",
"storagepool": "Storage pools, node pools, and tiers",
"network": "Network groupnets, subnets, and pools",
"event": "Events, alerts, and notifications",
"job": "Job engine and scheduled tasks",
"statistics": "Performance statistics and metrics",
"audit": "Audit logging and compliance",
"antivirus": "Antivirus scanning and policies",
"cloud": "CloudPools and cloud tiering",
"namespace": "File system operations (files, directories, ACLs)",
"zones": "Access zones configuration",
"upgrade": "Cluster upgrade operations",
"hardware": "Hardware inventory and management",
"license": "Software licensing",
"dedupe": "Deduplication settings",
"worm": "SmartLock WORM compliance",
"fsa": "File System Analytics",
"filepool": "SmartPools file policies",
}
if category in category_descriptions:
description += f"\n\nCategory: {category_descriptions[category]}"
# Add parameter hints
parameters = operation.get("parameters", [])
if parameters:
param_hints = []
for param in parameters[:5]: # Show first 5 params
name = param.get("name", "")
required = param.get("required", False)
param_desc = param.get("description", "")[:60]
marker = "*" if required else ""
if param_desc:
param_hints.append(f"- {name}{marker}: {param_desc}")
else:
param_hints.append(f"- {name}{marker}")
if param_hints:
description += "\n\nKey parameters:\n" + "\n".join(param_hints)
return description
def _generate_input_schema(
self,
operation: dict[str, Any],
path: str,
) -> dict[str, Any]:
"""Generate JSON Schema for tool input.
Args:
operation: OpenAPI operation object.
path: API path.
Returns:
JSON Schema for input validation.
"""
properties: dict[str, Any] = {}
required: list[str] = []
# Add credential properties (always required)
properties["host"] = {
"type": "string",
"description": "PowerScale cluster hostname or IP (e.g., 'powerscale.example.com')",
}
properties["username"] = {
"type": "string",
"description": "PowerScale username for authentication",
}
properties["password"] = {
"type": "string",
"description": "PowerScale password for authentication",
}
required.extend(["host", "username", "password"])
# Add optional connection parameters
properties["port"] = {
"type": "integer",
"description": "PowerScale API port (default: 8080)",
"default": 8080,
}
properties["use_https"] = {
"type": "boolean",
"description": "Use HTTPS instead of HTTP (default: true)",
"default": True,
}
properties["verify_ssl"] = {
"type": "boolean",
"description": "Verify SSL certificates (default: false for self-signed certs)",
"default": False,
}
# Add path parameters from the path itself
path_params = re.findall(r'\{([^}]+)\}', path)
for param_name in path_params:
if param_name not in properties:
properties[param_name] = {
"type": "string",
"description": f"Path parameter: {param_name}",
}
required.append(param_name)
# Add parameters from OpenAPI spec
for param in operation.get("parameters", []):
param_name = param.get("name", "")
if not param_name:
continue
# Skip if already added from path
if param_name in properties:
continue
param_schema = param.get("schema", {})
param_type = param_schema.get("type", "string")
param_description = param.get("description", "")
param_required = param.get("required", False)
param_in = param.get("in", "query")
prop: dict[str, Any] = {
"type": param_type,
"description": param_description or f"{param_in} parameter: {param_name}",
}
# Add enum values if present
if "enum" in param_schema:
prop["enum"] = param_schema["enum"][:MAX_ENUM_VALUES]
# Add default value if present
if "default" in param_schema:
prop["default"] = param_schema["default"]
properties[param_name] = prop
if param_required and param_name not in required:
required.append(param_name)
return {
"type": "object",
"properties": properties,
"required": required,
}
def _make_unique_name(self, tool_name: str, path: str) -> str:
"""Make tool name unique by adding suffix if needed.
Args:
tool_name: Original tool name.
path: API path for generating suffix.
Returns:
Unique tool name.
"""
original_name = tool_name
# Keep trying with different suffixes until we find a unique name
if tool_name in self.tool_names:
count = self.tool_names[original_name] + 1
self.tool_names[original_name] = count
# Add path-based suffix to make it unique
path_parts = [p for p in path.split("/") if p and not p.startswith("{")]
path_suffix = (
"_".join(path_parts[-2:]) if len(path_parts) >= 2 else ""
)
# Try path suffix first
if path_suffix:
tool_name = f"{original_name}_{path_suffix}"
# If still not unique, add counter
counter = 1
while tool_name in self.tool_names:
if path_suffix:
tool_name = f"{original_name}_{path_suffix}_{counter}"
else:
tool_name = f"{original_name}_{counter}"
counter += 1
# Mark this name as used
self.tool_names[tool_name] = 0
return tool_name
def _generate_tool_name_from_path(self, path: str, method: str) -> str:
"""Generate tool name from path and method.
Args:
path: API endpoint path.
method: HTTP method.
Returns:
Generated tool name in snake_case.
"""
# /platform/1/cluster/config -> cluster_config
parts = path.split("/")
meaningful_parts = [
p for p in parts
if p and p not in ("platform", "namespace")
and not p.startswith("{")
and not p.isdigit() # Skip version numbers
]
if meaningful_parts:
# Join with underscores and prefix with method
name = "_".join(meaningful_parts)
return f"{method}_{name}"
return f"{method}_operation"
def get_path_for_tool(self, tool_name: str) -> str | None:
"""Get API path for a tool by name.
Args:
tool_name: Name of the tool.
Returns:
API path or None if not found.
"""
for path, path_item in self.spec.get("paths", {}).items():
for method in self.allowed_methods:
if method in path_item:
operation = path_item[method]
op_id = operation.get("operationId")
# Check for exact match
if op_id == tool_name:
return path
# Check for prefix match
if op_id and tool_name.startswith(f"{op_id}_"):
return path
return None
def get_method_for_tool(self, tool_name: str) -> str:
"""Get HTTP method for a tool by name.
Args:
tool_name: Name of the tool.
Returns:
HTTP method (defaults to GET).
"""
for _path, path_item in self.spec.get("paths", {}).items():
for method in self.allowed_methods:
if method in path_item:
operation = path_item[method]
op_id = operation.get("operationId")
if op_id == tool_name or (
op_id and tool_name.startswith(f"{op_id}_")
):
return method.upper()
return "GET"