Skip to main content
Glama

NetBox Read/Write MCP Server

server.py (37.1 kB)
#!/usr/bin/env python3
"""
NetBox MCP Server

A Model Context Protocol server for safe read/write access to NetBox instances.
Provides tools for querying and managing NetBox data with comprehensive safety controls.

Version: 0.9.7 - Hierarchical Architecture with Registry Bridge

Module layout (executed top-to-bottom at import time; order matters):
    1. Load the internal tool/prompt registries.
    2. Create the FastMCP server and bridge the registries onto it.
    3. Define a FastAPI app (``api_app``) exposing self-describing REST endpoints.
    4. Define a plain-HTTP health-check server for liveness/readiness probes.
    5. ``main()`` wires signal handling, runs FastMCP on stdio in a daemon
       thread, and blocks until shutdown.
"""

from mcp.server.fastmcp import FastMCP
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel

from .client import NetBoxClient
from .config import load_config
from .registry import (
    TOOL_REGISTRY,
    PROMPT_REGISTRY,
    load_tools,
    load_prompts,
    serialize_registry_for_api,
    serialize_prompts_for_api,
    execute_tool,
    execute_prompt
)
from .dependencies import NetBoxClientManager, get_netbox_client  # Use new dependency system
from .monitoring import get_performance_monitor, MetricsCollector, HealthCheck, MetricsDashboard
from .openapi_generator import OpenAPIGenerator, generate_api_documentation
from .debug_monitor import get_monitor, log_startup, log_protocol_message, log_connection_event, log_error, log_performance
from ._version import get_cached_version

import logging
import os
import threading
import time
import signal
import sys
import inspect
from functools import wraps
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from typing import Dict, List, Optional, Any

# Configure logging (will be updated from config)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 🧠 ULTRATHINK DEBUG: Initialize monitoring
log_startup("Debug monitor initialized - starting server diagnostics")

# === REGISTRY BRIDGE IMPLEMENTATION ===

# Step 1: Load all tools and prompts into our internal registries.
# These calls populate TOOL_REGISTRY / PROMPT_REGISTRY as an import-time
# side effect; the bridge functions below depend on them being filled first.
log_startup("Loading tools and prompts into registry")
load_tools()
load_prompts()
logger.info(f"Internal tool registry initialized with {len(TOOL_REGISTRY)} tools")
logger.info(f"Internal prompt registry initialized with {len(PROMPT_REGISTRY)} prompts")
log_startup(f"Registry loaded: {len(TOOL_REGISTRY)} tools, {len(PROMPT_REGISTRY)} prompts")

# Step 2: Initialize FastMCP server (empty at first)
mcp = FastMCP(
    "NetBox Model-Context Protocol",
    description="A powerful, tool-based interface to manage and orchestrate a NetBox instance."
)


# 🧠 ULTRATHINK DEBUG: Add MCP protocol message interceptor
def add_mcp_protocol_logging():
    """Add protocol-level logging to FastMCP to monitor Claude Desktop communication.

    Best-effort: relies on the private ``mcp._server.request_handlers``
    attribute; if FastMCP's internals differ, it logs that protocol logging
    is unavailable and continues. Never raises.
    """
    # NOTE(review): this local is never used afterwards — candidate for removal.
    original_handle_request = None
    try:
        # Try to find and wrap the request handler
        if hasattr(mcp, '_server') and hasattr(mcp._server, 'request_handlers'):
            handlers = mcp._server.request_handlers
            log_startup("Found FastMCP request handlers - adding protocol logging")

            # Wrap each handler to log messages. The factory function binds
            # method/handler per iteration, avoiding the late-binding-closure
            # pitfall. (Re-assigning existing keys during items() iteration is
            # safe because the dict's size does not change.)
            for method, handler in handlers.items():
                original_handler = handler

                def create_logged_handler(method_name, orig_handler):
                    async def logged_handler(request):
                        log_protocol_message("RECEIVED", {
                            "method": method_name,
                            "params": getattr(request, 'params', None),
                            "id": getattr(request, 'id', None)
                        })
                        result = await orig_handler(request)
                        log_protocol_message("SENT", {
                            "method": method_name,
                            "result": "response_sent",
                            "id": getattr(request, 'id', None)
                        })
                        return result
                    return logged_handler

                handlers[method] = create_logged_handler(method, original_handler)
        else:
            log_startup("FastMCP request handlers not found - protocol logging unavailable")
    except Exception as e:
        log_error(f"Failed to add MCP protocol logging: {e}", e)


