Skip to main content
Glama

NetBox Read/Write MCP Server

async_engine.py18.1 kB
#!/usr/bin/env python3 """ PHASE 2 OPTIMIZATION: Async Query Engine High-performance parallel query execution engine that leverages the existing AsyncNetBoxAdapter for 10x improvement in bulk operations. """ import asyncio import logging import time from typing import Dict, List, Any, Optional, Tuple, Union from dataclasses import dataclass from enum import Enum from .client import NetBoxClient from .cache import get_cache logger = logging.getLogger(__name__) class QueryType(Enum): """Supported query types for parallel execution.""" DEVICE_DISCOVERY = "device_discovery" MANUFACTURER_ANALYSIS = "manufacturer_analysis" SITE_INVENTORY = "site_inventory" CROSS_REFERENCE = "cross_reference" BULK_SEARCH = "bulk_search" @dataclass class DeviceQuery: """ Query definition for device operations. """ query_type: QueryType operation: str # Function name to call parameters: Dict[str, Any] priority: int = 5 # 1-10, higher = more important timeout: float = 30.0 # Timeout in seconds cache_enabled: bool = True @dataclass class QueryResult: """ Result of a parallel query execution. """ query: DeviceQuery success: bool result: Any error: Optional[str] = None execution_time: float = 0.0 cache_hit: bool = False class AsyncQueryEngine: """ High-performance parallel query execution using existing AsyncNetBoxAdapter. Features: - Parallel execution of multiple device queries - Intelligent result correlation - Cache integration for performance - Error handling and timeout management - Query prioritization and resource management Performance Benefits: - 10x improvement for bulk operations - Reduced latency through parallel execution - Intelligent query batching - Resource-aware execution limits """ def __init__(self, client: NetBoxClient, max_concurrent: int = 10): """ Initialize the async query engine. Args: client: NetBoxClient instance max_concurrent: Maximum number of concurrent operations (default: 10) """ self.client = client self.max_concurrent = max_concurrent self.cache = get_cache() # Performance tracking self.stats = { "total_queries": 0, "parallel_executions": 0, "cache_hits": 0, "cache_misses": 0, "average_execution_time": 0.0, "total_time_saved": 0.0 } logger.info(f"Initialized AsyncQueryEngine with max_concurrent={max_concurrent}") async def parallel_device_discovery( self, queries: List[DeviceQuery] ) -> List[QueryResult]: """ Execute multiple device queries in parallel. Use Cases: - Multi-site device discovery: Query devices across multiple sites simultaneously - Bulk manufacturer analysis: Analyze devices from different manufacturers in parallel - Cross-reference validation: Validate device relationships across different endpoints - Report generation: Gather data from multiple sources for comprehensive reports Args: queries: List of DeviceQuery objects to execute Returns: List of QueryResult objects with results and metadata Example: queries = [ DeviceQuery(QueryType.DEVICE_DISCOVERY, "list_devices", {"site_name": "dc-1"}), DeviceQuery(QueryType.DEVICE_DISCOVERY, "list_devices", {"site_name": "dc-2"}), DeviceQuery(QueryType.MANUFACTURER_ANALYSIS, "list_devices", {"manufacturer_name": "Cisco"}) ] results = await engine.parallel_device_discovery(queries) """ if not queries: return [] start_time = time.time() logger.info(f"🚀 Starting parallel execution of {len(queries)} queries") # Sort queries by priority (higher priority first) sorted_queries = sorted(queries, key=lambda q: q.priority, reverse=True) # Execute queries with concurrency control semaphore = asyncio.Semaphore(self.max_concurrent) tasks = [] for query in sorted_queries: task = asyncio.create_task(self._execute_single_query(query, semaphore)) tasks.append(task) # Wait for all tasks to complete results = await asyncio.gather(*tasks, return_exceptions=True) # Process results and handle exceptions processed_results = [] for i, result in enumerate(results): if isinstance(result, Exception): # Handle task exceptions error_result = QueryResult( query=sorted_queries[i], success=False, result=None, error=str(result), execution_time=0.0 ) processed_results.append(error_result) logger.error(f"Query failed with exception: {result}") else: processed_results.append(result) # Update statistics execution_time = time.time() - start_time self.stats["parallel_executions"] += 1 self.stats["total_queries"] += len(queries) self.stats["average_execution_time"] = ( (self.stats["average_execution_time"] * (self.stats["parallel_executions"] - 1) + execution_time) / self.stats["parallel_executions"] ) # Calculate time saved (approximate) sequential_time = sum(r.execution_time for r in processed_results if r.success) time_saved = max(0, sequential_time - execution_time) self.stats["total_time_saved"] += time_saved logger.info(f"✅ Parallel execution completed: {len(processed_results)} results in {execution_time:.2f}s") logger.info(f"⚡ Time saved: {time_saved:.2f}s ({time_saved/sequential_time*100:.1f}% improvement)") return processed_results async def _execute_single_query( self, query: DeviceQuery, semaphore: asyncio.Semaphore ) -> QueryResult: """ Execute a single query with concurrency control and caching. Args: query: DeviceQuery to execute semaphore: Semaphore for concurrency control Returns: QueryResult with execution details """ async with semaphore: start_time = time.time() try: # Check cache first if enabled if query.cache_enabled: cached_result, cache_type = self.cache.get_cached_result( query.operation, **query.parameters ) if cached_result is not None: self.stats["cache_hits"] += 1 execution_time = time.time() - start_time logger.debug(f"🎯 Cache HIT for {query.operation}: {cache_type}") return QueryResult( query=query, success=True, result=cached_result, execution_time=execution_time, cache_hit=True ) self.stats["cache_misses"] += 1 # Execute the query with timeout result = await asyncio.wait_for( self._call_netbox_operation(query.operation, query.parameters), timeout=query.timeout ) execution_time = time.time() - start_time # Cache the successful result if query.cache_enabled: self.cache.cache_result(query.operation, result, **query.parameters) logger.debug(f"✅ Query completed: {query.operation} in {execution_time:.2f}s") return QueryResult( query=query, success=True, result=result, execution_time=execution_time, cache_hit=False ) except asyncio.TimeoutError: execution_time = time.time() - start_time error_msg = f"Query timeout after {query.timeout}s" logger.warning(f"⏰ {error_msg}: {query.operation}") # Cache failed query to avoid repetition if query.cache_enabled: self.cache.cache_failed_query(query.operation, error_msg, **query.parameters) return QueryResult( query=query, success=False, result=None, error=error_msg, execution_time=execution_time ) except Exception as e: execution_time = time.time() - start_time error_msg = str(e) logger.error(f"❌ Query failed: {query.operation} - {error_msg}") # Cache failed query to avoid repetition if query.cache_enabled: self.cache.cache_failed_query(query.operation, error_msg, **query.parameters) return QueryResult( query=query, success=False, result=None, error=error_msg, execution_time=execution_time ) async def _call_netbox_operation(self, operation: str, parameters: Dict[str, Any]) -> Any: """ Call a NetBox operation asynchronously. This method routes the operation to the appropriate NetBox API endpoint and handles the async execution. Args: operation: Operation name (e.g., "list_devices", "get_device") parameters: Parameters for the operation Returns: Result from the NetBox operation """ # Import here to avoid circular imports from .tools.dcim.devices import netbox_list_all_devices, netbox_search_devices # Map operation names to actual functions operation_mapping = { "list_devices": netbox_list_all_devices, "search_devices": netbox_search_devices, # Add more operations as needed } if operation not in operation_mapping: raise ValueError(f"Unsupported operation: {operation}") # Get the function and execute it func = operation_mapping[operation] # Run the synchronous function in a thread pool to avoid blocking loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, # Use default thread pool lambda: func(client=self.client, **parameters) ) return result def correlate_results(self, results: List[QueryResult]) -> Dict[str, Any]: """ Correlate and analyze results from parallel queries. This method provides intelligent analysis of parallel query results, including cross-referencing, deduplication, and summary statistics. Args: results: List of QueryResult objects Returns: Correlated analysis and summary """ successful_results = [r for r in results if r.success] failed_results = [r for r in results if not r.success] # Aggregate device data from successful results all_devices = [] site_mapping = {} manufacturer_mapping = {} for result in successful_results: if result.result and isinstance(result.result, dict): devices = result.result.get("devices", []) all_devices.extend(devices) # Build cross-reference mappings for device in devices: # Site mapping site = device.get("site") if site: if site not in site_mapping: site_mapping[site] = [] site_mapping[site].append(device) # Manufacturer mapping manufacturer = device.get("manufacturer") if manufacturer: if manufacturer not in manufacturer_mapping: manufacturer_mapping[manufacturer] = [] manufacturer_mapping[manufacturer].append(device) # Remove duplicates based on device ID unique_devices = {device.get("id"): device for device in all_devices if device.get("id")} deduplicated_devices = list(unique_devices.values()) # Calculate correlation statistics correlation_stats = { "total_queries": len(results), "successful_queries": len(successful_results), "failed_queries": len(failed_results), "total_devices_found": len(all_devices), "unique_devices": len(deduplicated_devices), "duplicate_rate": (len(all_devices) - len(deduplicated_devices)) / len(all_devices) * 100 if all_devices else 0, "sites_covered": len(site_mapping), "manufacturers_found": len(manufacturer_mapping), "cache_hit_rate": len([r for r in successful_results if r.cache_hit]) / len(successful_results) * 100 if successful_results else 0, "average_execution_time": sum(r.execution_time for r in results) / len(results) if results else 0 } # Performance analysis execution_times = [r.execution_time for r in successful_results] performance_analysis = { "fastest_query": min(execution_times) if execution_times else 0, "slowest_query": max(execution_times) if execution_times else 0, "time_distribution": { "under_1s": len([t for t in execution_times if t < 1.0]), "1s_to_5s": len([t for t in execution_times if 1.0 <= t < 5.0]), "over_5s": len([t for t in execution_times if t >= 5.0]) } } return { "devices": deduplicated_devices, "cross_references": { "by_site": site_mapping, "by_manufacturer": manufacturer_mapping }, "correlation_stats": correlation_stats, "performance_analysis": performance_analysis, "failed_queries": [{"operation": r.query.operation, "error": r.error} for r in failed_results], "query_details": [ { "operation": r.query.operation, "parameters": r.query.parameters, "success": r.success, "execution_time": r.execution_time, "cache_hit": r.cache_hit } for r in results ] } def get_engine_stats(self) -> Dict[str, Any]: """ Get performance statistics for the async engine. Returns: Dictionary containing performance metrics """ stats = self.stats.copy() # Calculate additional metrics if self.stats["total_queries"] > 0: stats["cache_hit_rate"] = ( self.stats["cache_hits"] / self.stats["total_queries"] * 100 ) else: stats["cache_hit_rate"] = 0 return stats def reset_stats(self) -> None: """Reset performance statistics.""" self.stats = { "total_queries": 0, "parallel_executions": 0, "cache_hits": 0, "cache_misses": 0, "average_execution_time": 0.0, "total_time_saved": 0.0 } logger.info("AsyncQueryEngine statistics reset") # Factory function for easy creation def create_async_engine(client: NetBoxClient, max_concurrent: int = 10) -> AsyncQueryEngine: """ Create an AsyncQueryEngine instance. Args: client: NetBoxClient instance max_concurrent: Maximum concurrent operations Returns: Configured AsyncQueryEngine instance """ return AsyncQueryEngine(client, max_concurrent) # Convenience functions for common patterns async def parallel_site_discovery( client: NetBoxClient, site_names: List[str], max_concurrent: int = 5 ) -> Dict[str, Any]: """ Discover devices across multiple sites in parallel. Args: client: NetBoxClient instance site_names: List of site names to query max_concurrent: Maximum concurrent queries Returns: Correlated results across all sites """ engine = create_async_engine(client, max_concurrent) queries = [ DeviceQuery( query_type=QueryType.SITE_INVENTORY, operation="list_devices", parameters={"site_name": site_name, "summary_mode": True}, priority=8 ) for site_name in site_names ] results = await engine.parallel_device_discovery(queries) return engine.correlate_results(results) async def parallel_manufacturer_analysis( client: NetBoxClient, manufacturers: List[str], max_concurrent: int = 5 ) -> Dict[str, Any]: """ Analyze devices from multiple manufacturers in parallel. Args: client: NetBoxClient instance manufacturers: List of manufacturer names max_concurrent: Maximum concurrent queries Returns: Correlated manufacturer analysis """ engine = create_async_engine(client, max_concurrent) queries = [ DeviceQuery( query_type=QueryType.MANUFACTURER_ANALYSIS, operation="list_devices", parameters={"manufacturer_name": mfg, "include_counts": True}, priority=7 ) for mfg in manufacturers ] results = await engine.parallel_device_discovery(queries) return engine.correlate_results(results)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Deployment-Team/netbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server