Skip to main content
Glama

NetBox Read/Write MCP Server

openapi_generator.py38.1 kB
""" OpenAPI Specification Generator for NetBox MCP Tools. This module automatically generates comprehensive OpenAPI 3.0 specifications from the NetBox MCP tool registry, providing standardized API documentation for all 142+ tools. """ import json import yaml import logging import time from datetime import datetime from typing import Dict, Any, List, Optional, Union, get_type_hints, get_origin, get_args from dataclasses import dataclass, field import inspect import re from netbox_mcp.registry import TOOL_REGISTRY, list_tools from netbox_mcp.exceptions import NetBoxError from netbox_mcp._version import get_cached_version logger = logging.getLogger(__name__) @dataclass class OpenAPIConfig: """Configuration for OpenAPI spec generation.""" title: str = "NetBox MCP Server API" description: str = "Model Context Protocol server for NetBox automation with 142+ tools" version: str = field(default_factory=get_cached_version) server_url: str = "http://localhost:8000" contact_name: str = "Deployment Team" contact_email: str = "info@deployment-team.nl" license_name: str = "MIT" license_url: str = "https://opensource.org/licenses/MIT" include_examples: bool = True include_security: bool = True class TypeConverter: """Convert Python types to OpenAPI schema types.""" @staticmethod def python_type_to_openapi(python_type: Any, include_examples: bool = True) -> Dict[str, Any]: """ Convert Python type annotation to OpenAPI schema. Args: python_type: Python type annotation include_examples: Whether to include example values Returns: OpenAPI schema dictionary """ # Handle basic types if python_type == str: schema = {"type": "string"} if include_examples: schema["example"] = "example_string" elif python_type == int: schema = {"type": "integer", "format": "int32"} if include_examples: schema["example"] = 42 elif python_type == float: schema = {"type": "number", "format": "float"} if include_examples: schema["example"] = 3.14 elif python_type == bool: schema = {"type": "boolean"} if include_examples: schema["example"] = True # Handle Optional types elif get_origin(python_type) is Union: args = get_args(python_type) if len(args) == 2 and type(None) in args: # This is Optional[T] non_none_type = args[0] if args[1] is type(None) else args[1] schema = TypeConverter.python_type_to_openapi(non_none_type, include_examples) schema["nullable"] = True else: # Union of multiple types schema = { "oneOf": [ TypeConverter.python_type_to_openapi(arg, include_examples) for arg in args if arg is not type(None) ] } # Handle List types elif get_origin(python_type) is list: args = get_args(python_type) if args: item_schema = TypeConverter.python_type_to_openapi(args[0], include_examples) else: item_schema = {"type": "string"} schema = { "type": "array", "items": item_schema } if include_examples: schema["example"] = ["item1", "item2"] # Handle Dict types elif get_origin(python_type) is dict: args = get_args(python_type) if len(args) >= 2: value_schema = TypeConverter.python_type_to_openapi(args[1], include_examples) else: value_schema = {"type": "string"} schema = { "type": "object", "additionalProperties": value_schema } if include_examples: schema["example"] = {"key": "value"} # Handle specific NetBox types elif hasattr(python_type, '__name__'): type_name = python_type.__name__ if type_name == "NetBoxClient": # Skip client parameters in API docs return None elif "Dict" in str(python_type): schema = { "type": "object", "additionalProperties": True } if include_examples: schema["example"] = {"result": "success"} else: # Unknown type, treat as string schema = {"type": "string"} if include_examples: schema["example"] = f"<{type_name}>" else: # Fallback to string schema = {"type": "string"} if include_examples: schema["example"] = "unknown_type" return schema @staticmethod def extract_enum_values(param_description: str) -> Optional[List[str]]: """ Extract enum values from parameter description. Args: param_description: Parameter description text Returns: List of enum values if found, None otherwise """ # Look for patterns like "Valid options: active, planned, offline" enum_patterns = [ r"Valid options?:\s*([^.]+)", r"Choices?:\s*([^.]+)", r"Must be one of:\s*([^.]+)", r"\(e\.g\.?,?\s*([^)]+)\)", # More specific pattern for examples in parentheses r"Options:\s*([^.]+)", r"Available:\s*([^.]+)" ] for pattern in enum_patterns: match = re.search(pattern, param_description, re.IGNORECASE) if match: values_str = match.group(1) # Split by comma and clean up values = [v.strip().strip('"\'') for v in values_str.split(',')] # Filter out empty strings and common non-enum words values = [v for v in values if v and len(v) > 1 and v.lower() not in ['etc', 'and', 'or', 'optional', 'required', 'watts', 'in']] # Reasonable limit to avoid false positives if values and len(values) <= 20: return values return None class OpenAPIGenerator: """Generate OpenAPI specifications from NetBox MCP tools.""" def __init__(self, config: Optional[OpenAPIConfig] = None): """ Initialize OpenAPI generator. Args: config: OpenAPI configuration """ self.config = config or OpenAPIConfig() self._schemas = {} self._paths = {} self._cached_spec = None self._cache_timestamp = None self._cache_ttl = 300 # 5 minutes cache TTL logger.info(f"OpenAPI generator initialized for {self.config.title}") def generate_spec(self) -> Dict[str, Any]: """ Generate complete OpenAPI specification with caching. Returns: OpenAPI 3.0 specification dictionary """ current_time = time.time() # Check if we have a valid cached spec if (self._cached_spec is not None and self._cache_timestamp is not None and current_time - self._cache_timestamp < self._cache_ttl): logger.debug("Returning cached OpenAPI specification") return self._cached_spec logger.info("Generating OpenAPI specification...") # Get all tools from registry tools = list_tools() # Generate OpenAPI spec structure spec = { "openapi": "3.0.3", "info": self._generate_info(), "servers": self._generate_servers(), "paths": self._generate_paths(tools), "components": { "schemas": self._generate_schemas(tools), "securitySchemes": self._generate_security_schemes() if self.config.include_security else {} } } if self.config.include_security: spec["security"] = [{"bearerAuth": []}] # Cache the generated spec self._cached_spec = spec self._cache_timestamp = current_time logger.info(f"Generated and cached OpenAPI spec with {len(spec['paths'])} paths and {len(spec['components']['schemas'])} schemas") return spec def invalidate_cache(self): """Invalidate the cached OpenAPI specification.""" self._cached_spec = None self._cache_timestamp = None logger.debug("OpenAPI specification cache invalidated") def _parse_type_string(self, type_str: str) -> Any: """ Parse a type string into a Python type with enhanced robustness. Args: type_str: String representation of the type Returns: Corresponding Python type """ if not isinstance(type_str, str): return str # Clean the type string type_str = type_str.strip() # Handle basic types basic_types = { "str": str, "string": str, "int": int, "integer": int, "bool": bool, "boolean": bool, "float": float, "number": float, "dict": dict, "list": list, "List": list, "Dict": dict, } if type_str in basic_types: return basic_types[type_str] # Handle Optional types with improved parsing if type_str.startswith("Optional[") and type_str.endswith("]"): inner_type_str = type_str[9:-1] # Remove "Optional[" and "]" inner_type = self._parse_type_string(inner_type_str) return Optional[inner_type] # Handle Union types if type_str.startswith("Union[") and type_str.endswith("]"): # For simplicity, return the first type in Union inner_types_str = type_str[6:-1] # Remove "Union[" and "]" first_type = inner_types_str.split(",")[0].strip() return self._parse_type_string(first_type) # Handle List types if type_str.startswith("List[") and type_str.endswith("]"): inner_type_str = type_str[5:-1] # Remove "List[" and "]" inner_type = self._parse_type_string(inner_type_str) return List[inner_type] # Handle Dict types if type_str.startswith("Dict[") and type_str.endswith("]"): return dict # Handle complex types that we don't recognize logger.debug(f"Unrecognized type string: '{type_str}', defaulting to str") return str def _generate_info(self) -> Dict[str, Any]: """Generate OpenAPI info section.""" info = { "title": self.config.title, "description": self.config.description, "version": self.config.version, "contact": { "name": self.config.contact_name, "email": self.config.contact_email }, "license": { "name": self.config.license_name, "url": self.config.license_url } } return info def _generate_servers(self) -> List[Dict[str, Any]]: """Generate OpenAPI servers section.""" return [ { "url": self.config.server_url, "description": "NetBox MCP Server" } ] def _generate_security_schemes(self) -> Dict[str, Any]: """Generate OpenAPI security schemes.""" return { "bearerAuth": { "type": "http", "scheme": "bearer", "bearerFormat": "JWT", "description": "NetBox API token authentication" } } def _generate_paths(self, tools: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate OpenAPI paths from tools.""" paths = {} # Group tools by category for better organization tools_by_category = {} for tool in tools: category = tool.get("category", "general") if category not in tools_by_category: tools_by_category[category] = [] tools_by_category[category].append(tool) # Generate paths for each tool for category, category_tools in tools_by_category.items(): for tool in category_tools: path = f"/api/v1/tools/{tool['name']}" paths[path] = self._generate_path_item(tool, category) # Add utility endpoints paths.update(self._generate_utility_paths()) return paths def _generate_path_item(self, tool: Dict[str, Any], category: str) -> Dict[str, Any]: """Generate OpenAPI path item for a tool.""" tool_name = tool["name"] description = tool.get("description", "") parameters = tool.get("parameters", []) # Determine if this is a write operation is_write_operation = any(keyword in tool_name.lower() for keyword in ["create", "update", "delete", "provision", "assign"]) # Generate operation operation = { "summary": f"Execute {tool_name}", "description": description, "tags": [category.upper()], "operationId": tool_name, "requestBody": self._generate_request_body(parameters, tool_name), "responses": self._generate_responses(tool, is_write_operation), } # Add security for write operations if is_write_operation and self.config.include_security: operation["security"] = [{"bearerAuth": []}] return {"post": operation} def _generate_request_body(self, parameters: List[Dict[str, Any]], tool_name: str) -> Dict[str, Any]: """Generate request body schema for tool parameters.""" properties = {} required = [] for param in parameters: param_name = param["name"] # Skip client parameter if param_name == "client": continue param_type = param.get("type", "string") param_required = param.get("required", False) param_default = param.get("default") param_description = param.get("description", "") # Convert Python type to OpenAPI schema with robust parsing try: python_type = self._parse_type_string(param_type) schema = TypeConverter.python_type_to_openapi(python_type, self.config.include_examples) if schema is None: continue # Skip client parameters except Exception as e: # Enhanced error logging for debugging logger.warning(f"Failed to parse type '{param_type}' for parameter '{param_name}': {e}") # Fallback schema schema = {"type": "string"} # Add description if param_description: schema["description"] = param_description # Add default value if param_default is not None: schema["default"] = param_default # Check for enum values in description enum_values = TypeConverter.extract_enum_values(param_description) if enum_values: schema["enum"] = enum_values # Special handling for confirm parameter if param_name == "confirm": schema.update({ "type": "boolean", "default": False, "description": "Must be true to execute write operations (safety mechanism)" }) properties[param_name] = schema if param_required: required.append(param_name) schema = { "type": "object", "properties": properties } if required: schema["required"] = required # Add example request body if self.config.include_examples: example = {} for param_name, param_schema in properties.items(): if "example" in param_schema: example[param_name] = param_schema["example"] elif "default" in param_schema: example[param_name] = param_schema["default"] if example: schema["example"] = example return { "required": True, "content": { "application/json": { "schema": schema } } } def _generate_responses(self, tool: Dict[str, Any], is_write_operation: bool) -> Dict[str, Any]: """Generate response schemas for a tool.""" tool_name = tool["name"] # Success response schema success_schema = { "type": "object", "properties": { "success": { "type": "boolean", "example": True }, "message": { "type": "string", "example": f"Operation {tool_name} completed successfully" }, "data": { "type": "object", "additionalProperties": True, "description": "Operation result data" } }, "required": ["success"] } # Dry-run response for write operations if is_write_operation: dry_run_schema = { "type": "object", "properties": { "success": { "type": "boolean", "example": True }, "dry_run": { "type": "boolean", "example": True }, "message": { "type": "string", "example": f"DRY RUN: {tool_name} would be executed. Set confirm=True to execute." }, "would_create": { "type": "object", "additionalProperties": True, "description": "Preview of what would be created/modified" } }, "required": ["success", "dry_run"] } # Error response schema error_schema = { "type": "object", "properties": { "error": { "type": "string", "example": "NetBoxValidationError" }, "message": { "type": "string", "example": "Validation failed for required parameter" }, "details": { "type": "object", "additionalProperties": True, "description": "Additional error details" } }, "required": ["error", "message"] } responses = { "200": { "description": "Successful operation", "content": { "application/json": { "schema": success_schema } } }, "400": { "description": "Validation error", "content": { "application/json": { "schema": error_schema } } }, "404": { "description": "Resource not found", "content": { "application/json": { "schema": error_schema } } }, "500": { "description": "Internal server error", "content": { "application/json": { "schema": error_schema } } } } # Add dry-run response for write operations if is_write_operation: responses["202"] = { "description": "Dry-run mode (confirm=False)", "content": { "application/json": { "schema": dry_run_schema } } } return responses def _generate_utility_paths(self) -> Dict[str, Any]: """Generate utility endpoint paths.""" return { "/api/v1/health": { "get": { "summary": "Health check", "description": "Check server health status", "tags": ["System"], "operationId": "health_check", "responses": { "200": { "description": "Health status", "content": { "application/json": { "schema": { "type": "object", "properties": { "status": {"type": "string", "example": "healthy"}, "version": {"type": "string", "example": f"{get_cached_version()}"}, "uptime": {"type": "string", "example": "2h 30m"}, "checks": { "type": "object", "additionalProperties": True } } } } } } } } }, "/api/v1/tools": { "get": { "summary": "List all tools", "description": "Get list of all available NetBox MCP tools", "tags": ["Tools"], "operationId": "list_tools", "parameters": [ { "name": "category", "in": "query", "description": "Filter tools by category", "required": False, "schema": { "type": "string", "enum": ["dcim", "ipam", "tenancy", "virtualization", "system"] } } ], "responses": { "200": { "description": "List of tools", "content": { "application/json": { "schema": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "description": {"type": "string"}, "category": {"type": "string"}, "parameters": {"type": "array"} } } } } } } } } }, "/api/v1/metrics": { "get": { "summary": "Get performance metrics", "description": "Get server performance metrics and statistics", "tags": ["Monitoring"], "operationId": "get_metrics", "responses": { "200": { "description": "Performance metrics", "content": { "application/json": { "schema": { "type": "object", "properties": { "success": { "type": "boolean", "example": True }, "message": { "type": "string", "example": "Performance metrics retrieved successfully" }, "data": { "type": "object", "properties": { "timestamp": {"type": "string", "example": "2025-07-05T10:30:00Z"}, "system_metrics": { "type": "object", "properties": { "cpu_usage": {"type": "number", "example": 15.2}, "memory_usage": {"type": "number", "example": 2048.5}, "active_connections": {"type": "integer", "example": 25} } }, "operation_metrics": { "type": "array", "items": { "type": "object", "properties": { "operation_name": {"type": "string"}, "total_executions": {"type": "integer"}, "success_rate": {"type": "number"}, "average_duration": {"type": "number"} } } }, "cache_metrics": { "type": "object", "properties": { "hit_ratio": {"type": "number", "example": 0.85}, "cache_size_mb": {"type": "number", "example": 128.5}, "total_requests": {"type": "integer", "example": 1500} } } } } }, "required": ["success", "data"] } } } } } } } } def _generate_schemas(self, tools: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate reusable component schemas.""" schemas = { "NetBoxError": { "type": "object", "properties": { "error": { "type": "string", "description": "Error type" }, "message": { "type": "string", "description": "Error message" }, "details": { "type": "object", "additionalProperties": True, "description": "Additional error details" } }, "required": ["error", "message"] }, "SuccessResponse": { "type": "object", "properties": { "success": { "type": "boolean", "description": "Operation success status" }, "message": { "type": "string", "description": "Success message" }, "data": { "type": "object", "additionalProperties": True, "description": "Operation result data" } }, "required": ["success"] }, "DryRunResponse": { "type": "object", "properties": { "success": { "type": "boolean", "description": "Operation success status" }, "dry_run": { "type": "boolean", "description": "Indicates this was a dry run" }, "message": { "type": "string", "description": "Dry run message" }, "would_create": { "type": "object", "additionalProperties": True, "description": "Preview of what would be created" } }, "required": ["success", "dry_run"] } } return schemas def export_spec(self, format: str = "json", output_file: Optional[str] = None) -> str: """ Export OpenAPI specification to file or string. Args: format: Export format ("json" or "yaml") output_file: Optional output file path Returns: Specification as string """ spec = self.generate_spec() if format.lower() == "yaml": content = yaml.dump(spec, default_flow_style=False, sort_keys=False) else: content = json.dumps(spec, indent=2) if output_file: with open(output_file, 'w') as f: f.write(content) logger.info(f"OpenAPI spec exported to {output_file}") return content def generate_postman_collection(self) -> Dict[str, Any]: """ Generate Postman collection from OpenAPI spec. Returns: Postman collection dictionary """ spec = self.generate_spec() collection = { "info": { "name": spec["info"]["title"], "description": spec["info"]["description"], "version": spec["info"]["version"], "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json" }, "auth": { "type": "bearer", "bearer": [ { "key": "token", "value": "{{netbox_token}}", "type": "string" } ] }, "variable": [ { "key": "base_url", "value": self.config.server_url, "type": "string" }, { "key": "netbox_token", "value": "your_netbox_token_here", "type": "string" } ], "item": [] } # Group requests by category categories = {} for path, path_item in spec["paths"].items(): for method, operation in path_item.items(): category = operation.get("tags", ["General"])[0] if category not in categories: categories[category] = { "name": category, "item": [] } # Create Postman request request = { "name": operation["summary"], "request": { "method": method.upper(), "header": [ { "key": "Content-Type", "value": "application/json", "type": "text" } ], "url": { "raw": f"{{{{base_url}}}}{path}", "host": ["{{base_url}}"], "path": path.strip("/").split("/") }, "description": operation.get("description", "") } } # Add request body for POST requests if method.lower() == "post" and "requestBody" in operation: request_body = operation["requestBody"] schema = request_body["content"]["application/json"]["schema"] if "example" in schema: request["request"]["body"] = { "mode": "raw", "raw": json.dumps(schema["example"], indent=2) } categories[category]["item"].append(request) # Add categories to collection collection["item"] = list(categories.values()) return collection def generate_api_documentation( output_dir: str = "docs/api", formats: List[str] = ["json", "yaml"], include_postman: bool = True, config: Optional[OpenAPIConfig] = None ) -> Dict[str, str]: """ Generate complete API documentation. Args: output_dir: Output directory for generated files formats: List of formats to generate ("json", "yaml") include_postman: Whether to generate Postman collection config: OpenAPI configuration Returns: Dictionary mapping format to output file path """ import os # Create output directory os.makedirs(output_dir, exist_ok=True) generator = OpenAPIGenerator(config) generated_files = {} # Generate OpenAPI specs for format in formats: filename = f"netbox-mcp-api.{format}" filepath = os.path.join(output_dir, filename) generator.export_spec(format=format, output_file=filepath) generated_files[format] = filepath logger.info(f"Generated {format.upper()} specification: {filepath}") # Generate Postman collection if include_postman: postman_collection = generator.generate_postman_collection() postman_file = os.path.join(output_dir, "NetBox-MCP.postman_collection.json") with open(postman_file, 'w') as f: json.dump(postman_collection, f, indent=2) generated_files["postman"] = postman_file logger.info(f"Generated Postman collection: {postman_file}") logger.info(f"API documentation generated in {output_dir}") return generated_files if __name__ == "__main__": # Generate API documentation when run directly logging.basicConfig(level=logging.INFO) config = OpenAPIConfig( title="NetBox MCP Server API", description="Production-ready Model Context Protocol server for NetBox automation with 142+ enterprise-grade tools covering DCIM, IPAM, Virtualization, and Tenancy management.", version=get_cached_version() ) files = generate_api_documentation(config=config) print("Generated API documentation:") for format, filepath in files.items(): print(f" {format.upper()}: {filepath}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Deployment-Team/netbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server