# Add protocol logging after FastMCP initialization
log_startup("Adding MCP protocol logging interceptor")
add_mcp_protocol_logging()


# Step 3: The Registry Bridge function
def bridge_tools_to_fastmcp():
    """
    Dynamically registers all tools from our internal TOOL_REGISTRY with the
    FastMCP instance, creating wrappers for dependency injection.

    Each wrapper presents the original tool's signature minus the ``client``
    parameter (so FastMCP/Claude never sees it) and injects the NetBox client
    at call time. Per-tool failures are logged and skipped so one bad tool
    cannot block the rest of the bridge.
    """
    bridged_count = 0
    for tool_name, tool_metadata in TOOL_REGISTRY.items():
        try:
            original_func = tool_metadata["function"]
            description = tool_metadata.get("description", f"Executes the {tool_name} tool.")
            category = tool_metadata.get("category", "General")

            # Create a 'wrapper' that injects the client with EXACT function signature (Gemini's Fix)
            def create_tool_wrapper(original_func):
                """
                Creates a tool wrapper that mimics the exact signature of the
                original function, while automatically injecting the NetBox
                client and preventing argument duplicates.
                """
                sig = inspect.signature(original_func)
                # The advertised signature excludes 'client' — it is injected here.
                wrapper_params = [p for p in sig.parameters.values() if p.name != 'client']

                @wraps(original_func)
                def tool_wrapper(*args, **kwargs):
                    # 🧠 ULTRATHINK DEBUG: Log tool calls from Claude Desktop
                    from .debug_monitor import log_tool_call
                    log_tool_call(tool_name, kwargs)

                    # Get performance monitor for timing
                    monitor = get_performance_monitor()
                    with monitor.time_operation(tool_name, kwargs):
                        try:
                            # ----- SAFE ARGUMENT HANDLING -----
                            # 1. Create a list of expected parameter names (excluding 'client')
                            param_names = [p.name for p in wrapper_params]
                            # 2. Create a dictionary from positional arguments (*args)
                            final_kwargs = dict(zip(param_names, args))
                            # 3. Update with keyword arguments (**kwargs).
                            #    This overwrites any duplicates and is the core of the fix.
                            final_kwargs.update(kwargs)
                            # ----------------------------------------

                            client = get_netbox_client()

                            # Call the original function with clean, deduplicated arguments.
                            return original_func(client, **final_kwargs)
                        except Exception as e:
                            # Tools return a structured error dict rather than
                            # raising, so the MCP channel stays usable.
                            logger.error(f"Execution of tool '{tool_name}' failed: {e}", exc_info=True)
                            return {"success": False, "error": str(e), "error_type": type(e).__name__}

                new_sig = sig.replace(parameters=wrapper_params)
                # Use setattr to avoid type checker issues with __signature__
                setattr(tool_wrapper, '__signature__', new_sig)
                return tool_wrapper

            # Register the 'wrapper' with FastMCP with the correct metadata
            wrapped_tool = create_tool_wrapper(original_func)
            mcp.tool(name=tool_name, description=description)(wrapped_tool)
            bridged_count += 1
            logger.debug(f"Bridged tool: {tool_name} (category: {category})")
        except Exception as e:
            logger.error(f"Failed to bridge tool '{tool_name}' to FastMCP: {e}", exc_info=True)

    logger.info(f"Successfully bridged {bridged_count}/{len(TOOL_REGISTRY)} tools to the FastMCP interface")
    log_startup(f"Tools bridged to FastMCP: {bridged_count}/{len(TOOL_REGISTRY)} successful")


# Step 4: Execute the bridge function at server startup
log_startup("Bridging tools to FastMCP interface")
bridge_tools_to_fastmcp()


