Skip to main content
Glama

NetBox Read/Write MCP Server

cables.py47.4 kB
#!/usr/bin/env python3 """ DCIM Cable Management Tools High-level tools for managing NetBox cables, cable terminations, and physical connectivity documentation with comprehensive enterprise-grade functionality. """ from typing import Dict, Optional, Any, List import logging from datetime import datetime from ...registry import mcp_tool from ...client import NetBoxClient from ...validation import CableValidator logger = logging.getLogger(__name__) @mcp_tool(category="dcim") def netbox_create_cable_connection( client: NetBoxClient, device_a_name: str, interface_a_name: str, device_b_name: str, interface_b_name: str, cable_type: str = "cat6", cable_status: str = "connected", cable_color: Optional[str] = None, cable_length: Optional[int] = None, cable_length_unit: str = "m", label: Optional[str] = None, description: Optional[str] = None, confirm: bool = False ) -> Dict[str, Any]: """ Create a physical cable connection between two device interfaces. This enterprise-grade cable management tool handles the complete workflow of documenting physical cable connections with comprehensive validation, conflict detection, and cache invalidation to ensure data consistency. Args: client: NetBoxClient instance (injected) device_a_name: Name of the first device interface_a_name: Name of the interface on device A device_b_name: Name of the second device interface_b_name: Name of the interface on device B cable_type: Type of cable (cat5e, cat6, cat6a, cat7, cat8, mmf, smf, dac-active, dac-passive, coaxial, power) cable_status: Cable status (planned, installed, connected, decommissioning) cable_color: Cable color specification (e.g., "pink", "red", "blue", "green", "yellow", "orange", "purple", "grey") cable_length: Optional cable length cable_length_unit: Length unit (mm, cm, m, km, in, ft, mi) label: Optional cable label description: Optional description confirm: Must be True to execute (safety mechanism) Returns: Cable creation result with comprehensive termination information Example: netbox_create_cable_connection( device_a_name="sw-access-01", interface_a_name="GigabitEthernet1/0/1", device_b_name="sw-core-01", interface_b_name="GigabitEthernet1/1", cable_type="cat6", cable_color="pink", cable_length=15, confirm=True ) """ try: if not all([device_a_name, interface_a_name, device_b_name, interface_b_name]): return { "success": False, "error": "Both device names and interface names are required", "error_type": "ValidationError" } # Prevent self-connection if device_a_name == device_b_name and interface_a_name == interface_b_name: return { "success": False, "error": "Cannot connect an interface to itself", "error_type": "ValidationError" } # Validate cable type using shared validator try: cable_type = CableValidator.validate_type(cable_type) except Exception as e: return { "success": False, "error": str(e), "error_type": "ValidationError" } # Validate cable status valid_statuses = ["planned", "installed", "connected", "decommissioning"] if cable_status not in valid_statuses: return { "success": False, "error": f"Invalid cable_status '{cable_status}'. Valid statuses: {valid_statuses}", "error_type": "ValidationError" } # Validate length unit valid_units = ["mm", "cm", "m", "km", "in", "ft", "mi"] if cable_length_unit not in valid_units: return { "success": False, "error": f"Invalid cable_length_unit '{cable_length_unit}'. Valid units: {valid_units}", "error_type": "ValidationError" } # Validate cable color using shared validator try: cable_color = CableValidator.validate_color(cable_color) except Exception as e: return { "success": False, "error": str(e), "error_type": "ValidationError" } logger.info(f"Creating cable connection: {device_a_name}:{interface_a_name} <-> {device_b_name}:{interface_b_name}") # Step 1: Find device A and its interface logger.debug(f"Looking up device A: {device_a_name}") # ULTRATHINK FIX 1: Expand search parameters with defensive handling search_params = { "expand": ["device_type", "device_type__manufacturer", "site", "rack", "tenant", "role"], "limit": 50 } # ULTRATHINK FIX 2: ID resolution with fallback patterns devices_a = None if device_a_name.isdigit(): devices_a = list(client.dcim.devices.filter(id=int(device_a_name), **search_params)) if not devices_a: devices_a = list(client.dcim.devices.filter(name=device_a_name, **search_params)) # ULTRATHINK FIX 4: Slug-based fallback mechanisms if not devices_a: devices_a = list(client.dcim.devices.filter(name__icontains=device_a_name, **search_params)) if not devices_a: return { "success": False, "error": f"Device A '{device_a_name}' not found", "error_type": "NotFoundError" } # ULTRATHINK FIX 3: Defensive dict/object access patterns device_a = devices_a[0] if isinstance(devices_a, list) else devices_a device_a_id = device_a.get("id") if isinstance(device_a, dict) else getattr(device_a, "id", None) if not device_a_id: return { "success": False, "error": f"Device A '{device_a_name}' has no valid ID", "error_type": "DataError" } logger.debug(f"Looking up interface A: {interface_a_name} on device {device_a.get('name', device_a_name)}") # ULTRATHINK FIX 1: Expand interface search parameters interface_search_params = { "device_id": device_a_id, "expand": ["device", "type", "cable"], "limit": 50 } # ULTRATHINK FIX 2: Multi-strategy interface search interfaces_a = list(client.dcim.interfaces.filter(name=interface_a_name, **interface_search_params)) # ULTRATHINK FIX 4: Fallback interface search if not interfaces_a: interfaces_a = list(client.dcim.interfaces.filter(name__icontains=interface_a_name, **interface_search_params)) if not interfaces_a: return { "success": False, "error": f"Interface A '{interface_a_name}' not found on device '{device_a_name}'", "error_type": "NotFoundError" } # ULTRATHINK FIX 3: Defensive interface handling interface_a_dict = interfaces_a[0] if isinstance(interfaces_a, list) else interfaces_a interface_a_id = interface_a_dict.get("id") if isinstance(interface_a_dict, dict) else getattr(interface_a_dict, "id", None) if not interface_a_id: return { "success": False, "error": f"Interface A '{interface_a_name}' has no valid ID", "error_type": "DataError" } # Step 2: Find device B and its interface logger.debug(f"Looking up device B: {device_b_name}") # ULTRATHINK FIX 1: Expand search parameters for device B search_params_b = { "expand": ["device_type", "device_type__manufacturer", "site", "rack", "tenant", "role"], "limit": 50 } # ULTRATHINK FIX 2: ID resolution with fallback patterns for device B devices_b = None if device_b_name.isdigit(): devices_b = list(client.dcim.devices.filter(id=int(device_b_name), **search_params_b)) if not devices_b: devices_b = list(client.dcim.devices.filter(name=device_b_name, **search_params_b)) # ULTRATHINK FIX 4: Slug-based fallback for device B if not devices_b: devices_b = list(client.dcim.devices.filter(name__icontains=device_b_name, **search_params_b)) if not devices_b: return { "success": False, "error": f"Device B '{device_b_name}' not found", "error_type": "NotFoundError" } # ULTRATHINK FIX 3: Defensive dict/object access for device B device_b = devices_b[0] if isinstance(devices_b, list) else devices_b device_b_id = device_b.get("id") if isinstance(device_b, dict) else getattr(device_b, "id", None) if not device_b_id: return { "success": False, "error": f"Device B '{device_b_name}' has no valid ID", "error_type": "DataError" } logger.debug(f"Looking up interface B: {interface_b_name} on device {device_b.get('name', device_b_name)}") # ULTRATHINK FIX 1: Expand interface search parameters for interface B interface_search_params_b = { "device_id": device_b_id, "expand": ["device", "type", "cable"], "limit": 50 } # ULTRATHINK FIX 2: Multi-strategy interface search for interface B interfaces_b = list(client.dcim.interfaces.filter(name=interface_b_name, **interface_search_params_b)) # ULTRATHINK FIX 4: Fallback interface search for interface B if not interfaces_b: interfaces_b = list(client.dcim.interfaces.filter(name__icontains=interface_b_name, **interface_search_params_b)) if not interfaces_b: return { "success": False, "error": f"Interface B '{interface_b_name}' not found on device '{device_b_name}'", "error_type": "NotFoundError" } # ULTRATHINK FIX 3: Defensive interface handling for interface B interface_b_dict = interfaces_b[0] if isinstance(interfaces_b, list) else interfaces_b interface_b_id = interface_b_dict.get("id") if isinstance(interface_b_dict, dict) else getattr(interface_b_dict, "id", None) if not interface_b_id: return { "success": False, "error": f"Interface B '{interface_b_name}' has no valid ID", "error_type": "DataError" } # Step 3: Check for existing cable connections (conflict detection) logger.debug("Checking for existing cable connections...") # Check interface A if interface_a_dict.get("cable"): return { "success": False, "error": f"Interface A '{device_a_name}:{interface_a_name}' already has a cable connection", "error_type": "ConflictError" } # Check interface B if interface_b_dict.get("cable"): return { "success": False, "error": f"Interface B '{device_b_name}:{interface_b_name}' already has a cable connection", "error_type": "ConflictError" } if not confirm: # Dry run mode - return what would be created without actually creating logger.info(f"DRY RUN: Would create cable connection between {device_a_name}:{interface_a_name} and {device_b_name}:{interface_b_name}") return { "success": True, "action": "dry_run", "object_type": "cable", "cable": { "termination_a": {"device": device_a["name"], "interface": interface_a_dict["name"]}, "termination_b": {"device": device_b["name"], "interface": interface_b_dict["name"]}, "type": cable_type, "status": cable_status, "color": cable_color, "length": cable_length, "length_unit": cable_length_unit, "label": label, "dry_run": True }, "dry_run": True } # Step 4: Create the cable with terminations # NetBox API expects termination arrays with object_type and object_id cable_data = { "a_terminations": [{"object_type": "dcim.interface", "object_id": interface_a_id}], "b_terminations": [{"object_type": "dcim.interface", "object_id": interface_b_id}], "type": cable_type, "status": cable_status } if cable_length is not None: cable_data["length"] = cable_length cable_data["length_unit"] = cable_length_unit if label: cable_data["label"] = label if description: cable_data["description"] = description if cable_color: cable_data["color"] = cable_color logger.info(f"Creating cable with termination IDs - A: {interface_a_id}, B: {interface_b_id}") logger.debug(f"Full cable payload: {cable_data}") result = client.dcim.cables.create(confirm=True, **cable_data) # Step 5: Cache invalidation for data consistency (Issue #29 pattern) try: client.cache.invalidate_for_object("dcim.interfaces", interface_a_id) client.cache.invalidate_for_object("dcim.interfaces", interface_b_id) client.cache.invalidate_pattern("dcim.cables") except Exception as cache_error: logger.warning(f"Cache invalidation failed: {cache_error}") return { "success": True, "action": "created", "object_type": "cable", "cable": result, "terminations": { "termination_a": { "device": {"name": device_a["name"], "id": device_a_id}, "interface": {"name": interface_a_dict["name"], "id": interface_a_id} }, "termination_b": { "device": {"name": device_b["name"], "id": device_b_id}, "interface": {"name": interface_b_dict["name"], "id": interface_b_id} } }, "cable_specs": { "type": cable_type, "status": cable_status, "color": cable_color, "length": f"{cable_length}{cable_length_unit}" if cable_length else None, "label": label }, "dry_run": result.get("dry_run", False) } except Exception as e: logger.error(f"Failed to create cable connection: {e}") return { "success": False, "error": str(e), "error_type": type(e).__name__ } @mcp_tool(category="dcim") def netbox_disconnect_cable( client: NetBoxClient, cable_id: Optional[int] = None, device_name: Optional[str] = None, interface_name: Optional[str] = None, confirm: bool = False ) -> Dict[str, Any]: """ Disconnect a cable by removing it from NetBox. Args: client: NetBoxClient instance (injected) cable_id: Specific cable ID to disconnect (optional) device_name: Device name to find cable by interface (optional) interface_name: Interface name to find cable (required if device_name provided) confirm: Must be True to execute (safety mechanism) Returns: Cable disconnection result with details Example: netbox_disconnect_cable(cable_id=123, confirm=True) netbox_disconnect_cable(device_name="sw-01", interface_name="eth0", confirm=True) """ try: if not confirm: return { "success": False, "error": "Confirmation required for cable disconnection", "error_type": "ConfirmationError" } cable = None # Find cable by ID if cable_id: cables = client.dcim.cables.filter(id=cable_id) if not cables: return { "success": False, "error": f"Cable ID {cable_id} not found", "error_type": "NotFoundError" } cable = cables[0] # Find cable by device interface elif device_name and interface_name: devices = client.dcim.devices.filter(name=device_name) if not devices: return { "success": False, "error": f"Device '{device_name}' not found", "error_type": "NotFoundError" } device = devices[0] interfaces = client.dcim.interfaces.filter(device_id=device["id"], name=interface_name) if not interfaces: return { "success": False, "error": f"Interface '{interface_name}' not found on device '{device_name}'", "error_type": "NotFoundError" } interface = interfaces[0] if not interface.get("cable"): return { "success": False, "error": f"No cable connected to interface '{device_name}:{interface_name}'", "error_type": "NotFoundError" } cable_id = interface["cable"]["id"] cables = client.dcim.cables.filter(id=cable_id) if cables: cable = cables[0] else: return { "success": False, "error": "Either cable_id or both device_name and interface_name must be provided", "error_type": "ValidationError" } if not cable: return { "success": False, "error": "Cable not found", "error_type": "NotFoundError" } logger.info(f"Disconnecting cable ID: {cable['id']}") # Delete the cable result = client.dcim.cables.delete(cable["id"], confirm=True) # Cache invalidation for data consistency try: client.cache.invalidate_pattern("dcim.cables") client.cache.invalidate_pattern("dcim.interfaces") except Exception as cache_error: logger.warning(f"Cache invalidation failed: {cache_error}") return { "success": True, "action": "disconnected", "object_type": "cable", "disconnected_cable": { "id": cable["id"], "type": cable.get("type", "N/A"), "status": cable.get("status", "N/A"), "label": cable.get("label", "N/A") } } except Exception as e: logger.error(f"Failed to disconnect cable: {e}") return { "success": False, "error": str(e), "error_type": type(e).__name__ } @mcp_tool(category="dcim") def netbox_get_cable_info( client: NetBoxClient, cable_id: Optional[int] = None, device_name: Optional[str] = None, interface_name: Optional[str] = None ) -> Dict[str, Any]: """ Get detailed information about a specific cable. Args: client: NetBoxClient instance (injected) cable_id: Specific cable ID to query (optional) device_name: Device name to find cable by interface (optional) interface_name: Interface name to find cable (required if device_name provided) Returns: Detailed cable information including terminations Example: netbox_get_cable_info(cable_id=123) netbox_get_cable_info(device_name="sw-01", interface_name="eth0") """ try: cable = None # Find cable by ID if cable_id: cables = client.dcim.cables.filter(id=cable_id) if not cables: return { "success": False, "error": f"Cable ID {cable_id} not found", "error_type": "NotFoundError" } cable = cables[0] # Find cable by device interface elif device_name and interface_name: devices = client.dcim.devices.filter(name=device_name) if not devices: return { "success": False, "error": f"Device '{device_name}' not found", "error_type": "NotFoundError" } device = devices[0] interfaces = client.dcim.interfaces.filter(device_id=device["id"], name=interface_name) if not interfaces: return { "success": False, "error": f"Interface '{interface_name}' not found on device '{device_name}'", "error_type": "NotFoundError" } interface = interfaces[0] if not interface.get("cable"): return { "success": False, "error": f"No cable connected to interface '{device_name}:{interface_name}'", "error_type": "NotFoundError" } cable_id = interface["cable"]["id"] cables = client.dcim.cables.filter(id=cable_id) if cables: cable = cables[0] else: return { "success": False, "error": "Either cable_id or both device_name and interface_name must be provided", "error_type": "ValidationError" } if not cable: return { "success": False, "error": "Cable not found", "error_type": "NotFoundError" } # Get termination details using defensive dictionary access termination_a_info = {} termination_b_info = {} if cable.get("termination_a_type") == "dcim.interface" and cable.get("termination_a_id"): interface_a = client.dcim.interfaces.get(cable["termination_a_id"]) if interface_a: device_a = None if interface_a.get("device") and interface_a["device"].get("id"): device_a = client.dcim.devices.get(interface_a["device"]["id"]) termination_a_info = { "interface": { "id": interface_a.get("id"), "name": interface_a.get("name", "N/A"), "type": interface_a.get("type", {}).get("label", "N/A") if isinstance(interface_a.get("type"), dict) else str(interface_a.get("type", "N/A")) }, "device": { "id": device_a.get("id") if device_a else None, "name": device_a.get("name", "N/A") if device_a else "N/A" } if device_a else {} } if cable.get("termination_b_type") == "dcim.interface" and cable.get("termination_b_id"): interface_b = client.dcim.interfaces.get(cable["termination_b_id"]) if interface_b: device_b = None if interface_b.get("device") and interface_b["device"].get("id"): device_b = client.dcim.devices.get(interface_b["device"]["id"]) termination_b_info = { "interface": { "id": interface_b.get("id"), "name": interface_b.get("name", "N/A"), "type": interface_b.get("type", {}).get("label", "N/A") if isinstance(interface_b.get("type"), dict) else str(interface_b.get("type", "N/A")) }, "device": { "id": device_b.get("id") if device_b else None, "name": device_b.get("name", "N/A") if device_b else "N/A" } if device_b else {} } # Build comprehensive cable information cable_info = { "id": cable.get("id"), "type": cable.get("type", "N/A"), "status": cable.get("status", {}).get("label", "N/A") if isinstance(cable.get("status"), dict) else str(cable.get("status", "N/A")), "label": cable.get("label", "N/A"), "description": cable.get("description", "N/A"), "length": cable.get("length"), "length_unit": cable.get("length_unit", "N/A") if cable.get("length") else None, "terminations": { "termination_a": termination_a_info, "termination_b": termination_b_info }, "created": cable.get("created"), "last_updated": cable.get("last_updated") } return { "success": True, "cable": cable_info } except Exception as e: logger.error(f"Failed to get cable info: {e}") return { "success": False, "error": str(e), "error_type": type(e).__name__ } @mcp_tool(category="dcim") def netbox_list_all_cables( client: NetBoxClient, limit: int = 100, site_name: Optional[str] = None, cable_type: Optional[str] = None, cable_status: Optional[str] = None ) -> Dict[str, Any]: """ Get summarized list of cables with optional filtering (dual-tool pattern). Args: client: NetBoxClient instance (injected) limit: Maximum number of cables to return site_name: Filter by site name (optional) cable_type: Filter by cable type (optional) cable_status: Filter by cable status (optional) Returns: Comprehensive cable list with summary statistics Example: netbox_list_all_cables() netbox_list_all_cables(site_name="datacenter-1", cable_type="cat6") """ try: # Build filter parameters filter_params = {"limit": limit} # Apply filters if provided if cable_type: filter_params["type"] = cable_type if cable_status: filter_params["status"] = cable_status logger.info(f"Fetching cables with filters: {filter_params}") cables = client.dcim.cables.filter(**filter_params) if not cables: return { "success": True, "cables": [], "summary": { "total_count": 0, "message": "No cables found with the specified criteria" } } # Process cables with defensive dictionary access cable_list = [] status_counts = {} type_counts = {} length_stats = {"total_length": 0, "with_length": 0} for cable in cables: # Safe dictionary access for status status_obj = cable.get("status", {}) if isinstance(status_obj, dict): status = status_obj.get("label", "N/A") else: status = str(status_obj) if status_obj else "N/A" # Count statistics status_counts[status] = status_counts.get(status, 0) + 1 cable_type_val = cable.get("type", "N/A") type_counts[cable_type_val] = type_counts.get(cable_type_val, 0) + 1 # Length statistics if cable.get("length"): length_stats["total_length"] += cable["length"] length_stats["with_length"] += 1 # Get termination summary termination_summary = "N/A -> N/A" if (cable.get("termination_a_type") == "dcim.interface" and cable.get("termination_b_type") == "dcim.interface"): # Try to get device names from terminations device_a_name = "Device A" device_b_name = "Device B" try: if cable.get("termination_a_id"): interface_a = client.dcim.interfaces.get(cable["termination_a_id"]) if interface_a and interface_a.get("device"): device_a = client.dcim.devices.get(interface_a["device"]["id"]) if device_a: device_a_name = device_a.get("name", "Device A") if cable.get("termination_b_id"): interface_b = client.dcim.interfaces.get(cable["termination_b_id"]) if interface_b and interface_b.get("device"): device_b = client.dcim.devices.get(interface_b["device"]["id"]) if device_b: device_b_name = device_b.get("name", "Device B") termination_summary = f"{device_a_name} -> {device_b_name}" except Exception: # Fallback to generic summary termination_summary = "Interface -> Interface" cable_info = { "id": cable.get("id"), "type": cable_type_val, "status": status, "label": cable.get("label", "N/A"), "length": f"{cable.get('length')}{cable.get('length_unit', 'm')}" if cable.get("length") else "Not specified", "termination_summary": termination_summary, "last_updated": cable.get("last_updated") } cable_list.append(cable_info) # Generate summary statistics summary = { "total_count": len(cable_list), "status_breakdown": status_counts, "type_breakdown": type_counts, "length_statistics": { "cables_with_length": length_stats["with_length"], "total_length": f"{length_stats['total_length']}m" if length_stats["total_length"] > 0 else "Not available", "average_length": f"{length_stats['total_length'] / length_stats['with_length']:.1f}m" if length_stats["with_length"] > 0 else "Not available" }, "filters_applied": { "site_name": site_name, "cable_type": cable_type, "cable_status": cable_status, "limit": limit } } return { "success": True, "cables": cable_list, "summary": summary } except Exception as e: logger.error(f"Failed to list cables: {e}") return { "success": False, "error": str(e), "error_type": type(e).__name__ } @mcp_tool(category="dcim") def netbox_bulk_create_cable_connections( client: NetBoxClient, cable_connections: List[Dict[str, str]], cable_type: str = "cat6", cable_color: Optional[str] = None, cable_status: str = "connected", cable_length: Optional[int] = None, cable_length_unit: str = "m", batch_size: int = 10, rollback_on_error: bool = True, confirm: bool = False ) -> Dict[str, Any]: """ Create multiple cable connections in bulk operation with rollback support. This enterprise-grade bulk cable management tool handles mass cable creation with comprehensive error handling, progress tracking, and rollback capabilities for production infrastructure deployments. Args: client: NetBoxClient instance (injected) cable_connections: List of connection specifications: [ { "device_a_name": "XM13", "interface_a_name": "lom1", "device_b_name": "switch1.K3", "interface_b_name": "Te1/1/1" }, ... ] cable_type: Type of cable for all connections (cat5e, cat6, cat6a, cat7, cat8, etc.) cable_color: Color for all cables in batch (e.g., "pink", "red", "blue") cable_status: Cable status for all connections (planned, installed, connected, decommissioning) cable_length: Optional cable length for all connections cable_length_unit: Length unit (mm, cm, m, km, in, ft, mi) batch_size: Number of cables to create per batch (default: 10) rollback_on_error: Remove successfully created cables if batch fails (default: True) confirm: Must be True to execute (safety mechanism) Returns: Bulk operation results with success/failure details and rollback information Example: netbox_bulk_create_cable_connections( cable_connections=[ { "device_a_name": "XM13", "interface_a_name": "lom1", "device_b_name": "switch1.K3", "interface_b_name": "Te1/1/1" }, { "device_a_name": "XM14", "interface_a_name": "lom1", "device_b_name": "switch1.K3", "interface_b_name": "Te1/1/2" } ], cable_type="cat6", cable_color="pink", batch_size=5, confirm=True ) """ class BulkCableOperationResult: def __init__(self): self.successful_connections = [] self.failed_connections = [] self.rollback_actions = [] self.total_attempted = 0 self.start_time = datetime.now() self.end_time = None def add_success(self, connection_spec, cable_result): self.successful_connections.append({ "connection": connection_spec, "cable_id": cable_result.get("cable", {}).get("id"), "cable_result": cable_result, "created_at": datetime.now() }) def add_failure(self, connection_spec, error): self.failed_connections.append({ "connection": connection_spec, "error": str(error), "error_type": type(error).__name__, "failed_at": datetime.now() }) def add_rollback(self, cable_id, rollback_result): self.rollback_actions.append({ "cable_id": cable_id, "rollback_result": rollback_result, "rolled_back_at": datetime.now() }) def calculate_success_rate(self): if self.total_attempted == 0: return 0.0 return len(self.successful_connections) / self.total_attempted * 100 def finalize(self): self.end_time = datetime.now() def get_summary(self): duration = (self.end_time - self.start_time).total_seconds() if self.end_time else 0 return { "total_attempted": self.total_attempted, "successful_count": len(self.successful_connections), "failed_count": len(self.failed_connections), "success_rate": f"{self.calculate_success_rate():.1f}%", "rollback_count": len(self.rollback_actions), "duration_seconds": duration, "batch_size": batch_size } try: # Validate input parameters if not cable_connections: return { "success": False, "error": "No cable connections provided", "error_type": "ValidationError" } if not isinstance(cable_connections, list): return { "success": False, "error": "cable_connections must be a list of connection specifications", "error_type": "ValidationError" } # Validate each connection specification required_fields = ["device_a_name", "interface_a_name", "device_b_name", "interface_b_name"] for i, connection in enumerate(cable_connections): if not isinstance(connection, dict): return { "success": False, "error": f"Connection {i+1} must be a dictionary", "error_type": "ValidationError" } missing_fields = [field for field in required_fields if field not in connection] if missing_fields: return { "success": False, "error": f"Connection {i+1} missing required fields: {missing_fields}", "error_type": "ValidationError" } # Validate cable parameters using shared validator try: cable_type = CableValidator.validate_type(cable_type) except Exception as e: return { "success": False, "error": str(e), "error_type": "ValidationError" } valid_statuses = ["planned", "installed", "connected", "decommissioning"] if cable_status not in valid_statuses: return { "success": False, "error": f"Invalid cable_status '{cable_status}'. Valid statuses: {valid_statuses}", "error_type": "ValidationError" } try: cable_color = CableValidator.validate_color(cable_color) except Exception as e: return { "success": False, "error": str(e), "error_type": "ValidationError" } logger.info(f"Starting bulk cable creation: {len(cable_connections)} connections, batch_size={batch_size}") # Initialize operation tracking operation_result = BulkCableOperationResult() operation_result.total_attempted = len(cable_connections) if not confirm: # Dry run mode - return what would be created without actually creating logger.info(f"DRY RUN: Would create {len(cable_connections)} cable connections") return { "success": True, "action": "dry_run", "object_type": "bulk_cables", "bulk_operation": { "total_connections": len(cable_connections), "cable_type": cable_type, "cable_color": cable_color, "cable_status": cable_status, "batch_size": batch_size, "rollback_on_error": rollback_on_error, "connections_preview": cable_connections[:5] # Show first 5 for preview }, "dry_run": True } # Process connections in batches for batch_start in range(0, len(cable_connections), batch_size): batch_end = min(batch_start + batch_size, len(cable_connections)) batch_connections = cable_connections[batch_start:batch_end] logger.info(f"Processing batch {batch_start//batch_size + 1}: connections {batch_start+1}-{batch_end}") batch_success_count = 0 batch_failure_count = 0 # Process each connection in the batch for connection in batch_connections: try: # Use the existing single cable creation tool for each connection cable_result = netbox_create_cable_connection( client=client, device_a_name=connection["device_a_name"], interface_a_name=connection["interface_a_name"], device_b_name=connection["device_b_name"], interface_b_name=connection["interface_b_name"], cable_type=cable_type, cable_status=cable_status, cable_color=cable_color, cable_length=cable_length, cable_length_unit=cable_length_unit, label=connection.get("label"), description=connection.get("description"), confirm=True ) if cable_result.get("success"): operation_result.add_success(connection, cable_result) batch_success_count += 1 logger.debug(f"Successfully created cable: {connection['device_a_name']}:{connection['interface_a_name']} -> {connection['device_b_name']}:{connection['interface_b_name']}") else: operation_result.add_failure(connection, cable_result.get("error", "Unknown error")) batch_failure_count += 1 logger.warning(f"Failed to create cable: {connection['device_a_name']}:{connection['interface_a_name']} -> {connection['device_b_name']}:{connection['interface_b_name']}, Error: {cable_result.get('error')}") except Exception as e: operation_result.add_failure(connection, str(e)) batch_failure_count += 1 logger.error(f"Exception creating cable: {connection['device_a_name']}:{connection['interface_a_name']} -> {connection['device_b_name']}:{connection['interface_b_name']}, Error: {e}") # Check if rollback is needed for this batch if rollback_on_error and batch_failure_count > 0: logger.warning(f"Batch {batch_start//batch_size + 1} had {batch_failure_count} failures, initiating rollback") # Rollback successful connections from this batch for success_record in operation_result.successful_connections[-batch_success_count:]: try: cable_id = success_record["cable_id"] if cable_id: logger.info(f"Rolling back cable ID: {cable_id}") rollback_result = netbox_disconnect_cable( client=client, cable_id=cable_id, confirm=True ) operation_result.add_rollback(cable_id, rollback_result) except Exception as rollback_error: logger.error(f"Failed to rollback cable {cable_id}: {rollback_error}") # Remove the rolled-back successes from the success list operation_result.successful_connections = operation_result.successful_connections[:-batch_success_count] # If rollback_on_error is enabled, stop processing remaining batches logger.error(f"Stopping bulk operation due to batch failures and rollback_on_error=True") break # Finalize operation operation_result.finalize() # Cache invalidation for data consistency try: client.cache.invalidate_pattern("dcim.cables") client.cache.invalidate_pattern("dcim.interfaces") except Exception as cache_error: logger.warning(f"Cache invalidation failed: {cache_error}") # Determine overall success success_rate = operation_result.calculate_success_rate() overall_success = success_rate > 0 and (not rollback_on_error or len(operation_result.failed_connections) == 0) return { "success": overall_success, "action": "bulk_created", "object_type": "bulk_cables", "operation_summary": operation_result.get_summary(), "successful_connections": [ { "device_a_name": conn["connection"]["device_a_name"], "interface_a_name": conn["connection"]["interface_a_name"], "device_b_name": conn["connection"]["device_b_name"], "interface_b_name": conn["connection"]["interface_b_name"], "cable_id": conn["cable_id"], "created_at": conn["created_at"].isoformat() } for conn in operation_result.successful_connections ], "failed_connections": [ { "device_a_name": conn["connection"]["device_a_name"], "interface_a_name": conn["connection"]["interface_a_name"], "device_b_name": conn["connection"]["device_b_name"], "interface_b_name": conn["connection"]["interface_b_name"], "error": conn["error"], "error_type": conn["error_type"], "failed_at": conn["failed_at"].isoformat() } for conn in operation_result.failed_connections ], "rollback_actions": [ { "cable_id": action["cable_id"], "rollback_success": action["rollback_result"].get("success", False), "rolled_back_at": action["rolled_back_at"].isoformat() } for action in operation_result.rollback_actions ], "cable_specs": { "type": cable_type, "color": cable_color, "status": cable_status, "length": f"{cable_length}{cable_length_unit}" if cable_length else None, "batch_size": batch_size, "rollback_on_error": rollback_on_error }, "dry_run": False } except Exception as e: logger.error(f"Failed to create bulk cable connections: {e}") return { "success": False, "error": str(e), "error_type": type(e).__name__ } # TODO: Future cable management tools: # - netbox_trace_cable_path: Follow cable connections through multiple hops # - netbox_validate_cable_terminations: Check for proper cable termination patterns # - netbox_audit_cable_inventory: Generate cable audit reports

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Deployment-Team/netbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server