Skip to main content
Glama

NetBox MCP Server

by fringemonkey
netbox_client.py18.2 kB
"""NetBox API client with Vault token integration.""" import logging from typing import Any, Dict, List, Optional import asyncio import time from functools import lru_cache import pynetbox from pynetbox.core.response import Record from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from src.vault_client import VaultClient logger = logging.getLogger(__name__) class NetBoxClient: """Client for NetBox API with Vault-based authentication.""" def __init__(self, netbox_url: str, vault_client: VaultClient, vault_path: str): """Initialize NetBox client. Args: netbox_url: NetBox API URL vault_client: VaultClient instance for token minting vault_path: Vault path for NetBox tokens """ self.netbox_url = netbox_url self.vault_client = vault_client self.vault_path = vault_path self._api: Optional[pynetbox.api] = None self._token: Optional[str] = None # Performance optimizations - like good Soviet engineering! self._cache = {} # Simple in-memory cache self._cache_ttl = 300 # 5 minutes cache TTL self._last_cache_cleanup = time.time() self._session_config = self._create_session_config() def _create_session_config(self) -> Dict[str, Any]: """Create optimized session configuration for better performance. Returns: Session configuration dictionary """ # Retry strategy - like good Soviet persistence! retry_strategy = Retry( total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "OPTIONS", "TRACE"] ) # HTTP adapter with connection pooling adapter = HTTPAdapter( max_retries=retry_strategy, pool_connections=10, # Number of connection pools pool_maxsize=20, # Maximum connections per pool pool_block=False # Don't block when pool is full ) return { 'adapter': adapter, 'timeout': 30, # 30 second timeout 'verify': True # SSL verification } def _clean_cache(self) -> None: """Clean expired cache entries - like good housekeeping!""" current_time = time.time() if current_time - self._last_cache_cleanup > 60: # Clean every minute expired_keys = [ key for key, (_, timestamp) in self._cache.items() if current_time - timestamp > self._cache_ttl ] for key in expired_keys: del self._cache[key] self._last_cache_cleanup = current_time def _get_cache_key(self, method: str, **kwargs) -> str: """Generate cache key for method call.""" # Sort kwargs for consistent cache keys sorted_kwargs = sorted(kwargs.items()) return f"{method}:{str(sorted_kwargs)}" def _ensure_token(self) -> bool: """Ensure we have a valid NetBox API token. Returns: True if token is available, False otherwise """ token = self.vault_client.mint_netbox_token(self.vault_path) if token: self._token = token return True return False def _get_api(self) -> Optional[pynetbox.api]: """Get NetBox API client instance with optimized session. Returns: pynetbox API instance or None if token unavailable """ if not self._token and not self._ensure_token(): logger.error("Failed to obtain NetBox API token") return None if not self._api or self._api.token != self._token: self._api = pynetbox.api(self.netbox_url, token=self._token) # Apply optimized session configuration - like tuning skates! session = self._api.http_session session.mount('http://', self._session_config['adapter']) session.mount('https://', self._session_config['adapter']) session.timeout = self._session_config['timeout'] session.verify = self._session_config['verify'] return self._api def _record_to_dict(self, record: Record) -> Dict[str, Any]: """Convert NetBox Record to dictionary. Args: record: NetBox Record object Returns: Dictionary representation of the record """ if not record: return {} data = {} for key, value in record.items(): if isinstance(value, Record): # Handle nested records (e.g., device.rack) data[key] = { "id": value.id, "display": str(value), "url": getattr(value, "url", None), } elif isinstance(value, list): # Handle lists of records data[key] = [ { "id": item.id, "display": str(item), "url": getattr(item, "url", None), } if isinstance(item, Record) else item for item in value ] else: data[key] = value return data def _get_cached_result(self, method: str, **kwargs) -> Optional[List[Dict[str, Any]]]: """Get cached result if available and not expired.""" self._clean_cache() cache_key = self._get_cache_key(method, **kwargs) if cache_key in self._cache: result, timestamp = self._cache[cache_key] if time.time() - timestamp < self._cache_ttl: logger.debug(f"Cache hit for {method}") return result else: del self._cache[cache_key] return None def _cache_result(self, method: str, result: List[Dict[str, Any]], **kwargs) -> None: """Cache result for future use.""" cache_key = self._get_cache_key(method, **kwargs) self._cache[cache_key] = (result, time.time()) # Device (Host) Methods def list_devices( self, name: Optional[str] = None, primary_ip: Optional[str] = None, role: Optional[str] = None, limit: int = 100, ) -> List[Dict[str, Any]]: """List devices (hosts) from NetBox with caching. Args: name: Filter by device name (partial match) primary_ip: Filter by primary IP address role: Filter by device role limit: Maximum number of results Returns: List of device dictionaries """ # Check cache first - like checking if skates are already sharpened! cached_result = self._get_cached_result( 'list_devices', name=name, primary_ip=primary_ip, role=role, limit=limit ) if cached_result is not None: return cached_result api = self._get_api() if not api: return [] try: devices = api.dcim.devices.all() filters = {} if name: filters["name__ic"] = name if primary_ip: filters["primary_ip4__address"] = primary_ip if role: filters["device_role__slug"] = role if filters: devices = devices.filter(**filters) results = [] for device in devices[:limit]: results.append(self._record_to_dict(device)) # Cache the result - like storing sharpened skates for next time! self._cache_result( 'list_devices', results, name=name, primary_ip=primary_ip, role=role, limit=limit ) return results except Exception as e: logger.error(f"Error listing devices: {e}") return [] def get_device(self, name: str) -> Optional[Dict[str, Any]]: """Get a specific device by name. Args: name: Device name Returns: Device dictionary or None if not found """ api = self._get_api() if not api: return None try: device = api.dcim.devices.get(name=name) if device: return self._record_to_dict(device) return None except Exception as e: logger.error(f"Error getting device {name}: {e}") return None def search_devices(self, query: str, limit: int = 50) -> List[Dict[str, Any]]: """Search devices by name or IP address. Args: query: Search query limit: Maximum number of results Returns: List of matching device dictionaries """ api = self._get_api() if not api: return [] try: # Try searching by name first devices = api.dcim.devices.filter(name__ic=query) results = [] seen_ids = set() for device in devices[:limit]: if device.id not in seen_ids: results.append(self._record_to_dict(device)) seen_ids.add(device.id) # Also search by IP if query looks like an IP if "." in query or ":" in query: ip_addresses = api.ipam.ip_addresses.filter(address=query) for ip_addr in ip_addresses: if ip_addr.assigned_object_type == "dcim.interface": interface = ip_addr.assigned_object if interface.device and interface.device.id not in seen_ids: results.append(self._record_to_dict(interface.device)) seen_ids.add(interface.device.id) return results[:limit] except Exception as e: logger.error(f"Error searching devices: {e}") return [] # Virtual Machine Methods def list_virtual_machines( self, name: Optional[str] = None, role: Optional[str] = None, primary_ip: Optional[str] = None, limit: int = 100, ) -> List[Dict[str, Any]]: """List virtual machines from NetBox. Args: name: Filter by VM name (partial match) role: Filter by VM role primary_ip: Filter by primary IP address limit: Maximum number of results Returns: List of VM dictionaries """ api = self._get_api() if not api: return [] try: vms = api.virtualization.virtual_machines.all() filters = {} if name: filters["name__ic"] = name if role: filters["role__slug"] = role if primary_ip: filters["primary_ip4__address"] = primary_ip if filters: vms = vms.filter(**filters) results = [] for vm in vms[:limit]: results.append(self._record_to_dict(vm)) return results except Exception as e: logger.error(f"Error listing virtual machines: {e}") return [] def get_virtual_machine(self, name: str) -> Optional[Dict[str, Any]]: """Get a specific virtual machine by name. Args: name: VM name Returns: VM dictionary or None if not found """ api = self._get_api() if not api: return None try: vm = api.virtualization.virtual_machines.get(name=name) if vm: return self._record_to_dict(vm) return None except Exception as e: logger.error(f"Error getting VM {name}: {e}") return None def list_vm_interfaces(self, vm_name: str) -> List[Dict[str, Any]]: """List network interfaces for a VM. Args: vm_name: Virtual machine name Returns: List of interface dictionaries """ api = self._get_api() if not api: return [] try: vm = api.virtualization.virtual_machines.get(name=vm_name) if not vm: return [] interfaces = [] for interface in vm.interfaces.all(): interface_data = self._record_to_dict(interface) # Get IP addresses assigned to this interface ip_addresses = [] for ip_addr in interface.ip_addresses.all(): ip_addresses.append(self._record_to_dict(ip_addr)) interface_data["ip_addresses"] = ip_addresses interfaces.append(interface_data) return interfaces except Exception as e: logger.error(f"Error listing VM interfaces for {vm_name}: {e}") return [] # IP Address Methods def list_ip_addresses( self, address: Optional[str] = None, device: Optional[str] = None, limit: int = 100, ) -> List[Dict[str, Any]]: """List IP addresses from NetBox. Args: address: Filter by IP address device: Filter by device name limit: Maximum number of results Returns: List of IP address dictionaries """ api = self._get_api() if not api: return [] try: ip_addresses = api.ipam.ip_addresses.all() filters = {} if address: filters["address"] = address if device: filters["device"] = device if filters: ip_addresses = ip_addresses.filter(**filters) results = [] for ip_addr in ip_addresses[:limit]: results.append(self._record_to_dict(ip_addr)) return results except Exception as e: logger.error(f"Error listing IP addresses: {e}") return [] def get_ip_address(self, address: str) -> Optional[Dict[str, Any]]: """Get a specific IP address. Args: address: IP address (with CIDR notation if needed) Returns: IP address dictionary or None if not found """ api = self._get_api() if not api: return None try: ip_addr = api.ipam.ip_addresses.get(address=address) if ip_addr: return self._record_to_dict(ip_addr) return None except Exception as e: logger.error(f"Error getting IP address {address}: {e}") return None def search_ip_addresses(self, query: str, limit: int = 50) -> List[Dict[str, Any]]: """Search IP addresses by address or hostname. Args: query: Search query (IP address or hostname) limit: Maximum number of results Returns: List of matching IP address dictionaries """ api = self._get_api() if not api: return [] try: ip_addresses = api.ipam.ip_addresses.filter(address__ic=query) results = [] for ip_addr in ip_addresses[:limit]: results.append(self._record_to_dict(ip_addr)) return results except Exception as e: logger.error(f"Error searching IP addresses: {e}") return [] # VLAN Methods def list_vlans( self, vid: Optional[int] = None, name: Optional[str] = None, site: Optional[str] = None, limit: int = 100, ) -> List[Dict[str, Any]]: """List VLANs from NetBox. Args: vid: Filter by VLAN ID name: Filter by VLAN name site: Filter by site slug limit: Maximum number of results Returns: List of VLAN dictionaries """ api = self._get_api() if not api: return [] try: vlans = api.ipam.vlans.all() filters = {} if vid: filters["vid"] = vid if name: filters["name__ic"] = name if site: filters["site__slug"] = site if filters: vlans = vlans.filter(**filters) results = [] for vlan in vlans[:limit]: results.append(self._record_to_dict(vlan)) return results except Exception as e: logger.error(f"Error listing VLANs: {e}") return [] def get_vlan(self, vid: int) -> Optional[Dict[str, Any]]: """Get a specific VLAN by ID. Args: vid: VLAN ID Returns: VLAN dictionary or None if not found """ api = self._get_api() if not api: return None try: vlan = api.ipam.vlans.get(vid=vid) if vlan: return self._record_to_dict(vlan) return None except Exception as e: logger.error(f"Error getting VLAN {vid}: {e}") return None def list_vlan_ip_addresses(self, vid: int) -> List[Dict[str, Any]]: """List IP addresses assigned to a VLAN. Args: vid: VLAN ID Returns: List of IP address dictionaries """ api = self._get_api() if not api: return [] try: vlan = api.ipam.vlans.get(vid=vid) if not vlan: return [] ip_addresses = [] for ip_addr in vlan.ip_addresses.all(): ip_addresses.append(self._record_to_dict(ip_addr)) return ip_addresses except Exception as e: logger.error(f"Error listing IP addresses for VLAN {vid}: {e}") return []

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fringemonkey/mcp-dc'

If you have feedback or need assistance with the MCP directory API, please join our Discord server