# Step 5: Bridge prompts to FastMCP
def bridge_prompts_to_fastmcp():
    """
    Bridge internal prompt registry to FastMCP interface.

    This function creates FastMCP-compatible prompt handlers for each prompt
    in our internal PROMPT_REGISTRY and registers them with the FastMCP
    server. Both sync and async prompt functions are supported; per-prompt
    failures are logged and skipped.
    """
    bridged_count = 0
    for prompt_name, prompt_metadata in PROMPT_REGISTRY.items():
        try:
            original_func = prompt_metadata["function"]
            description = prompt_metadata["description"]

            logger.debug(f"Bridging prompt: {prompt_name}")

            def create_prompt_wrapper(func, name):
                """Create a wrapper function that FastMCP can call"""
                async def prompt_wrapper(**kwargs):
                    try:
                        logger.debug(f"Executing prompt '{name}' with args: {kwargs}")

                        # Execute the prompt function (await it only if it is
                        # actually a coroutine function).
                        if inspect.iscoroutinefunction(func):
                            result = await func(**kwargs)
                        else:
                            result = func(**kwargs)

                        logger.debug(f"Prompt '{name}' executed successfully")
                        return result
                    except Exception as e:
                        logger.error(f"Execution of prompt '{name}' failed: {e}", exc_info=True)
                        return {"success": False, "error": str(e), "error_type": type(e).__name__}
                return prompt_wrapper

            # Register the wrapper with FastMCP
            wrapped_prompt = create_prompt_wrapper(original_func, prompt_name)
            mcp.prompt(name=prompt_name, description=description)(wrapped_prompt)
            bridged_count += 1
            logger.debug(f"Bridged prompt: {prompt_name}")
        except Exception as e:
            logger.error(f"Failed to bridge prompt '{prompt_name}' to FastMCP: {e}", exc_info=True)

    logger.info(f"Successfully bridged {bridged_count}/{len(PROMPT_REGISTRY)} prompts to the FastMCP interface")
    log_startup(f"Prompts bridged to FastMCP: {bridged_count}/{len(PROMPT_REGISTRY)} successful")


# Step 6: Execute the prompt bridge function at server startup
log_startup("Bridging prompts to FastMCP interface")
bridge_prompts_to_fastmcp()

# === FASTAPI SELF-DESCRIBING ENDPOINTS ===

# Initialize FastAPI server for self-describing endpoints.
# NOTE(review): version is hard-coded here (and in several responses below)
# while get_cached_version() exists — consider consolidating.
api_app = FastAPI(
    title="NetBox MCP API",
    description="Self-describing REST API for NetBox Management & Control Plane",
    version="0.9.7"
)


# Pydantic models for API requests
class ExecutionRequest(BaseModel):
    # Name of a tool present in TOOL_REGISTRY.
    tool_name: str
    # Keyword arguments forwarded to the tool.
    parameters: Dict[str, Any] = {}


class ToolFilter(BaseModel):
    # NOTE(review): this model appears unused in this module — filters are
    # taken as plain query parameters in get_tools(); confirm before removal.
    category: Optional[str] = None
    name_pattern: Optional[str] = None


