"""OpenAPI tool generator for OpManager MCP Server.
This module parses OpenAPI specifications and generates MCP tool definitions
with enhanced descriptions for LLM context.
Example:
>>> from opmanager_mcp.tool_generator import ToolGenerator, load_openapi_spec
>>> spec = load_openapi_spec("openapi.json")
>>> generator = ToolGenerator(spec)
>>> tools = generator.generate_tools()
>>> print(f"Generated {len(tools)} tools")
Note:
Only GET methods are generated by default for safe read-only operations.
Each tool includes credential parameters (host, apiKey) that must be
provided per-request.
OpManager API uses path convention:
- /api/json/{category}/{operation}
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import yaml
from .exceptions import OpenAPILoadError, OpenAPIParseError
from .logging_config import get_logger
logger = get_logger(__name__)
# Display limits for generated tool descriptions.
# NOTE(review): none of these three constants is referenced in the portion of
# the module reviewed here (the description builder uses literal limits
# instead) -- confirm whether other modules import them before changing or
# removing them.
# Maximum number of fields to show in description
MAX_FIELDS_DISPLAY = 20
# Presumably the cap on "key" fields highlighted in a description -- TODO confirm.
MAX_KEY_FIELDS = 10
# Presumably the cap on enum values listed per field -- TODO confirm.
MAX_ENUM_VALUES = 5
def load_openapi_spec(spec_path: str) -> dict[str, Any]:
    """Load an OpenAPI specification from a JSON or YAML file.

    Files ending in ``.yaml`` / ``.yml`` (case-insensitive) are parsed as
    YAML; every other file is parsed as JSON.

    Args:
        spec_path: Path to the OpenAPI spec file.

    Returns:
        Parsed OpenAPI specification dictionary.

    Raises:
        OpenAPILoadError: If the file does not exist or cannot be read.
        OpenAPIParseError: If the content is not valid JSON/YAML, or if the
            top-level document is not a mapping.
    """
    path = Path(spec_path)
    if not path.exists():
        raise OpenAPILoadError(spec_path, message=f"File not found: {spec_path}")

    try:
        # "utf-8-sig" decodes plain UTF-8 and also strips a leading BOM;
        # json.loads() rejects a BOM-prefixed string outright.
        content = path.read_text(encoding="utf-8-sig")
    except Exception as e:
        raise OpenAPILoadError(spec_path, e) from e

    is_yaml = path.suffix.lower() in (".yaml", ".yml")
    try:
        spec = yaml.safe_load(content) if is_yaml else json.loads(content)
    except (json.JSONDecodeError, yaml.YAMLError) as e:
        raise OpenAPIParseError(spec_path, e) from e

    # An empty YAML document parses to None and a JSON array to a list;
    # neither is usable as a spec.
    if not isinstance(spec, dict):
        raise OpenAPIParseError(spec_path, message="OpenAPI spec must be an object")

    # Collect log metadata defensively: a spec whose "info" or "paths" is
    # null (or otherwise not a mapping) must not make the loader fail just
    # because of the log line.
    info = spec.get("info")
    title = info.get("title", "Unknown") if isinstance(info, dict) else "Unknown"
    paths = spec.get("paths")
    path_count = len(paths) if isinstance(paths, dict) else 0

    logger.info(
        f"Loaded OpenAPI spec: {title}",
        extra={"path_count": path_count},
    )
    return spec
class ToolGenerator:
    """Generate MCP tools from OpenAPI specification.

    This class parses an OpenAPI 3.0 specification and generates
    MCP tool definitions for configured HTTP methods. Tools include enhanced
    descriptions with schema information for better LLM understanding.

    Attributes:
        spec: The parsed OpenAPI specification.
        tool_names: Registry of every tool name handed out so far, used to
            keep names unique. Reset at the start of each
            ``generate_tools`` call.
        allowed_methods: Lowercased HTTP methods to generate tools for.

    Example:
        >>> generator = ToolGenerator(spec, allowed_methods=["GET"])
        >>> tools = generator.generate_tools()
        >>> for tool in tools:
        ...     print(f"{tool['name']}: {tool['description'][:50]}...")
    """

    # Default to GET only for safe read-only operations
    DEFAULT_ALLOWED_METHODS = ["GET"]

    # One-line context appended to a tool description, keyed by the category
    # segment of the path (/api/json/{category}/...).
    _CATEGORY_DESCRIPTIONS = {
        "alarm": "Manage and query alarms/alerts in OpManager",
        "device": "Manage and query network devices",
        "monitor": "Configure and query performance monitors",
        "discovery": "Discover new devices on the network",
        "group": "Manage logical device groups",
        "interface": "Query network interface information",
        "availability": "Check device availability and uptime",
        "notification": "Configure notification profiles",
        "report": "Generate and retrieve reports",
        "event": "Manage events and event logs",
        "dashboard": "Access dashboard widgets and data",
        "business": "Manage business views",
    }

    def __init__(
        self,
        spec: dict[str, Any],
        allowed_methods: list[str] | None = None,
    ) -> None:
        """Initialize tool generator with OpenAPI spec.

        Args:
            spec: Parsed OpenAPI specification dictionary.
            allowed_methods: List of HTTP methods to generate tools for.
                Defaults to ["GET"] for safe read-only operations.
                Set to ["GET", "POST", "DELETE", "PATCH", "PUT"]
                to enable all operations. An empty list also falls back
                to the default.
        """
        self.spec = spec
        self.tool_names: dict[str, int] = {}
        # OpenAPI path items key their operations by lowercase method name.
        self.allowed_methods = [
            m.lower() for m in (allowed_methods or self.DEFAULT_ALLOWED_METHODS)
        ]

    def _allowed_operations(self) -> list[tuple[str, str, Any]]:
        """List (path, method, operation) for every allowed operation.

        Entries are in generation order: paths in spec order, and for each
        path the methods in ``allowed_methods`` order. A missing or
        non-mapping ``paths`` section yields an empty list, and path items
        that are not mappings (e.g. ``null``) are skipped.
        """
        paths = self.spec.get("paths")
        if not isinstance(paths, dict):
            return []
        operations: list[tuple[str, str, Any]] = []
        for path, path_item in paths.items():
            if not isinstance(path_item, dict):
                continue
            for method in self.allowed_methods:
                if method in path_item:
                    operations.append((path, method, path_item[method]))
        return operations

    def generate_tools(self) -> list[dict[str, Any]]:
        """Generate MCP tools from OpenAPI spec for configured methods.

        Returns:
            List of MCP tool definitions. Operations whose tool cannot be
            built are skipped.

        Note:
            Only methods specified in allowed_methods are included.
            Default is GET only for safe read-only operations.
        """
        # Start from a clean registry so that calling this method again
        # yields the same names instead of suffixing every tool as a
        # duplicate of the previous run.
        self.tool_names = {}
        tools: list[dict[str, Any]] = []
        for path, method, operation in self._allowed_operations():
            tool = self._generate_tool_from_operation(path, method, operation)
            if tool:
                tools.append(tool)
        logger.info(
            "Generated MCP tools from OpenAPI spec",
            extra={
                "tool_count": len(tools),
                "methods": [m.upper() for m in self.allowed_methods],
            },
        )
        return tools

    def _generate_tool_from_operation(
        self,
        path: str,
        method: str,
        operation: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Generate a single MCP tool from an OpenAPI operation.

        Args:
            path: API endpoint path (e.g., "/api/json/alarm/listAlarms").
            method: HTTP method (lowercase).
            operation: OpenAPI operation object.

        Returns:
            MCP tool definition, or None if generation fails (the failure
            is logged as a warning).
        """
        try:
            # Prefer the spec's operationId; otherwise derive a name from
            # the method and path.
            tool_name = operation.get(
                "operationId"
            ) or self._generate_tool_name_from_path(path, method)
            tool_name = self._make_unique_name(tool_name, path)

            base_description = (
                operation.get("summary")
                or operation.get("description")
                or f"{method.upper()} {path}"
            )
            category = self._get_category_from_path(path)
            description = self._build_enhanced_description(
                base_description, category, operation
            )
            input_schema = self._generate_input_schema(operation, path)

            return {
                "name": tool_name,
                "description": description,
                "inputSchema": input_schema,
                "_path": path,  # Store path for later use
                "_method": method,
            }
        except Exception as e:
            # Best effort by design: one malformed operation must not
            # prevent the remaining tools from being generated.
            logger.warning(
                f"Failed to generate tool for {method.upper()} {path}",
                extra={"error": str(e), "path": path, "method": method},
            )
            return None

    def _get_category_from_path(self, path: str) -> str:
        """Extract category from API path.

        OpManager API paths follow pattern: /api/json/{category}/{operation}

        Args:
            path: API endpoint path.

        Returns:
            Category name (e.g., "alarm", "device", "monitor"), or an empty
            string when the path does not start with /api/json/.
        """
        parts = path.split("/")
        # "/api/json/x/y".split("/") -> ["", "api", "json", "x", "y"], so the
        # category sits at index 3.
        if len(parts) >= 4 and parts[1] == "api" and parts[2] == "json":
            return parts[3]
        return ""

    def _build_enhanced_description(
        self,
        base_description: str,
        category: str,
        operation: dict[str, Any],
    ) -> str:
        """Build enhanced description with parameter info for LLM context.

        Args:
            base_description: Original operation description.
            category: API category name.
            operation: OpenAPI operation object.

        Returns:
            The base description, followed by a "Category: ..." line when
            the category is known and a "Key parameters:" list built from
            the first five declared parameters. Required parameters are
            marked with "*" and parameter descriptions are cut to 60
            characters.
        """
        description = base_description

        category_text = self._CATEGORY_DESCRIPTIONS.get(category) if category else None
        if category_text:
            description += f"\n\nCategory: {category_text}"

        parameters = operation.get("parameters") or []
        param_hints = []
        for param in parameters[:5]:  # Show first 5 params
            name = param.get("name", "")
            if not name:
                # Nameless entries (e.g. unresolved $ref objects) would only
                # render as an empty "- " bullet.
                continue
            marker = "*" if param.get("required", False) else ""
            # "description: null" yields None, which cannot be sliced.
            param_desc = (param.get("description") or "")[:60]
            if param_desc:
                param_hints.append(f"- {name}{marker}: {param_desc}")
            else:
                param_hints.append(f"- {name}{marker}")
        if param_hints:
            description += "\n\nKey parameters:\n" + "\n".join(param_hints)
        return description

    def _generate_input_schema(
        self,
        operation: dict[str, Any],
        _path: str,
    ) -> dict[str, Any]:
        """Generate JSON Schema for tool input.

        Args:
            operation: OpenAPI operation object.
            _path: API path (currently unused).

        Returns:
            JSON Schema object with the connection properties (host, apiKey,
            port, use_ssl, verify_ssl) plus one property per named
            operation parameter. ``required`` always starts with "host" and
            "apiKey" and never contains duplicates.
        """
        properties: dict[str, Any] = {
            # Credentials (always required)
            "host": {
                "type": "string",
                "description": "OpManager host address (e.g., 'opmanager.example.com')",
            },
            "apiKey": {
                "type": "string",
                "description": "OpManager API key for authentication",
            },
            # Optional connection parameters
            "port": {
                "type": "integer",
                "description": "OpManager port (default: 8060 for HTTP, 8061 for HTTPS)",
                "default": 8060,
            },
            "use_ssl": {
                "type": "boolean",
                "description": "Use HTTPS instead of HTTP. Auto-detected from port if not specified (8061=HTTPS, 8060=HTTP)",
            },
            "verify_ssl": {
                "type": "boolean",
                "description": "Verify SSL certificates (default: false for self-signed certs)",
                "default": False,
            },
        }
        required: list[str] = ["host", "apiKey"]

        for param in operation.get("parameters") or []:
            param_name = param.get("name", "")
            # The API key is already exposed as the "apiKey" credential.
            if not param_name or param_name.lower() == "apikey":
                continue

            # "schema: null" / "description: null" parse to None.
            param_schema = param.get("schema") or {}
            prop: dict[str, Any] = {
                "type": param_schema.get("type", "string"),
                "description": param.get("description") or "",
            }
            if "enum" in param_schema:
                prop["enum"] = param_schema["enum"]
            if "default" in param_schema:
                prop["default"] = param_schema["default"]

            # NOTE(review): a spec parameter named like a connection
            # property (e.g. "host", "port") replaces that property's
            # definition here, as it always has -- confirm this is intended.
            properties[param_name] = prop
            # JSON Schema expects the entries of "required" to be unique.
            if param.get("required", False) and param_name not in required:
                required.append(param_name)

        return {
            "type": "object",
            "properties": properties,
            "required": required,
        }

    @staticmethod
    def _claim_unique_name(tool_name: str, path: str, registry: dict[str, int]) -> str:
        """Reserve and return a name that is not yet present in *registry*.

        The first use of a name is returned unchanged. A repeated name gets
        a suffix made of the last two non-templated path segments (or the
        duplicate count when the path has fewer than two); if that is taken
        as well, a numeric suffix is appended until the name is free.
        """
        if tool_name not in registry:
            registry[tool_name] = 0
            return tool_name

        count = registry[tool_name] + 1
        registry[tool_name] = count
        path_parts = [p for p in path.split("/") if p and not p.startswith("{")]
        path_suffix = "_".join(path_parts[-2:]) if len(path_parts) >= 2 else str(count)
        candidate = f"{tool_name}_{path_suffix}"

        unique = candidate
        serial = 2
        while unique in registry:
            unique = f"{candidate}_{serial}"
            serial += 1
        # Register the suffixed name too, so a later operation cannot be
        # handed the same one.
        registry[unique] = 0
        return unique

    def _make_unique_name(self, tool_name: str, path: str) -> str:
        """Make tool name unique by adding suffix if needed.

        Args:
            tool_name: Original tool name.
            path: API path for generating suffix.

        Returns:
            A tool name that has not been returned before for the current
            ``tool_names`` registry; the name is recorded there.
        """
        return self._claim_unique_name(tool_name, path, self.tool_names)

    def _generate_tool_name_from_path(self, path: str, method: str) -> str:
        """Generate tool name from path and method.

        Args:
            path: API endpoint path.
            method: HTTP method.

        Returns:
            The method followed by the underscore-joined path segments,
            ignoring "api", "json" and templated ("{...}") segments; or
            "<method>_operation" when no segment remains.
        """
        # get + /api/json/alarm/listAlarms -> get_alarm_listAlarms
        meaningful_parts = [
            p
            for p in path.split("/")
            if p and p not in ("api", "json") and not p.startswith("{")
        ]
        if meaningful_parts:
            return f"{method}_{'_'.join(meaningful_parts)}"
        return f"{method}_operation"

    def _resolve_tool(self, tool_name: str) -> tuple[str, str] | None:
        """Find the (path, lowercase method) that a tool name belongs to.

        The naming done by ``generate_tools`` is replayed against a scratch
        registry, so names derived from the path and names that received a
        uniqueness suffix resolve to their own operation. If no generated
        name matches, the first operation whose operationId followed by "_"
        prefixes *tool_name* is used as a fallback.
        """
        registry: dict[str, int] = {}
        prefix_match: tuple[str, str] | None = None
        for path, method, operation in self._allowed_operations():
            if not isinstance(operation, dict):
                continue
            op_id = operation.get("operationId")
            base_name = op_id or self._generate_tool_name_from_path(path, method)
            # Only string names can equal tool_name; skipping the others
            # also avoids hashing errors on malformed operationIds.
            if isinstance(base_name, str):
                if self._claim_unique_name(base_name, path, registry) == tool_name:
                    return path, method
            if prefix_match is None and op_id and tool_name.startswith(f"{op_id}_"):
                prefix_match = (path, method)
        return prefix_match

    def get_path_for_tool(self, tool_name: str) -> str | None:
        """Get API path for a tool by name.

        Args:
            tool_name: Name of the tool.

        Returns:
            API path or None if not found.
        """
        match = self._resolve_tool(tool_name)
        return match[0] if match else None

    def get_method_for_tool(self, tool_name: str) -> str:
        """Get HTTP method for a tool by name.

        Args:
            tool_name: Name of the tool.

        Returns:
            Uppercase HTTP method (defaults to GET when the tool is
            unknown).
        """
        match = self._resolve_tool(tool_name)
        return match[1].upper() if match else "GET"