Skip to main content
Glama

NetBox MCP Server

by fringemonkey
structured_mcp_server.py13.9 kB
"""Structured MCP server optimized for full-stack control. This server is designed to work with a controlled gateway/router system where we have full control over the entire data flow from user to LLM to MCP. """ import asyncio import json import logging import os import sys import time from typing import Any, Dict, List, Optional from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent from .vault_client import VaultClient from .netbox_client import NetBoxClient from .state_confidence import StateConfidenceClient from .structured_protocol import ( StructuredProtocol, ProtocolOptimizer, MCPRequest, MCPResponse, ToolCategory, ModelLocation ) from .tools import hosts, virtual_machines, ip_addresses, vlans logger = logging.getLogger(__name__) # Global variables for clients vault_client: Optional[VaultClient] = None netbox_client: Optional[NetBoxClient] = None state_client: Optional[StateConfidenceClient] = None protocol: Optional[StructuredProtocol] = None optimizer: Optional[ProtocolOptimizer] = None # Initialize MCP server server = Server("netbox-structured-mcp-server") def initialize_clients() -> None: """Initialize all clients with structured protocol support.""" global vault_client, netbox_client, state_client, protocol, optimizer logger.info("Initializing Structured MCP server...") # Initialize Vault client vault_addr = os.getenv("VAULT_ADDR", "http://localhost:8200") vault_role_id = os.getenv("VAULT_ROLE_ID") vault_secret_id = os.getenv("VAULT_SECRET_ID") if vault_role_id and vault_secret_id: vault_client = VaultClient(vault_addr, vault_role_id, vault_secret_id) if not vault_client.authenticate(): logger.warning("Failed to authenticate with Vault. NetBox access will fail.") else: logger.warning("VAULT_ROLE_ID or VAULT_SECRET_ID not set. Vault authentication will fail.") # Initialize NetBox client netbox_url = os.getenv("NETBOX_URL", "http://localhost:8000") vault_path = os.getenv("VAULT_PATH", "secret/netbox/jit-tokens") if vault_client: netbox_client = NetBoxClient(netbox_url, vault_client, vault_path) logger.info(f"NetBox client initialized for {netbox_url}") else: logger.error("Vault client not available. NetBox client not initialized.") # Initialize state confidence client postgres_host = os.getenv("POSTGRES_HOST") postgres_port = int(os.getenv("POSTGRES_PORT", "5432")) postgres_db = os.getenv("POSTGRES_DB") postgres_user = os.getenv("POSTGRES_USER") postgres_password = os.getenv("POSTGRES_PASSWORD") if all([postgres_host, postgres_db, postgres_user, postgres_password]): state_client = StateConfidenceClient( host=postgres_host, port=postgres_port, database=postgres_db, user=postgres_user, password=postgres_password, ) logger.info("PostgreSQL state confidence client initialized") else: logger.info("PostgreSQL configuration not provided. State confidence scores will not be available.") # Initialize structured protocol protocol = StructuredProtocol() optimizer = ProtocolOptimizer() logger.info("Structured MCP server initialization complete") @server.list_tools() async def list_tools() -> List[Tool]: """List all available tools with structured metadata.""" if not netbox_client: logger.error("NetBox client not initialized") return [] tools = [] # Host tools tools.extend(hosts.get_host_tools(netbox_client, state_client)) # VM tools tools.extend(virtual_machines.get_vm_tools(netbox_client, state_client)) # IP tools tools.extend(ip_addresses.get_ip_tools(netbox_client, state_client)) # VLAN tools tools.extend(vlans.get_vlan_tools(netbox_client, state_client)) return tools @server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Call tool with structured protocol support.""" if not netbox_client or not protocol: return [TextContent(type="text", text="Error: MCP server not properly initialized")] start_time = time.time() try: # Create structured MCP request mcp_request = protocol.create_mcp_request( query_id=arguments.get("query_id", "unknown"), tool_name=name, parameters=arguments, context=arguments.get("context", {}), priority=arguments.get("priority", 1), timeout=arguments.get("timeout", 30.0) ) # Log structured request logger.debug(f"Structured MCP request: {protocol.serialize_message(mcp_request)}") # Route to appropriate handler result = await _handle_structured_tool_call(mcp_request) processing_time = time.time() - start_time # Create structured MCP response mcp_response = protocol.create_mcp_response( query_id=mcp_request.query_id, tool_name=name, data=result.get("data", []), metadata=result.get("metadata", {}), confidence=result.get("confidence", 0.8), processing_time=processing_time, cache_hit=result.get("cache_hit", False) ) # Log structured response logger.debug(f"Structured MCP response: {protocol.serialize_message(mcp_response)}") # Update metrics optimizer.update_metrics( processing_time=processing_time, tokens_used=result.get("tokens_used", 0), cost=result.get("cost", 0.0), cache_hit=result.get("cache_hit", False) ) # Return structured response return [TextContent(type="text", text=protocol.serialize_message(mcp_response))] except Exception as e: logger.error(f"Error calling structured tool {name}: {e}") error_response = { "error": str(e), "query_id": arguments.get("query_id", "unknown"), "tool_name": name, "timestamp": time.time() } return [TextContent(type="text", text=json.dumps(error_response, indent=2))] async def _handle_structured_tool_call(mcp_request: MCPRequest) -> Dict[str, Any]: """Handle a structured tool call.""" tool_name = mcp_request.tool_name parameters = mcp_request.parameters # Remove protocol-specific parameters clean_parameters = {k: v for k, v in parameters.items() if k not in ["query_id", "context", "priority", "timeout"]} # Route to appropriate handler if tool_name.startswith("list_hosts"): result = await hosts.handle_list_hosts(clean_parameters, netbox_client, state_client) elif tool_name.startswith("get_host"): result = await hosts.handle_get_host(clean_parameters, netbox_client, state_client) elif tool_name.startswith("search_hosts"): result = await hosts.handle_search_hosts(clean_parameters, netbox_client, state_client) elif tool_name.startswith("list_vms"): result = await virtual_machines.handle_list_vms(clean_parameters, netbox_client, state_client) elif tool_name.startswith("get_vm"): result = await virtual_machines.handle_get_vm(clean_parameters, netbox_client, state_client) elif tool_name.startswith("list_vm_interfaces"): result = await virtual_machines.handle_list_vm_interfaces(clean_parameters, netbox_client, state_client) elif tool_name.startswith("list_ips"): result = await ip_addresses.handle_list_ips(clean_parameters, netbox_client, state_client) elif tool_name.startswith("get_ip"): result = await ip_addresses.handle_get_ip(clean_parameters, netbox_client, state_client) elif tool_name.startswith("search_ips"): result = await ip_addresses.handle_search_ips(clean_parameters, netbox_client, state_client) elif tool_name.startswith("list_vlans"): result = await vlans.handle_list_vlans(clean_parameters, netbox_client, state_client) elif tool_name.startswith("get_vlan"): result = await vlans.handle_get_vlan(clean_parameters, netbox_client, state_client) elif tool_name.startswith("list_vlan_ips"): result = await vlans.handle_list_vlan_ips(clean_parameters, netbox_client, state_client) else: return { "data": [], "metadata": {"error": f"Unknown tool: {tool_name}"}, "confidence": 0.0, "cache_hit": False } # Parse result and extract structured data structured_data = [] metadata = { "tool_name": tool_name, "result_count": len(result) if result else 0, "timestamp": time.time() } if result: for item in result: if hasattr(item, 'text') and item.text: try: # Try to parse as JSON first data = json.loads(item.text) if isinstance(data, list): structured_data.extend(data) else: structured_data.append(data) except: # If not JSON, treat as text structured_data.append({"content": item.text, "type": "text"}) # Calculate confidence based on data quality confidence = _calculate_confidence(structured_data, mcp_request.context) return { "data": structured_data, "metadata": metadata, "confidence": confidence, "cache_hit": False, # TODO: Implement caching "tokens_used": len(str(structured_data)) // 4, # Rough estimation "cost": 0.0 # TODO: Implement cost calculation } def _calculate_confidence(data: List[Dict[str, Any]], context: Dict[str, Any]) -> float: """Calculate confidence score for structured data.""" if not data: return 0.0 # Base confidence confidence = 0.8 # Adjust based on data quality for item in data: if isinstance(item, dict): # Check for completeness if len(item) > 3: confidence += 0.05 # Check for certainty scores if 'certainty_score' in item: confidence += 0.1 # Check for required fields if any(field in item for field in ['id', 'name', 'address']): confidence += 0.05 # Cap at 1.0 return min(confidence, 1.0) @server.list_resources() async def list_resources() -> List[Dict[str, Any]]: """List available resources with structured metadata.""" return [ { "uri": "netbox://hosts", "name": "NetBox Hosts", "description": "Infrastructure hosts from NetBox", "mimeType": "application/json" }, { "uri": "netbox://vms", "name": "NetBox Virtual Machines", "description": "Virtual machines and containers from NetBox", "mimeType": "application/json" }, { "uri": "netbox://ips", "name": "NetBox IP Addresses", "description": "IP addresses from NetBox", "mimeType": "application/json" }, { "uri": "netbox://vlans", "name": "NetBox VLANs", "description": "VLANs from NetBox", "mimeType": "application/json" } ] @server.read_resource() async def read_resource(uri: str) -> str: """Read a resource with structured data.""" if not netbox_client: return json.dumps({"error": "NetBox client not initialized"}) # Parse URI if uri.startswith("netbox://"): resource_type = uri.split("://")[1] else: return json.dumps({"error": "Invalid URI format"}) # Get data based on resource type data = [] if resource_type == "hosts": devices = netbox_client.list_devices(limit=100) data = [netbox_client._record_to_dict(device) for device in devices] elif resource_type == "vms": vms = netbox_client.list_virtual_machines(limit=100) data = [netbox_client._record_to_dict(vm) for vm in vms] elif resource_type == "ips": ips = netbox_client.list_ip_addresses(limit=100) data = [netbox_client._record_to_dict(ip) for ip in ips] elif resource_type == "vlans": vlans = netbox_client.list_vlans(limit=100) data = [netbox_client._record_to_dict(vlan) for vlan in vlans] else: return json.dumps({"error": f"Unknown resource type: {resource_type}"}) # Return structured JSON return json.dumps({ "resource_type": resource_type, "data": data, "count": len(data), "timestamp": time.time(), "source": "netbox" }, indent=2) async def main(): """Main entry point for structured MCP server.""" logger.info("Starting Structured NetBox MCP Server...") logger.info(f"NetBox URL: {os.getenv('NETBOX_URL', 'http://localhost:8000')}") logger.info(f"Vault Address: {os.getenv('VAULT_ADDR', 'http://localhost:8200')}") logger.info("Optimized for full-stack control with structured JSON protocol") # Initialize clients initialize_clients() async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options(), ) if __name__ == "__main__": # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fringemonkey/mcp-dc'

If you have feedback or need assistance with the MCP directory API, please join our Discord server