llm_mcp_server.py•8.87 kB
"""LLM-optimized MCP server for local LLMs via OpenAI API gateway.
This server is specifically optimized for local LLMs accessing NetBox data
through an OpenAI API gateway, with response formatting and caching optimized
for LLM consumption.
"""
import asyncio
import logging
import os
import sys
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from .vault_client import VaultClient
from .netbox_client import NetBoxClient
from .state_confidence import StateConfidenceClient
from .llm_optimizer import LLMOptimizer, LLMResponseFormatter, LLMCache
from .tools import hosts, virtual_machines, ip_addresses, vlans
logger = logging.getLogger(__name__)
# Global variables for clients
vault_client: Optional[VaultClient] = None
netbox_client: Optional[NetBoxClient] = None
state_client: Optional[StateConfidenceClient] = None
llm_optimizer: Optional[LLMOptimizer] = None
llm_cache: Optional[LLMCache] = None
# Initialize MCP server
server = Server("netbox-llm-mcp-server")
def initialize_clients() -> None:
"""Initialize all clients with LLM optimizations."""
global vault_client, netbox_client, state_client, llm_optimizer, llm_cache
logger.info("Initializing LLM-optimized MCP server...")
# Initialize Vault client
vault_addr = os.getenv("VAULT_ADDR", "http://localhost:8200")
vault_role_id = os.getenv("VAULT_ROLE_ID")
vault_secret_id = os.getenv("VAULT_SECRET_ID")
if vault_role_id and vault_secret_id:
vault_client = VaultClient(vault_addr, vault_role_id, vault_secret_id)
if not vault_client.authenticate():
logger.warning("Failed to authenticate with Vault. NetBox access will fail.")
else:
logger.warning("VAULT_ROLE_ID or VAULT_SECRET_ID not set. Vault authentication will fail.")
# Initialize NetBox client
netbox_url = os.getenv("NETBOX_URL", "http://localhost:8000")
vault_path = os.getenv("VAULT_PATH", "secret/netbox/jit-tokens")
if vault_client:
netbox_client = NetBoxClient(netbox_url, vault_client, vault_path)
logger.info(f"NetBox client initialized for {netbox_url}")
else:
logger.error("Vault client not available. NetBox client not initialized.")
# Initialize state confidence client
postgres_host = os.getenv("POSTGRES_HOST")
postgres_port = int(os.getenv("POSTGRES_PORT", "5432"))
postgres_db = os.getenv("POSTGRES_DB")
postgres_user = os.getenv("POSTGRES_USER")
postgres_password = os.getenv("POSTGRES_PASSWORD")
if all([postgres_host, postgres_db, postgres_user, postgres_password]):
state_client = StateConfidenceClient(
host=postgres_host,
port=postgres_port,
database=postgres_db,
user=postgres_user,
password=postgres_password,
)
logger.info("PostgreSQL state confidence client initialized")
else:
logger.info("PostgreSQL configuration not provided. State confidence scores will not be available.")
# Initialize LLM optimizer
llm_optimizer = LLMOptimizer(max_workers=4)
llm_cache = LLMCache(max_size=500, ttl=300) # 5 minute cache
logger.info("LLM-optimized MCP server initialization complete")
@server.list_tools()
async def list_tools() -> List[Tool]:
"""List all available LLM-optimized tools."""
if not netbox_client:
logger.error("NetBox client not initialized")
return []
tools = []
# Host tools
tools.extend(hosts.get_host_tools(netbox_client, state_client))
# VM tools
tools.extend(virtual_machines.get_vm_tools(netbox_client, state_client))
# IP tools
tools.extend(ip_addresses.get_ip_tools(netbox_client, state_client))
# VLAN tools
tools.extend(vlans.get_vlan_tools(netbox_client, state_client))
return tools
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Call tool with LLM optimization."""
if not netbox_client or not llm_optimizer:
return [TextContent(type="text", text="Error: MCP server not properly initialized")]
# Create cache key for this request
cache_key = f"{name}:{str(sorted(arguments.items()))}"
# Check cache first
if llm_cache:
cached_response = llm_cache.get(cache_key)
if cached_response:
logger.debug(f"Cache hit for {name}")
return [TextContent(type="text", text=cached_response.content)]
# Route to appropriate handler
try:
if name.startswith("list_hosts"):
result = await hosts.handle_list_hosts(arguments, netbox_client, state_client)
elif name.startswith("get_host"):
result = await hosts.handle_get_host(arguments, netbox_client, state_client)
elif name.startswith("search_hosts"):
result = await hosts.handle_search_hosts(arguments, netbox_client, state_client)
elif name.startswith("list_vms"):
result = await virtual_machines.handle_list_vms(arguments, netbox_client, state_client)
elif name.startswith("get_vm"):
result = await virtual_machines.handle_get_vm(arguments, netbox_client, state_client)
elif name.startswith("list_vm_interfaces"):
result = await virtual_machines.handle_list_vm_interfaces(arguments, netbox_client, state_client)
elif name.startswith("list_ips"):
result = await ip_addresses.handle_list_ips(arguments, netbox_client, state_client)
elif name.startswith("get_ip"):
result = await ip_addresses.handle_get_ip(arguments, netbox_client, state_client)
elif name.startswith("search_ips"):
result = await ip_addresses.handle_search_ips(arguments, netbox_client, state_client)
elif name.startswith("list_vlans"):
result = await vlans.handle_list_vlans(arguments, netbox_client, state_client)
elif name.startswith("get_vlan"):
result = await vlans.handle_get_vlan(arguments, netbox_client, state_client)
elif name.startswith("list_vlan_ips"):
result = await vlans.handle_list_vlan_ips(arguments, netbox_client, state_client)
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
# Optimize response for LLM
if result and len(result) > 0:
# Extract data from result for optimization
raw_data = []
for item in result:
if hasattr(item, 'text') and item.text:
# Parse JSON from text if possible
try:
import json
data = json.loads(item.text)
if isinstance(data, list):
raw_data.extend(data)
else:
raw_data.append(data)
except:
# If not JSON, treat as text
raw_data.append({'content': item.text})
# Determine response type
response_type = "list"
if name.startswith("get_"):
response_type = "detail"
elif name.startswith("search_"):
response_type = "search"
# Optimize for LLM
optimized_response = llm_optimizer.optimize_for_llm(raw_data, response_type)
# Cache the optimized response
if llm_cache:
llm_cache.put(cache_key, optimized_response)
# Return optimized content
return [TextContent(type="text", text=optimized_response.content)]
else:
return [TextContent(type="text", text="No results found.")]
except Exception as e:
logger.error(f"Error calling tool {name}: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def main():
"""Main entry point for LLM-optimized MCP server."""
logger.info("Starting LLM-optimized NetBox MCP Server...")
logger.info(f"NetBox URL: {os.getenv('NETBOX_URL', 'http://localhost:8000')}")
logger.info(f"Vault Address: {os.getenv('VAULT_ADDR', 'http://localhost:8200')}")
logger.info("Optimized for local LLMs via OpenAI API gateway")
# Initialize clients
initialize_clients()
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
if __name__ == "__main__":
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
asyncio.run(main())