Skip to main content
Glama

NetBox Read/Write MCP Server

vlans.py15.4 kB
#!/usr/bin/env python3 """ IPAM VLAN Management Tools High-level tools for managing NetBox VLANs and VLAN assignments. """ from typing import Dict, Optional, Any import logging from ...registry import mcp_tool from ...client import NetBoxClient logger = logging.getLogger(__name__) @mcp_tool(category="ipam") def netbox_create_vlan( client: NetBoxClient, name: str, vid: int, status: str = "active", description: Optional[str] = None, site: Optional[str] = None, group: Optional[str] = None, tenant: Optional[str] = None, confirm: bool = False ) -> Dict[str, Any]: """ Create a new VLAN in NetBox IPAM. Args: client: NetBoxClient instance (injected) name: VLAN name vid: VLAN ID (1-4094) status: VLAN status (active, reserved, deprecated) description: Optional description site: Optional site name or slug group: Optional VLAN group name tenant: Optional tenant name or slug confirm: Must be True to execute (safety mechanism) Returns: Created VLAN information or error details Example: netbox_create_vlan("Management", 100, status="active", confirm=True) """ try: if not name or not vid: return { "success": False, "error": "VLAN name and VID are required", "error_type": "ValidationError" } if not (1 <= vid <= 4094): return { "success": False, "error": "VID must be between 1 and 4094", "error_type": "ValidationError" } logger.info(f"Creating VLAN: {name} (VID: {vid})") # Build VLAN data vlan_data = { "name": name, "vid": vid, "status": status } if description: vlan_data["description"] = description if site: vlan_data["site"] = site if group: vlan_data["group"] = group if tenant: vlan_data["tenant"] = tenant # Use dynamic API with safety result = client.ipam.vlans.create(confirm=confirm, **vlan_data) return { "success": True, "action": "created", "object_type": "vlan", "vlan": result, "dry_run": result.get("dry_run", False) } except Exception as e: logger.error(f"Failed to create VLAN {name}: {e}") return { "success": False, "error": str(e), "error_type": type(e).__name__ } @mcp_tool(category="ipam") def netbox_find_available_vlan_id( client: NetBoxClient, site: Optional[str] = None, group: Optional[str] = None, start_vid: int = 1, end_vid: int = 4094 ) -> Dict[str, Any]: """ Find available VLAN IDs in a range. Args: client: NetBoxClient instance (injected) site: Optional site name or slug to filter VLANs group: Optional VLAN group name to filter VLANs start_vid: Starting VLAN ID (default: 1) end_vid: Ending VLAN ID (default: 4094) Returns: Available VLAN IDs or error details Example: netbox_find_available_vlan_id(site="main-dc", start_vid=100, end_vid=200) """ try: if not (1 <= start_vid <= 4094) or not (1 <= end_vid <= 4094): return { "success": False, "error": "VLAN IDs must be between 1 and 4094", "error_type": "ValidationError" } if start_vid > end_vid: return { "success": False, "error": "start_vid must be less than or equal to end_vid", "error_type": "ValidationError" } logger.info(f"Finding available VLAN IDs between {start_vid} and {end_vid}") # Build filter vlan_filter = {} if site: vlan_filter["site"] = site if group: vlan_filter["group"] = group # Get existing VLANs existing_vlans = client.ipam.vlans.filter(**vlan_filter) used_vids = {vlan["vid"] for vlan in existing_vlans} # Find available VIDs available_vids = [] for vid in range(start_vid, end_vid + 1): if vid not in used_vids: available_vids.append(vid) return { "success": True, "available_vids": available_vids, "count": len(available_vids), "range": {"start": start_vid, "end": end_vid}, "filter": vlan_filter } except Exception as e: logger.error(f"Failed to find available VLAN IDs: {e}") return { "success": False, "error": str(e), "error_type": type(e).__name__ } @mcp_tool(category="ipam") def netbox_list_all_vlans( client: NetBoxClient, limit: int = 100, site_name: Optional[str] = None, tenant_name: Optional[str] = None, status: Optional[str] = None, group_name: Optional[str] = None, role: Optional[str] = None ) -> Dict[str, Any]: """ Get summarized list of VLANs with optional filtering. This tool provides bulk VLAN discovery across the NetBox IPAM infrastructure, enabling efficient network segmentation management, VLAN planning, and capacity oversight. Essential for network administrators managing VLAN assignments. Args: client: NetBoxClient instance (injected by dependency system) limit: Maximum number of results to return (default: 100) site_name: Filter by site name (optional) tenant_name: Filter by tenant name (optional) status: Filter by VLAN status (active, reserved, deprecated) group_name: Filter by VLAN group name (optional) role: Filter by VLAN role (optional) Returns: Dictionary containing: - count: Total number of VLANs found - vlans: List of summarized VLAN information - filters_applied: Dictionary of filters that were applied - summary_stats: Aggregate statistics about the VLANs Example: netbox_list_all_vlans(status="active", site_name="datacenter-1") netbox_list_all_vlans(tenant_name="customer-a", group_name="production") netbox_list_all_vlans(role="user", limit=50) """ try: logger.info(f"Listing VLANs with filters - site: {site_name}, tenant: {tenant_name}, status: {status}, group: {group_name}") # Build filters dictionary - only include non-None values filters = {} if site_name: filters['site'] = site_name if tenant_name: filters['tenant'] = tenant_name if status: filters['status'] = status if group_name: filters['group'] = group_name if role: filters['role'] = role # Execute filtered query with limit vlans = list(client.ipam.vlans.filter(**filters)) # Apply limit after fetching if len(vlans) > limit: vlans = vlans[:limit] # Generate summary statistics status_counts = {} site_counts = {} tenant_counts = {} group_counts = {} role_counts = {} # VLAN tracking vid_ranges = {"1-100": 0, "101-1000": 0, "1001-4000": 0, "4001-4094": 0} total_vlans = len(vlans) vlans_with_interfaces = 0 for vlan in vlans: # Status breakdown with defensive dictionary access status_obj = vlan.get("status", {}) if isinstance(status_obj, dict): status = status_obj.get("label", "N/A") else: status = str(status_obj) if status_obj else "N/A" status_counts[status] = status_counts.get(status, 0) + 1 # Site breakdown with defensive dictionary access site_obj = vlan.get("site") if site_obj: if isinstance(site_obj, dict): site_name = site_obj.get("name", str(site_obj)) else: site_name = str(site_obj) site_counts[site_name] = site_counts.get(site_name, 0) + 1 # Tenant breakdown with defensive dictionary access tenant_obj = vlan.get("tenant") if tenant_obj: if isinstance(tenant_obj, dict): tenant_name = tenant_obj.get("name", str(tenant_obj)) else: tenant_name = str(tenant_obj) tenant_counts[tenant_name] = tenant_counts.get(tenant_name, 0) + 1 # Group breakdown with defensive dictionary access group_obj = vlan.get("group") if group_obj: if isinstance(group_obj, dict): group_name = group_obj.get("name", str(group_obj)) else: group_name = str(group_obj) group_counts[group_name] = group_counts.get(group_name, 0) + 1 # Role breakdown with defensive dictionary access role_obj = vlan.get("role") if role_obj: if isinstance(role_obj, dict): role_name = role_obj.get("name", str(role_obj)) else: role_name = str(role_obj) role_counts[role_name] = role_counts.get(role_name, 0) + 1 # VID range tracking with defensive dictionary access vid = vlan.get("vid", 0) if 1 <= vid <= 100: vid_ranges["1-100"] += 1 elif 101 <= vid <= 1000: vid_ranges["101-1000"] += 1 elif 1001 <= vid <= 4000: vid_ranges["1001-4000"] += 1 elif 4001 <= vid <= 4094: vid_ranges["4001-4094"] += 1 # Interface assignments (checking if VLAN has interfaces) try: vlan_id = vlan.get("id") vlan_interfaces = list(client.dcim.interfaces.filter(untagged_vlan_id=vlan_id)) tagged_interfaces = list(client.dcim.interfaces.filter(tagged_vlans=vlan_id)) if vlan_interfaces or tagged_interfaces: vlans_with_interfaces += 1 except: pass # Skip interface counting if API call fails # Create human-readable VLAN list vlan_list = [] for vlan in vlans: # Get interface assignments for this specific VLAN untagged_interfaces = [] tagged_interfaces = [] try: vlan_id = vlan.get("id") untagged_interfaces = list(client.dcim.interfaces.filter(untagged_vlan_id=vlan_id)) tagged_interfaces = list(client.dcim.interfaces.filter(tagged_vlans=vlan_id)) except: pass # Skip if interface queries fail # Defensive dictionary access for status status_obj = vlan.get("status", {}) if isinstance(status_obj, dict): status = status_obj.get("label", "N/A") else: status = str(status_obj) if status_obj else "N/A" # Defensive dictionary access for site site_obj = vlan.get("site") site_name = None if site_obj: if isinstance(site_obj, dict): site_name = site_obj.get("name") else: site_name = str(site_obj) # Defensive dictionary access for tenant tenant_obj = vlan.get("tenant") tenant_name = None if tenant_obj: if isinstance(tenant_obj, dict): tenant_name = tenant_obj.get("name") else: tenant_name = str(tenant_obj) # Defensive dictionary access for group group_obj = vlan.get("group") group_name = None if group_obj: if isinstance(group_obj, dict): group_name = group_obj.get("name") else: group_name = str(group_obj) # Defensive dictionary access for role role_obj = vlan.get("role") role_name = None if role_obj: if isinstance(role_obj, dict): role_name = role_obj.get("name") else: role_name = str(role_obj) vlan_info = { "name": vlan.get("name", "Unknown"), "vid": vlan.get("vid"), "status": status, "site": site_name, "tenant": tenant_name, "group": group_name, "role": role_name, "description": vlan.get("description"), "interface_assignments": { "untagged_count": len(untagged_interfaces), "tagged_count": len(tagged_interfaces), "total_interfaces": len(untagged_interfaces) + len(tagged_interfaces) }, "created": vlan.get("created"), "last_updated": vlan.get("last_updated") } vlan_list.append(vlan_info) result = { "count": len(vlan_list), "vlans": vlan_list, "filters_applied": {k: v for k, v in filters.items() if v is not None}, "summary_stats": { "total_vlans": total_vlans, "status_breakdown": status_counts, "site_breakdown": site_counts, "tenant_breakdown": tenant_counts, "group_breakdown": group_counts, "role_breakdown": role_counts, "vid_range_distribution": vid_ranges, "vlans_with_interfaces": vlans_with_interfaces, "vlans_without_interfaces": total_vlans - vlans_with_interfaces, "vlans_with_sites": len([v for v in vlan_list if v['site']]), "vlans_with_tenants": len([v for v in vlan_list if v['tenant']]), "vlans_with_groups": len([v for v in vlan_list if v['group']]), "vlans_with_roles": len([v for v in vlan_list if v['role']]) } } logger.info(f"Found {len(vlan_list)} VLANs matching criteria. Status breakdown: {status_counts}") return result except Exception as e: logger.error(f"Error listing VLANs: {e}") return { "count": 0, "vlans": [], "error": str(e), "error_type": type(e).__name__, "filters_applied": {k: v for k, v in { 'site_name': site_name, 'tenant_name': tenant_name, 'status': status, 'group_name': group_name, 'role': role }.items() if v is not None} }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Deployment-Team/netbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server