structured_mcp_server.py•13.9 kB
"""Structured MCP server optimized for full-stack control.
This server is designed to work with a controlled gateway/router system
where we have full control over the entire data flow from user to LLM to MCP.
"""
import asyncio
import json
import logging
import os
import sys
import time
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from .vault_client import VaultClient
from .netbox_client import NetBoxClient
from .state_confidence import StateConfidenceClient
from .structured_protocol import (
StructuredProtocol, ProtocolOptimizer, MCPRequest, MCPResponse,
ToolCategory, ModelLocation
)
from .tools import hosts, virtual_machines, ip_addresses, vlans
logger = logging.getLogger(__name__)
# Global variables for clients
vault_client: Optional[VaultClient] = None
netbox_client: Optional[NetBoxClient] = None
state_client: Optional[StateConfidenceClient] = None
protocol: Optional[StructuredProtocol] = None
optimizer: Optional[ProtocolOptimizer] = None
# Initialize MCP server
server = Server("netbox-structured-mcp-server")
def initialize_clients() -> None:
"""Initialize all clients with structured protocol support."""
global vault_client, netbox_client, state_client, protocol, optimizer
logger.info("Initializing Structured MCP server...")
# Initialize Vault client
vault_addr = os.getenv("VAULT_ADDR", "http://localhost:8200")
vault_role_id = os.getenv("VAULT_ROLE_ID")
vault_secret_id = os.getenv("VAULT_SECRET_ID")
if vault_role_id and vault_secret_id:
vault_client = VaultClient(vault_addr, vault_role_id, vault_secret_id)
if not vault_client.authenticate():
logger.warning("Failed to authenticate with Vault. NetBox access will fail.")
else:
logger.warning("VAULT_ROLE_ID or VAULT_SECRET_ID not set. Vault authentication will fail.")
# Initialize NetBox client
netbox_url = os.getenv("NETBOX_URL", "http://localhost:8000")
vault_path = os.getenv("VAULT_PATH", "secret/netbox/jit-tokens")
if vault_client:
netbox_client = NetBoxClient(netbox_url, vault_client, vault_path)
logger.info(f"NetBox client initialized for {netbox_url}")
else:
logger.error("Vault client not available. NetBox client not initialized.")
# Initialize state confidence client
postgres_host = os.getenv("POSTGRES_HOST")
postgres_port = int(os.getenv("POSTGRES_PORT", "5432"))
postgres_db = os.getenv("POSTGRES_DB")
postgres_user = os.getenv("POSTGRES_USER")
postgres_password = os.getenv("POSTGRES_PASSWORD")
if all([postgres_host, postgres_db, postgres_user, postgres_password]):
state_client = StateConfidenceClient(
host=postgres_host,
port=postgres_port,
database=postgres_db,
user=postgres_user,
password=postgres_password,
)
logger.info("PostgreSQL state confidence client initialized")
else:
logger.info("PostgreSQL configuration not provided. State confidence scores will not be available.")
# Initialize structured protocol
protocol = StructuredProtocol()
optimizer = ProtocolOptimizer()
logger.info("Structured MCP server initialization complete")
@server.list_tools()
async def list_tools() -> List[Tool]:
"""List all available tools with structured metadata."""
if not netbox_client:
logger.error("NetBox client not initialized")
return []
tools = []
# Host tools
tools.extend(hosts.get_host_tools(netbox_client, state_client))
# VM tools
tools.extend(virtual_machines.get_vm_tools(netbox_client, state_client))
# IP tools
tools.extend(ip_addresses.get_ip_tools(netbox_client, state_client))
# VLAN tools
tools.extend(vlans.get_vlan_tools(netbox_client, state_client))
return tools
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Call tool with structured protocol support."""
if not netbox_client or not protocol:
return [TextContent(type="text", text="Error: MCP server not properly initialized")]
start_time = time.time()
try:
# Create structured MCP request
mcp_request = protocol.create_mcp_request(
query_id=arguments.get("query_id", "unknown"),
tool_name=name,
parameters=arguments,
context=arguments.get("context", {}),
priority=arguments.get("priority", 1),
timeout=arguments.get("timeout", 30.0)
)
# Log structured request
logger.debug(f"Structured MCP request: {protocol.serialize_message(mcp_request)}")
# Route to appropriate handler
result = await _handle_structured_tool_call(mcp_request)
processing_time = time.time() - start_time
# Create structured MCP response
mcp_response = protocol.create_mcp_response(
query_id=mcp_request.query_id,
tool_name=name,
data=result.get("data", []),
metadata=result.get("metadata", {}),
confidence=result.get("confidence", 0.8),
processing_time=processing_time,
cache_hit=result.get("cache_hit", False)
)
# Log structured response
logger.debug(f"Structured MCP response: {protocol.serialize_message(mcp_response)}")
# Update metrics
optimizer.update_metrics(
processing_time=processing_time,
tokens_used=result.get("tokens_used", 0),
cost=result.get("cost", 0.0),
cache_hit=result.get("cache_hit", False)
)
# Return structured response
return [TextContent(type="text", text=protocol.serialize_message(mcp_response))]
except Exception as e:
logger.error(f"Error calling structured tool {name}: {e}")
error_response = {
"error": str(e),
"query_id": arguments.get("query_id", "unknown"),
"tool_name": name,
"timestamp": time.time()
}
return [TextContent(type="text", text=json.dumps(error_response, indent=2))]
async def _handle_structured_tool_call(mcp_request: MCPRequest) -> Dict[str, Any]:
"""Handle a structured tool call."""
tool_name = mcp_request.tool_name
parameters = mcp_request.parameters
# Remove protocol-specific parameters
clean_parameters = {k: v for k, v in parameters.items()
if k not in ["query_id", "context", "priority", "timeout"]}
# Route to appropriate handler
if tool_name.startswith("list_hosts"):
result = await hosts.handle_list_hosts(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("get_host"):
result = await hosts.handle_get_host(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("search_hosts"):
result = await hosts.handle_search_hosts(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("list_vms"):
result = await virtual_machines.handle_list_vms(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("get_vm"):
result = await virtual_machines.handle_get_vm(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("list_vm_interfaces"):
result = await virtual_machines.handle_list_vm_interfaces(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("list_ips"):
result = await ip_addresses.handle_list_ips(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("get_ip"):
result = await ip_addresses.handle_get_ip(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("search_ips"):
result = await ip_addresses.handle_search_ips(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("list_vlans"):
result = await vlans.handle_list_vlans(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("get_vlan"):
result = await vlans.handle_get_vlan(clean_parameters, netbox_client, state_client)
elif tool_name.startswith("list_vlan_ips"):
result = await vlans.handle_list_vlan_ips(clean_parameters, netbox_client, state_client)
else:
return {
"data": [],
"metadata": {"error": f"Unknown tool: {tool_name}"},
"confidence": 0.0,
"cache_hit": False
}
# Parse result and extract structured data
structured_data = []
metadata = {
"tool_name": tool_name,
"result_count": len(result) if result else 0,
"timestamp": time.time()
}
if result:
for item in result:
if hasattr(item, 'text') and item.text:
try:
# Try to parse as JSON first
data = json.loads(item.text)
if isinstance(data, list):
structured_data.extend(data)
else:
structured_data.append(data)
except:
# If not JSON, treat as text
structured_data.append({"content": item.text, "type": "text"})
# Calculate confidence based on data quality
confidence = _calculate_confidence(structured_data, mcp_request.context)
return {
"data": structured_data,
"metadata": metadata,
"confidence": confidence,
"cache_hit": False, # TODO: Implement caching
"tokens_used": len(str(structured_data)) // 4, # Rough estimation
"cost": 0.0 # TODO: Implement cost calculation
}
def _calculate_confidence(data: List[Dict[str, Any]], context: Dict[str, Any]) -> float:
"""Calculate confidence score for structured data."""
if not data:
return 0.0
# Base confidence
confidence = 0.8
# Adjust based on data quality
for item in data:
if isinstance(item, dict):
# Check for completeness
if len(item) > 3:
confidence += 0.05
# Check for certainty scores
if 'certainty_score' in item:
confidence += 0.1
# Check for required fields
if any(field in item for field in ['id', 'name', 'address']):
confidence += 0.05
# Cap at 1.0
return min(confidence, 1.0)
@server.list_resources()
async def list_resources() -> List[Dict[str, Any]]:
"""List available resources with structured metadata."""
return [
{
"uri": "netbox://hosts",
"name": "NetBox Hosts",
"description": "Infrastructure hosts from NetBox",
"mimeType": "application/json"
},
{
"uri": "netbox://vms",
"name": "NetBox Virtual Machines",
"description": "Virtual machines and containers from NetBox",
"mimeType": "application/json"
},
{
"uri": "netbox://ips",
"name": "NetBox IP Addresses",
"description": "IP addresses from NetBox",
"mimeType": "application/json"
},
{
"uri": "netbox://vlans",
"name": "NetBox VLANs",
"description": "VLANs from NetBox",
"mimeType": "application/json"
}
]
@server.read_resource()
async def read_resource(uri: str) -> str:
"""Read a resource with structured data."""
if not netbox_client:
return json.dumps({"error": "NetBox client not initialized"})
# Parse URI
if uri.startswith("netbox://"):
resource_type = uri.split("://")[1]
else:
return json.dumps({"error": "Invalid URI format"})
# Get data based on resource type
data = []
if resource_type == "hosts":
devices = netbox_client.list_devices(limit=100)
data = [netbox_client._record_to_dict(device) for device in devices]
elif resource_type == "vms":
vms = netbox_client.list_virtual_machines(limit=100)
data = [netbox_client._record_to_dict(vm) for vm in vms]
elif resource_type == "ips":
ips = netbox_client.list_ip_addresses(limit=100)
data = [netbox_client._record_to_dict(ip) for ip in ips]
elif resource_type == "vlans":
vlans = netbox_client.list_vlans(limit=100)
data = [netbox_client._record_to_dict(vlan) for vlan in vlans]
else:
return json.dumps({"error": f"Unknown resource type: {resource_type}"})
# Return structured JSON
return json.dumps({
"resource_type": resource_type,
"data": data,
"count": len(data),
"timestamp": time.time(),
"source": "netbox"
}, indent=2)
async def main():
"""Main entry point for structured MCP server."""
logger.info("Starting Structured NetBox MCP Server...")
logger.info(f"NetBox URL: {os.getenv('NETBOX_URL', 'http://localhost:8000')}")
logger.info(f"Vault Address: {os.getenv('VAULT_ADDR', 'http://localhost:8200')}")
logger.info("Optimized for full-stack control with structured JSON protocol")
# Initialize clients
initialize_clients()
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
if __name__ == "__main__":
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
asyncio.run(main())