Skip to main content
Glama
chadkunsman

NetBox MCP Server

by chadkunsman
prefixes.py14.2 kB
""" MCP tools for interacting with NetBox prefixes. """ from typing import Dict, List, Optional, Union, Any import re from fastmcp import Context from fastmcp.exceptions import ToolError from config.netbox import get_netbox_client from models.prefix import PrefixFilterParameters, PrefixQuery, PrefixSummary def _parse_natural_language_query(query: str) -> PrefixFilterParameters: """ Parse a natural language query into structured filter parameters. Args: query: Natural language query string Returns: PrefixFilterParameters object with appropriate filters set """ params = PrefixFilterParameters() # Extract site information (standardized) site_match = re.search(r'(?:at|in|from)\s+(?:site\s+)?(\w+)', query, re.IGNORECASE) if site_match: params.site = site_match.group(1) # Extract IP family information (keep domain-specific intelligence) if re.search(r'ipv4|ip4|v4', query, re.IGNORECASE): params.family = 4 elif re.search(r'ipv6|ip6|v6', query, re.IGNORECASE): params.family = 6 # Extract status information (keep limited valid values) if re.search(r'active', query, re.IGNORECASE): params.status = 'active' elif re.search(r'reserved', query, re.IGNORECASE): params.status = 'reserved' elif re.search(r'deprecated', query, re.IGNORECASE): params.status = 'deprecated' elif re.search(r'container', query, re.IGNORECASE): params.status = 'container' # Extract pool information if re.search(r'pool', query, re.IGNORECASE): params.is_pool = True # Extract prefix pattern (CIDR notation - genuinely useful) prefix_match = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2})', query) if not prefix_match: # Try IPv6 pattern prefix_match = re.search(r'([0-9a-fA-F:]+/\d{1,3})', query) if prefix_match: params.prefix = prefix_match.group(1) # Extract VRF information vrf_match = re.search(r'vrf\s+(\w+)', query, re.IGNORECASE) if vrf_match: params.vrf = vrf_match.group(1) # Extract VLAN information (simplified) vlan_match = re.search(r'(?:vlan|vid)\s*(\w+)', query, re.IGNORECASE) if vlan_match: params.vlan = vlan_match.group(1) # Extract limit information limit_match = re.search(r'(?:first|top|limit)\s+(\d+)', query, re.IGNORECASE) if limit_match: params.limit = min(int(limit_match.group(1)), 1000) # If no specific filters found, use general search if not any([params.site, params.family, params.status, params.prefix, params.vrf, params.vlan, params.is_pool]): params.search = query return params def _format_prefix_for_display(prefix_data: Dict[str, Any]) -> PrefixSummary: """ Convert NetBox prefix data into a formatted summary. Args: prefix_data: Raw prefix data from NetBox API Returns: PrefixSummary object with formatted data """ # Extract nested object names safely site_name = None if prefix_data.get('site'): site_name = prefix_data['site'].get('name') if isinstance(prefix_data['site'], dict) else str(prefix_data['site']) vrf_name = None if prefix_data.get('vrf'): vrf_name = prefix_data['vrf'].get('name') if isinstance(prefix_data['vrf'], dict) else str(prefix_data['vrf']) tenant_name = None if prefix_data.get('tenant'): tenant_name = prefix_data['tenant'].get('name') if isinstance(prefix_data['tenant'], dict) else str(prefix_data['tenant']) vlan_name = None if prefix_data.get('vlan'): if isinstance(prefix_data['vlan'], dict): # VLAN object with name and vid vlan_name = prefix_data['vlan'].get('name', '') vlan_vid = prefix_data['vlan'].get('vid', '') if vlan_name and vlan_vid: vlan_name = f"{vlan_name} (VID {vlan_vid})" elif vlan_vid: vlan_name = f"VLAN {vlan_vid}" else: vlan_name = str(prefix_data['vlan']) role_name = None if prefix_data.get('role'): role_name = prefix_data['role'].get('name') if isinstance(prefix_data['role'], dict) else str(prefix_data['role']) # Handle status status = 'unknown' if prefix_data.get('status'): if isinstance(prefix_data['status'], dict): status = prefix_data['status'].get('label', prefix_data['status'].get('value', 'unknown')) else: status = str(prefix_data['status']) # Handle family family = 'IPv4' if prefix_data.get('family'): if isinstance(prefix_data['family'], dict): family_value = prefix_data['family'].get('value', 4) family = 'IPv6' if family_value == 6 else 'IPv4' elif isinstance(prefix_data['family'], int): family = 'IPv6' if prefix_data['family'] == 6 else 'IPv4' # Extract tags tags = [] if prefix_data.get('tags'): if isinstance(prefix_data['tags'], list): for tag in prefix_data['tags']: if isinstance(tag, dict): tags.append(tag.get('name', str(tag))) else: tags.append(str(tag)) return PrefixSummary( id=prefix_data.get('id', 0), prefix=prefix_data.get('prefix', ''), site=site_name, vrf=vrf_name, tenant=tenant_name, vlan=vlan_name, status=status, role=role_name, family=family, is_pool=prefix_data.get('is_pool', False), description=prefix_data.get('description', ''), tags=tags, utilization=prefix_data.get('utilization'), available_ips=prefix_data.get('available_ips'), created=prefix_data.get('created'), last_updated=prefix_data.get('last_updated') ) def get_prefixes(filter_params: PrefixFilterParameters, ctx: Context) -> Dict[str, Any]: """ Retrieve prefixes from NetBox with filtering options. Args: filter_params: Filter parameters for prefix search ctx: FastMCP context Returns: Dictionary containing prefixes and metadata Raises: ToolError: If the request fails or no prefixes are found """ try: nb = get_netbox_client() # Build query parameters query_params = {} # Handle cross-field search if filter_params.search: query_params['q'] = filter_params.search # NetBox's general search parameter if filter_params.prefix: query_params['prefix'] = filter_params.prefix # Special handling for site (similar to devices tool) if filter_params.site: site_value = filter_params.site if site_value.isdigit(): query_params['site_id'] = site_value else: # Try to find site by name try: site = nb.dcim.sites.get(name=site_value) if site: query_params['site_id'] = site.id else: # Try case-insensitive search sites = list(nb.dcim.sites.filter(name__ic=site_value)) if sites: query_params['site_id'] = sites[0].id else: raise ToolError(f"Site '{site_value}' not found. Please check the site name.") except Exception as e: if "not found" in str(e): raise e raise ToolError(f"Error looking up site '{site_value}': {str(e)}") if filter_params.vrf: query_params['vrf'] = filter_params.vrf if filter_params.tenant: query_params['tenant'] = filter_params.tenant # Special handling for VLAN (similar to site) if filter_params.vlan: vlan_value = filter_params.vlan if vlan_value.isdigit(): # Could be VLAN ID or VID try: vlan = nb.ipam.vlans.get(id=vlan_value) if vlan: query_params['vlan_id'] = vlan.id else: # Try by VID vlan = nb.ipam.vlans.get(vid=vlan_value) if vlan: query_params['vlan_id'] = vlan.id else: raise ToolError(f"VLAN with ID or VID '{vlan_value}' not found.") except Exception as e: if "not found" in str(e): raise e raise ToolError(f"Error looking up VLAN '{vlan_value}': {str(e)}") else: # Try to find VLAN by name try: vlan = nb.ipam.vlans.get(name=vlan_value) if vlan: query_params['vlan_id'] = vlan.id else: # Try case-insensitive search vlans = list(nb.ipam.vlans.filter(name__ic=vlan_value)) if vlans: query_params['vlan_id'] = vlans[0].id else: raise ToolError(f"VLAN '{vlan_value}' not found. Please check the VLAN name.") except Exception as e: if "not found" in str(e): raise e raise ToolError(f"Error looking up VLAN '{vlan_value}': {str(e)}") if filter_params.status: query_params['status'] = filter_params.status if filter_params.role: query_params['role'] = filter_params.role if filter_params.family: query_params['family'] = filter_params.family if filter_params.is_pool is not None: query_params['is_pool'] = filter_params.is_pool if filter_params.tag: query_params['tag'] = filter_params.tag query_params['limit'] = filter_params.limit # Execute query prefixes = list(nb.ipam.prefixes.filter(**query_params)) if not prefixes: return { "message": "No prefixes found matching the specified criteria", "count": 0, "prefixes": [] } # Format prefixes for display formatted_prefixes = [] for prefix in prefixes: try: formatted_prefix = _format_prefix_for_display(dict(prefix)) formatted_prefixes.append(formatted_prefix.model_dump()) except Exception as e: # Log the error but continue processing other prefixes continue return { "message": f"Found {len(formatted_prefixes)} prefix(es)", "count": len(formatted_prefixes), "prefixes": formatted_prefixes } except Exception as e: raise ToolError(f"Failed to retrieve prefixes: {str(e)}") def get_prefix(prefix_id: int, ctx: Context) -> Dict[str, Any]: """ Retrieve detailed information about a specific prefix by ID. Args: prefix_id: NetBox prefix ID ctx: FastMCP context Returns: Dictionary containing detailed prefix information Raises: ToolError: If the prefix is not found or request fails """ try: nb = get_netbox_client() prefix = nb.ipam.prefixes.get(prefix_id) if not prefix: raise ToolError(f"Prefix with ID {prefix_id} not found") formatted_prefix = _format_prefix_for_display(dict(prefix)) return { "message": f"Retrieved prefix details for {formatted_prefix.prefix}", "prefix": formatted_prefix.model_dump() } except Exception as e: raise ToolError(f"Failed to retrieve prefix {prefix_id}: {str(e)}") def ask_about_prefixes(query: PrefixQuery, ctx: Context) -> Dict[str, Any]: """ Process natural language queries about prefixes and return relevant results. Args: query: Natural language query about prefixes ctx: FastMCP context Returns: Dictionary containing query results and explanations Raises: ToolError: If the query cannot be processed or fails """ try: # Parse the natural language query filter_params = _parse_natural_language_query(query.query) # Execute the search result = get_prefixes(filter_params, ctx) # Add query interpretation to the response interpretation = [] if filter_params.prefix: interpretation.append(f"Prefix: {filter_params.prefix}") if filter_params.site: interpretation.append(f"Site: {filter_params.site}") if filter_params.family: family_str = "IPv6" if filter_params.family == 6 else "IPv4" interpretation.append(f"IP Family: {family_str}") if filter_params.status: interpretation.append(f"Status: {filter_params.status}") if filter_params.is_pool is not None: interpretation.append(f"Pool: {'Yes' if filter_params.is_pool else 'No'}") if filter_params.vrf: interpretation.append(f"VRF: {filter_params.vrf}") if filter_params.tenant: interpretation.append(f"Tenant: {filter_params.tenant}") if filter_params.vlan: interpretation.append(f"VLAN: {filter_params.vlan}") if filter_params.limit != 50: interpretation.append(f"Limit: {filter_params.limit}") result["query_interpretation"] = interpretation result["original_query"] = query.query return result except Exception as e: raise ToolError(f"Failed to process prefix query '{query.query}': {str(e)}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chadkunsman/netbox_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server