Skip to main content
Glama
sitemap.py14.7 kB
"""Network site mapping and device tracking functionality.""" import json from dataclasses import asdict, dataclass from datetime import datetime from typing import Any from .database import calculate_data_hash, get_database_adapter @dataclass class NetworkDevice: """Represents a discovered network device.""" hostname: str connection_ip: str last_seen: str status: str # success, error cpu_model: str | None = None cpu_cores: int | None = None memory_total: str | None = None memory_used: str | None = None memory_free: str | None = None memory_available: str | None = None disk_filesystem: str | None = None disk_size: str | None = None disk_used: str | None = None disk_available: str | None = None disk_use_percent: str | None = None disk_mount: str | None = None network_interfaces: str | None = None # JSON string uptime: str | None = None os_info: str | None = None error_message: str | None = None class NetworkSiteMap: """Manages the network site map database.""" def __init__( self, db_path: str | None = None, db_type: str | None = None, **db_kwargs: Any ): """Initialize the site map with database connection.""" self.db_adapter = get_database_adapter( db_type=db_type, db_path=db_path, **db_kwargs ) self._init_database() def _init_database(self) -> None: """Initialize the database schema.""" self.db_adapter.init_schema() def parse_discovery_output(self, discovery_json: str) -> NetworkDevice: """Parse SSH discovery output into a NetworkDevice object.""" try: data = json.loads(discovery_json) device = NetworkDevice( hostname=data.get("hostname", ""), connection_ip=data.get("connection_ip", ""), last_seen=datetime.now().isoformat(), status=data.get("status", "error"), ) if data.get("status") == "success" and "data" in data: discovery_data = data["data"] # CPU information if "cpu" in discovery_data: cpu_info = discovery_data["cpu"] device.cpu_model = cpu_info.get("model") try: device.cpu_cores = int(cpu_info.get("cores", 0)) except (ValueError, TypeError): device.cpu_cores = None # Memory information if "memory" in discovery_data: mem_info = discovery_data["memory"] device.memory_total = mem_info.get("total") device.memory_used = mem_info.get("used") device.memory_free = mem_info.get("free") device.memory_available = mem_info.get("available") # Disk information if "disk" in discovery_data: disk_info = discovery_data["disk"] device.disk_filesystem = disk_info.get("filesystem") device.disk_size = disk_info.get("size") device.disk_used = disk_info.get("used") device.disk_available = disk_info.get("available") device.disk_use_percent = disk_info.get("use_percent") device.disk_mount = disk_info.get("mount") # Network interfaces (store as JSON) if "network" in discovery_data: device.network_interfaces = json.dumps(discovery_data["network"]) # System information device.uptime = discovery_data.get("uptime") device.os_info = discovery_data.get("os") elif data.get("status") == "error": device.error_message = data.get("error", "Unknown error") return device except json.JSONDecodeError as e: # Create error device for invalid JSON return NetworkDevice( hostname="unknown", connection_ip="unknown", last_seen=datetime.now().isoformat(), status="error", error_message=f"JSON parse error: {str(e)}", ) def store_device(self, device: NetworkDevice) -> int: """Store or update a device in the database.""" device_data = asdict(device) return self.db_adapter.store_device(device_data) def store_discovery_history(self, device_id: int, discovery_data: str) -> None: """Store discovery data in history for change tracking.""" data_hash = calculate_data_hash(discovery_data) self.db_adapter.store_discovery_history(device_id, discovery_data, data_hash) def get_all_devices(self) -> list[dict[str, Any]]: """Get all devices from the database.""" return self.db_adapter.get_all_devices() def get_device_changes( self, device_id: int, limit: int = 10 ) -> list[dict[str, Any]]: """Get change history for a specific device.""" return self.db_adapter.get_device_changes(device_id, limit) def analyze_network_topology(self) -> dict[str, Any]: """Analyze the network topology and provide insights.""" devices = self.get_all_devices() analysis = { "total_devices": len(devices), "online_devices": len([d for d in devices if d["status"] == "success"]), "offline_devices": len([d for d in devices if d["status"] == "error"]), "operating_systems": {}, "cpu_architectures": {}, "network_segments": {}, "resource_utilization": { "high_memory_usage": [], "high_disk_usage": [], "low_resources": [], }, } for device in devices: if device["status"] != "success": continue # OS distribution os_info = device.get("os_info", "Unknown") if isinstance(analysis["operating_systems"], dict): analysis["operating_systems"][os_info] = ( analysis["operating_systems"].get(os_info, 0) + 1 ) # CPU models cpu_model = device.get("cpu_model", "Unknown") if isinstance(analysis["cpu_architectures"], dict): analysis["cpu_architectures"][cpu_model] = ( analysis["cpu_architectures"].get(cpu_model, 0) + 1 ) # Network segments (by IP prefix) connection_ip = device.get("connection_ip", "") if "." in connection_ip: network_prefix = ".".join(connection_ip.split(".")[:3]) + ".0/24" if isinstance(analysis["network_segments"], dict): analysis["network_segments"][network_prefix] = ( analysis["network_segments"].get(network_prefix, 0) + 1 ) # Resource utilization analysis if device.get("disk_use_percent"): try: disk_usage = int(device["disk_use_percent"].rstrip("%")) if disk_usage > 80: if ( isinstance(analysis["resource_utilization"], dict) and "high_disk_usage" in analysis["resource_utilization"] ): analysis["resource_utilization"]["high_disk_usage"].append( { "hostname": device["hostname"], "usage": device["disk_use_percent"], } ) except (ValueError, AttributeError): pass # Identify resource-constrained devices cpu_cores = device.get("cpu_cores") if cpu_cores is not None and cpu_cores <= 2: memory_total = device.get("memory_total") if memory_total: memory_gb = self._parse_memory_gb(str(memory_total)) if ( memory_gb <= 2 and isinstance(analysis["resource_utilization"], dict) and "low_resources" in analysis["resource_utilization"] ): analysis["resource_utilization"]["low_resources"].append( { "hostname": device["hostname"], "cpu_cores": device["cpu_cores"], "memory": device["memory_total"], } ) return analysis def _parse_memory_gb(self, memory_str: str) -> float: """Parse memory string and return value in GB.""" if not memory_str: return 0.0 memory_str = str(memory_str).strip() if memory_str.endswith("Gi"): try: return float(memory_str.rstrip("Gi")) except (ValueError, AttributeError): return 0.0 elif memory_str.endswith("G"): try: return float(memory_str.rstrip("G")) except (ValueError, AttributeError): return 0.0 else: return 0.0 def suggest_deployments(self) -> dict[str, Any]: """Suggest optimal deployment locations based on current network state.""" devices = self.get_all_devices() online_devices = [d for d in devices if d["status"] == "success"] suggestions: dict[str, list[dict[str, str]]] = { "load_balancer_candidates": [], "database_candidates": [], "monitoring_targets": [], "upgrade_recommendations": [], } for device in online_devices: hostname = device["hostname"] # Load balancer candidates (high CPU, good memory) cpu_cores = device.get("cpu_cores") or 0 if cpu_cores >= 4: memory_total = device.get("memory_total") memory_gb = self._parse_memory_gb( str(memory_total) if memory_total else "" ) if memory_gb >= 4: suggestions["load_balancer_candidates"].append( { "hostname": hostname, "reason": f"{cpu_cores} cores, {device['memory_total']} RAM", } ) # Database candidates (good disk space, memory) if device.get("disk_use_percent"): try: disk_usage = int(device["disk_use_percent"].rstrip("%")) if disk_usage < 50: # Plenty of disk space memory_total = device.get("memory_total") memory_gb = self._parse_memory_gb( str(memory_total) if memory_total else "" ) if memory_gb >= 8: suggestions["database_candidates"].append( { "hostname": hostname, "reason": f"Low disk usage ({device['disk_use_percent']}), {device['memory_total']} RAM", } ) except (ValueError, AttributeError): pass # Monitoring targets (all online devices should be monitored) suggestions["monitoring_targets"].append( { "hostname": hostname, "connection_ip": device["connection_ip"], "os": device.get("os_info", "Unknown"), } ) # Upgrade recommendations cpu_cores = device.get("cpu_cores") or 0 if cpu_cores <= 2: memory_total = device.get("memory_total") memory_gb = self._parse_memory_gb( str(memory_total) if memory_total else "" ) if memory_gb <= 4: suggestions["upgrade_recommendations"].append( { "hostname": hostname, "reason": f"Limited resources: {cpu_cores} cores, {device.get('memory_total', 'Unknown')} RAM", } ) return suggestions async def discover_and_store( sitemap: NetworkSiteMap, hostname: str, username: str, password: str | None = None, key_path: str | None = None, port: int = 22, ) -> str: """Discover a device and store it in the site map.""" from .ssh_tools import ssh_discover_system # Perform discovery discovery_result = await ssh_discover_system( hostname, username, password, key_path, port ) # Parse and store the result device = sitemap.parse_discovery_output(discovery_result) device_id = sitemap.store_device(device) sitemap.store_discovery_history(device_id, discovery_result) return json.dumps( { "status": "success", "device_id": device_id, "hostname": device.hostname, "discovery_status": device.status, "stored_at": datetime.now().isoformat(), }, indent=2, ) async def bulk_discover_and_store( sitemap: NetworkSiteMap, targets: list[dict[str, Any]] ) -> str: """Discover multiple devices and store them in the site map.""" results = [] for target in targets: try: result = await discover_and_store( sitemap, target["hostname"], target["username"], target.get("password"), target.get("key_path"), target.get("port", 22), ) results.append(json.loads(result)) except Exception as e: results.append( { "status": "error", "hostname": target.get("hostname", "unknown"), "error": str(e), } ) return json.dumps( { "status": "success", "total_targets": len(targets), "results": results, "completed_at": datetime.now().isoformat(), }, indent=2, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/washyu/mcp_python_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server