@api_app.get("/api/v1/tools", response_model=List[Dict[str, Any]])
async def get_tools(
    category: Optional[str] = None,
    name_pattern: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Discovery endpoint: List all available MCP tools.

    Query Parameters:
        category: Filter tools by category (system, ipam, dcim, etc.)
        name_pattern: Filter tools by name pattern (partial match)

    Returns:
        List of tool metadata with parameters, descriptions, and categories
    """
    try:
        tools = serialize_registry_for_api()

        # Apply filters (exact category match; case-insensitive substring
        # match on name)
        if category:
            tools = [tool for tool in tools if tool.get("category") == category]
        if name_pattern:
            tools = [tool for tool in tools if name_pattern.lower() in tool.get("name", "").lower()]

        logger.info(f"Tools discovery request: {len(tools)} tools returned (category={category}, pattern={name_pattern})")
        return tools
    except Exception as e:
        logger.error(f"Error in tools discovery: {e}")
        raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}")


@api_app.post("/api/v1/execute")
async def execute_mcp_tool(
    request: ExecutionRequest,
    client: NetBoxClient = Depends(get_netbox_client)
) -> Dict[str, Any]:
    """
    Generic execution endpoint: Execute any registered MCP tool.

    Request Body:
        tool_name: Name of the tool to execute
        parameters: Dictionary of tool parameters

    Returns:
        Tool execution result

    Raises:
        HTTPException: 404 if the tool is unknown, 500 on execution failure.
    """
    try:
        logger.info(f"Executing tool: {request.tool_name} with parameters: {request.parameters}")

        # Execute tool with dependency injection
        result = execute_tool(request.tool_name, client, **request.parameters)

        return {
            "success": True,
            "tool_name": request.tool_name,
            "result": result
        }
    except ValueError as e:
        # Tool not found (execute_tool signals unknown tools via ValueError)
        logger.warning(f"Tool not found: {request.tool_name}")
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Tool execution failed for {request.tool_name}: {e}")
        raise HTTPException(status_code=500, detail=f"Tool execution failed: {str(e)}")


# === PROMPT ENDPOINTS ===

class PromptRequest(BaseModel):
    # Name of a prompt present in PROMPT_REGISTRY.
    prompt_name: str
    # Keyword arguments forwarded to the prompt (optional).
    arguments: Dict[str, Any] = {}


@api_app.get("/api/v1/prompts", response_model=List[Dict[str, Any]])
async def get_prompts() -> List[Dict[str, Any]]:
    """
    Discovery endpoint: List all available MCP prompts.

    Returns:
        List of prompt metadata with descriptions and usage information
    """
    try:
        prompts = serialize_prompts_for_api()
        logger.info(f"Prompts discovery request: {len(prompts)} prompts returned")
        return prompts
    except Exception as e:
        logger.error(f"Error in prompts discovery: {e}")
        raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}")


@api_app.post("/api/v1/prompts/execute")
async def execute_mcp_prompt(request: PromptRequest) -> Dict[str, Any]:
    """
    Generic prompt execution endpoint: Execute any registered MCP prompt.

    Request Body:
        prompt_name: Name of the prompt to execute
        arguments: Dictionary of prompt arguments (optional)

    Returns:
        Prompt execution result

    Raises:
        HTTPException: 404 if the prompt is unknown, 500 on execution failure.
    """
    try:
        logger.info(f"Executing prompt: {request.prompt_name} with arguments: {request.arguments}")

        # Execute prompt
        result = await execute_prompt(request.prompt_name, **request.arguments)

        return {
            "success": True,
            "prompt_name": request.prompt_name,
            "result": result
        }
    except ValueError as e:
        # Prompt not found
        logger.warning(f"Prompt not found: {request.prompt_name}")
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Prompt execution failed for {request.prompt_name}: {e}")
        raise HTTPException(status_code=500, detail=f"Prompt execution failed: {str(e)}")


# === MONITORING ENDPOINTS ===

# Initialize monitoring components (module-level singletons shared by the
# endpoints below and by the tool wrappers' timing context).
performance_monitor = get_performance_monitor()
metrics_collector = MetricsCollector(performance_monitor)
health_check = HealthCheck(performance_monitor)
metrics_dashboard = MetricsDashboard(metrics_collector)


@api_app.get("/api/v1/metrics")
async def get_performance_metrics() -> Dict[str, Any]:
    """
    Get performance metrics and dashboard data.

    Returns:
        Complete performance metrics including operations, cache, and system stats
    """
    try:
        dashboard_data = metrics_dashboard.get_dashboard_data()
        return dashboard_data
    except Exception as e:
        logger.error(f"Error getting performance metrics: {e}")
        raise HTTPException(status_code=500, detail=f"Metrics error: {str(e)}")


@api_app.get("/api/v1/health/detailed")
async def get_detailed_health() -> Dict[str, Any]:
    """
    Get detailed health status including performance metrics.

    Returns:
        Comprehensive health status with all checks and metrics
    """
    try:
        # Set NetBox client for health check
        health_check.netbox_client = get_netbox_client()

        # Get health status
        health_status = health_check.get_health_status()

        # Add active alerts
        alerts = metrics_dashboard.get_active_alerts()
        health_status["active_alerts"] = alerts

        return health_status
    except Exception as e:
        logger.error(f"Error getting detailed health: {e}")
        raise HTTPException(status_code=500, detail=f"Health check error: {str(e)}")


@api_app.get("/api/v1/metrics/operations/{operation_name}")
async def get_operation_metrics(operation_name: str) -> Dict[str, Any]:
    """
    Get metrics for a specific operation.

    Args:
        operation_name: Name of the operation to get metrics for

    Returns:
        Operation-specific metrics and statistics

    Raises:
        HTTPException: 404 when no metrics exist for the operation.
    """
    try:
        # Get operation statistics
        stats = performance_monitor.get_operation_statistics(operation_name)

        if not stats or stats.get("total_operations", 0) == 0:
            raise HTTPException(status_code=404, detail=f"No metrics found for operation '{operation_name}'")

        # Get operation history
        history = performance_monitor.get_operation_history(operation_name)
        recent_history = history[-10:]  # Last 10 executions

        return {
            "operation_name": operation_name,
            "statistics": stats,
            "recent_history": [
                {
                    "timestamp": metric.timestamp.isoformat(),
                    "duration": metric.duration,
                    "success": metric.success,
                    "error_details": metric.error_details
                }
                for metric in recent_history
            ]
        }
    except HTTPException:
        # Re-raise untouched so the 404 above isn't swallowed by the
        # generic 500 handler below.
        raise
    except Exception as e:
        logger.error(f"Error getting operation metrics for {operation_name}: {e}")
        raise HTTPException(status_code=500, detail=f"Operation metrics error: {str(e)}")


@api_app.get("/api/v1/metrics/export")
async def export_metrics(format: str = "json") -> Dict[str, Any]:
    """
    Export all metrics data.

    Args:
        format: Export format (json or csv); anything other than "csv"
            falls back to JSON. (NOTE(review): parameter shadows the
            builtin ``format``.)

    Returns:
        Exported metrics data
    """
    try:
        if format.lower() == "csv":
            csv_data = metrics_dashboard.export_data(format="csv")
            return {
                "format": "csv",
                "data": csv_data,
                "content_type": "text/csv"
            }
        else:
            json_data = metrics_dashboard.export_data(format="json")
            return {
                "format": "json",
                "data": json_data,
                "content_type": "application/json"
            }
    except Exception as e:
        logger.error(f"Error exporting metrics: {e}")
        raise HTTPException(status_code=500, detail=f"Metrics export error: {str(e)}")


# === API DOCUMENTATION ENDPOINTS ===

@api_app.get("/api/v1/openapi.json")
async def get_openapi_spec() -> Dict[str, Any]:
    """
    Get OpenAPI 3.0 specification for all NetBox MCP tools.

    Returns:
        OpenAPI specification as JSON
    """
    try:
        from .openapi_generator import OpenAPIConfig

        config = OpenAPIConfig(
            title="NetBox MCP Server API",
            description="Production-ready Model Context Protocol server for NetBox automation with 142+ enterprise-grade tools",
            version=get_cached_version(),
            server_url="http://localhost:8000"
        )
        generator = OpenAPIGenerator(config)
        spec = generator.generate_spec()
        return spec
    except Exception as e:
        logger.error(f"Error generating OpenAPI spec: {e}")
        raise HTTPException(status_code=500, detail=f"OpenAPI generation error: {str(e)}")


@api_app.get("/api/v1/openapi.yaml")
async def get_openapi_spec_yaml() -> str:
    """
    Get OpenAPI 3.0 specification as YAML.

    Returns:
        OpenAPI specification as YAML string
        (served via a Response with an application/x-yaml media type,
        despite the declared ``str`` return annotation)
    """
    try:
        from .openapi_generator import OpenAPIConfig
        import yaml

        config = OpenAPIConfig(
            title="NetBox MCP Server API",
            description="Production-ready Model Context Protocol server for NetBox automation with 142+ enterprise-grade tools",
            version=get_cached_version(),
            server_url="http://localhost:8000"
        )
        generator = OpenAPIGenerator(config)
        spec = generator.generate_spec()
        yaml_content = yaml.dump(spec, default_flow_style=False, sort_keys=False)

        # Return as plain text with correct content type
        from fastapi import Response
        return Response(content=yaml_content, media_type="application/x-yaml")
    except Exception as e:
        logger.error(f"Error generating OpenAPI YAML: {e}")
        raise HTTPException(status_code=500, detail=f"OpenAPI YAML generation error: {str(e)}")


@api_app.get("/api/v1/postman")
async def get_postman_collection() -> Dict[str, Any]:
    """
    Get Postman collection for all NetBox MCP tools.

    Returns:
        Postman collection JSON
    """
    try:
        from .openapi_generator import OpenAPIConfig

        config = OpenAPIConfig(
            title="NetBox MCP Server API",
            version=get_cached_version(),
            server_url="http://localhost:8000"
        )
        generator = OpenAPIGenerator(config)
        collection = generator.generate_postman_collection()
        return collection
    except Exception as e:
        logger.error(f"Error generating Postman collection: {e}")
        raise HTTPException(status_code=500, detail=f"Postman collection error: {str(e)}")


# === CONTEXT MANAGEMENT ENDPOINTS ===

@api_app.get("/api/v1/context/status")
async def get_context_status(
    _client: NetBoxClient = Depends(get_netbox_client)
) -> Dict[str, Any]:
    """
    Get current auto-context status and configuration.

    Returns:
        Context status including environment detection and safety level
    """
    # Client parameter required by FastAPI dependency injection but not used in this endpoint
    _ = _client  # Suppress unused parameter warning
    try:
        from .persona import get_context_manager

        context_manager = get_context_manager()
        context_state = context_manager.get_context_state()

        if context_state:
            return {
                "context_initialized": True,
                "environment": context_state.environment,
                "safety_level": context_state.safety_level,
                "instance_type": context_state.instance_type,
                "initialization_time": context_state.initialization_time.isoformat(),
                "netbox_url": context_state.netbox_url,
                "netbox_version": context_state.netbox_version,
                "auto_context_enabled": context_state.auto_context_enabled,
                "user_preferences": context_state.user_preferences
            }
        else:
            # Not yet initialized: report what the environment overrides say.
            return {
                "context_initialized": False,
                "auto_context_enabled": os.getenv('NETBOX_AUTO_CONTEXT', 'true').lower() == 'true',
                "environment_override": os.getenv('NETBOX_ENVIRONMENT'),
                "safety_level_override": os.getenv('NETBOX_SAFETY_LEVEL')
            }
    except Exception as e:
        logger.error(f"Error getting context status: {e}")
        raise HTTPException(status_code=500, detail=f"Context status error: {str(e)}")


@api_app.post("/api/v1/context/initialize")
async def initialize_context(
    client: NetBoxClient = Depends(get_netbox_client)
) -> Dict[str, Any]:
    """
    Manually initialize Bridget auto-context system.

    Returns:
        Context initialization result
    """
    try:
        from .persona import get_context_manager

        context_manager = get_context_manager()

        # Reset context if already initialized (makes this endpoint
        # effectively a re-initialize)
        if context_manager.is_context_initialized():
            context_manager.reset_context()

        # Initialize context
        context_state = context_manager.initialize_context(client)
        context_message = context_manager.generate_context_message(context_state)

        return {
            "success": True,
            "message": "Context initialized successfully",
            "context": {
                "environment": context_state.environment,
                "safety_level": context_state.safety_level,
                "instance_type": context_state.instance_type,
                "initialization_time": context_state.initialization_time.isoformat()
            },
            "bridget_message": context_message
        }
    except Exception as e:
        logger.error(f"Error initializing context: {e}")
        raise HTTPException(status_code=500, detail=f"Context initialization failed: {str(e)}")


@api_app.post("/api/v1/context/reset")
async def reset_context() -> Dict[str, Any]:
    """
    Reset the auto-context system state.

    Returns:
        Reset operation result
    """
    try:
        from .registry import reset_context_state

        reset_context_state()

        return {
            "success": True,
            "message": "Context state reset successfully"
        }
    except Exception as e:
        logger.error(f"Error resetting context: {e}")
        raise HTTPException(status_code=500, detail=f"Context reset failed: {str(e)}")


@api_app.get("/api/v1/status")
async def get_system_status(
    client: NetBoxClient = Depends(get_netbox_client)
) -> Dict[str, Any]:
    """
    Health/Status endpoint: Get MCP system status and NetBox connectivity.

    Returns:
        System status including NetBox connection, tool registry stats, and
        performance metrics. On failure this returns a 200 response with
        ``status: "error"`` rather than raising.
    """
    try:
        # Get NetBox health status
        netbox_status = client.health_check()

        # Get tool registry statistics
        from .registry import get_registry_stats
        registry_stats = get_registry_stats()

        # Get client status
        from .dependencies import get_client_status
        client_status = get_client_status()

        return {
            "service": "NetBox MCP",
            "version": "0.9.7",
            "status": "healthy" if netbox_status.connected else "degraded",
            "netbox": {
                "connected": netbox_status.connected,
                "version": netbox_status.version,
                "python_version": netbox_status.python_version,
                "django_version": netbox_status.django_version,
                "response_time_ms": netbox_status.response_time_ms,
                "plugins": netbox_status.plugins
            },
            "tool_registry": registry_stats,
            "client": client_status,
            "cache_stats": netbox_status.cache_stats if hasattr(netbox_status, 'cache_stats') else None
        }
    except Exception as e:
        logger.error(f"Status check failed: {e}")
        return {
            "service": "NetBox MCP",
            "version": "0.9.7",
            "status": "error",
            "error": str(e),
            "error_type": type(e).__name__
        }


# === HTTP HEALTH CHECK SERVER ===

class HealthCheckHandler(BaseHTTPRequestHandler):
    """HTTP handler for health check endpoints.

    Serves the Kubernetes-style probes:
        /health, /healthz  - liveness (always 200 while the process runs)
        /readyz            - readiness (200 only when NetBox is reachable)
    """

    def do_GET(self):
        """Handle GET requests for health check endpoints."""
        try:
            if self.path in ['/health', '/healthz']:
                # Basic liveness check
                self.send_response(200)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                response = {
                    "status": "OK",
                    "service": "netbox-mcp",
                    "version": "0.9.7"
                }
                self.wfile.write(json.dumps(response).encode())
            elif self.path == '/readyz':
                # Readiness check - test NetBox connection
                try:
                    status = NetBoxClientManager.get_client().health_check()
                    if status.connected:
                        self.send_response(200)
                        response = {
                            "status": "OK",
                            "netbox_connected": True,
                            "netbox_version": status.version,
                            "response_time_ms": status.response_time_ms
                        }
                    else:
                        self.send_response(503)
                        response = {
                            "status": "Service Unavailable",
                            "netbox_connected": False,
                            "error": status.error
                        }
                except Exception as e:
                    # Client not initialized or health check raised: not ready.
                    self.send_response(503)
                    response = {
                        "status": "Service Unavailable",
                        "netbox_connected": False,
                        "error": str(e)
                    }

                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps(response).encode())
            else:
                self.send_response(404)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                response = {"error": "Not Found"}
                self.wfile.write(json.dumps(response).encode())
        except Exception as e:
            logger.error(f"Health check handler error: {e}")
            self.send_response(500)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            response = {"error": "Internal Server Error", "details": str(e)}
            self.wfile.write(json.dumps(response).encode())

    def log_message(self, format, *args):
        # Override to route BaseHTTPRequestHandler's stderr logging through
        # our logger. (Parameter name 'format' is fixed by the base class API.)
        """Override to use our logger."""
        logger.debug(f"Health check: {format % args}")


def start_health_server(port: int):
    """Start the HTTP health check server in a separate thread.

    Args:
        port: TCP port to bind on all interfaces (0.0.0.0).

    The thread is a daemon, so it will not block process exit. Bind or
    serve errors are logged, not raised.
    """
    def run_server():
        try:
            server = HTTPServer(('0.0.0.0', port), HealthCheckHandler)
            logger.info(f"Health check server started on port {port}")
            logger.info(f"Health endpoints: /health, /healthz (liveness), /readyz (readiness)")
            server.serve_forever()
        except Exception as e:
            logger.error(f"Health check server failed: {e}")

    health_thread = threading.Thread(target=run_server, daemon=True)
    health_thread.start()


def initialize_server():
    """Initialize the NetBox MCP server with configuration and client.

    Loads config, reconfigures logging, initializes the singleton NetBox
    client, probes the NetBox connection (degrading gracefully if it is
    unreachable), and optionally starts the health-check HTTP server.

    Raises:
        Exception: re-raised if configuration or client initialization fails;
            a failed NetBox connectivity probe alone does NOT raise.
    """
    try:
        # Load configuration
        config = load_config()
        logger.info(f"Configuration loaded successfully")

        # Update logging level
        logging.getLogger().setLevel(getattr(logging, config.log_level.upper()))
        logger.info(f"Log level set to {config.log_level}")

        # Log safety configuration
        if config.safety.dry_run_mode:
            logger.warning("🚨 NetBox MCP running in DRY-RUN mode - no actual writes will be performed")
        if not config.safety.enable_write_operations:
            logger.info("🔒 Write operations are DISABLED - server is read-only")

        # Initialize NetBox client using Gemini's singleton pattern
        NetBoxClientManager.initialize(config)
        logger.info("NetBox client initialized successfully via singleton manager")

        # Test connection (graceful degradation if NetBox is unavailable)
        client = NetBoxClientManager.get_client()
        try:
            status = client.health_check()
            if status.connected:
                logger.info(f"✅ Connected to NetBox {status.version} (response time: {status.response_time_ms:.1f}ms)")
            else:
                logger.warning(f"⚠️ NetBox connection degraded: {status.error}")
        except Exception as e:
            logger.warning(f"⚠️ NetBox connection failed during startup, running in degraded mode: {e}")
            # Continue startup - health server should still start for liveness probes

        # Async task system removed - using synchronous operations only
        logger.info("NetBox MCP server using synchronous operations")

        # Start health check server if enabled
        if config.enable_health_server:
            start_health_server(config.health_check_port)

        logger.info("NetBox MCP server initialization complete")
    except Exception as e:
        logger.error(f"Failed to initialize NetBox MCP server: {e}")
        raise


def main():
    """Main entry point for the NetBox MCP server.

    Installs signal handlers, initializes the server, runs the FastMCP stdio
    transport in a daemon thread, then blocks until a shutdown signal arrives
    or the MCP thread dies. Exits the process with 0 on clean shutdown,
    1 on startup/runtime error.
    """
    # Create shutdown event for graceful termination
    shutdown_event = threading.Event()

    # Signal handler for graceful shutdown
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}. Initiating graceful shutdown...")
        shutdown_event.set()

    # Register signal handlers for graceful shutdown
    signal.signal(signal.SIGINT, signal_handler)    # Ctrl+C
    signal.signal(signal.SIGTERM, signal_handler)   # Termination signal
    if hasattr(signal, 'SIGHUP'):
        signal.signal(signal.SIGHUP, signal_handler)  # Hangup signal (Unix)

    try:
        log_startup("🚀 NetBox MCP Server MAIN() - Starting initialization")

        # Initialize server
        log_startup("Calling initialize_server()")
        initialize_server()
        log_startup("Server initialization completed successfully")

        # Define the MCP server task to run in a thread
        def run_mcp_server():
            try:
                log_startup("🔌 Starting MCP server thread with stdio transport")
                logger.info("Starting NetBox MCP server on stdio transport...")
                log_connection_event("MCP_THREAD_START", "stdio transport")

                # 🛡️ ULTRATHINK FIX: Wrap in TaskGroup error protection
                try:
                    mcp.run(transport="stdio")
                except Exception as e:
                    # Handle TaskGroup and other MCP exceptions gracefully.
                    # The string match is a heuristic for anyio TaskGroup
                    # "unhandled errors" exceptions raised by the MCP runtime.
                    if "TaskGroup" in str(e) or "unhandled errors" in str(e):
                        log_error(f"TaskGroup error captured and handled: {e}", e)
                        logger.warning(f"TaskGroup error handled gracefully - server continues: {e}")
                        # Don't crash on TaskGroup errors - let server restart naturally
                        # NOTE(review): 'time' is already imported at module
                        # level; this local import is redundant.
                        import time
                        time.sleep(2)  # Brief pause then exit gracefully for restart
                    else:
                        # Re-raise other exceptions
                        raise

                log_connection_event("MCP_THREAD_EXIT", "stdio transport finished")
            except Exception as e:
                log_error(f"MCP server thread error: {e}", e)
                logger.error(f"MCP server thread encountered an error: {e}", exc_info=True)
                shutdown_event.set()  # Trigger shutdown on MCP server error

        # Start the MCP server in a daemon thread
        log_startup("Creating MCP server daemon thread")
        mcp_thread = threading.Thread(target=run_mcp_server)
        mcp_thread.daemon = True
        mcp_thread.start()
        log_startup("MCP daemon thread started successfully")

        # Log startup information
        logger.info("NetBox MCP server is ready and listening")
        logger.info("Health endpoints: /health, /healthz (liveness), /readyz (readiness)")
        logger.info("Press Ctrl+C or send SIGTERM to gracefully shutdown")
        log_startup("🎯 SERVER READY - Waiting for Claude Desktop connections")
        log_connection_event("SERVER_LISTENING", "Ready for MCP protocol connections")

        # Wait for shutdown signal
        try:
            # Check every second if we should shutdown
            while not shutdown_event.is_set():
                # Also check if MCP thread is still alive
                if not mcp_thread.is_alive():
                    logger.warning("MCP server thread has stopped unexpectedly")
                    break
                shutdown_event.wait(timeout=1.0)
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt...")
            shutdown_event.set()

        # Graceful shutdown
        logger.info("Shutting down NetBox MCP server gracefully...")

        # Give threads time to cleanup (max 5 seconds)
        mcp_thread.join(timeout=5.0)
        if mcp_thread.is_alive():
            logger.warning("MCP thread did not terminate within timeout")

        logger.info("NetBox MCP server shutdown complete")
        sys.exit(0)
    except Exception as e:
        logger.error(f"NetBox MCP server error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Deployment-Team/netbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.