Model Control Plane (MCP) Server

  • scripts
#!/usr/bin/env python3
"""
MCP Memory Dashboard - A Prometheus Memory Monitoring Dashboard using MCP
"""

import os
import sys
import time
import json
import curses
import argparse
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional

# Import the MCP client
from langflow import MCPAIComponent

# NOTE(review): sys, json, curses, timedelta and Optional are not referenced
# by the code visible in this module. They are kept because other tooling may
# rely on them -- confirm before removing.

# Divisors used to convert the byte counts reported by Prometheus.
BYTES_PER_MB = 1024 * 1024
BYTES_PER_GB = 1024 * 1024 * 1024

# Width of the separator rules printed between dashboard sections.
RULE_WIDTH = 80


class MemoryDashboard:
    """Memory monitoring dashboard using Prometheus MCP integration"""

    def __init__(self,
                 mcp_server_url: str = "http://localhost:8000",
                 prometheus_url: str = "http://localhost:9090",
                 refresh_interval: int = 5):
        """Initialize the dashboard

        Args:
            mcp_server_url: URL of the MCP server
            prometheus_url: URL of the Prometheus server
            refresh_interval: Dashboard refresh interval in seconds
        """
        self.mcp_server_url = mcp_server_url
        self.prometheus_url = prometheus_url
        self.refresh_interval = refresh_interval
        self.mcp = MCPAIComponent(mcp_server_url=mcp_server_url)

        # Set the Prometheus URL in the environment
        os.environ["PROMETHEUS_URL"] = prometheus_url

        # Store the latest data
        self.latest_data = {
            "system_memory": {},
            "container_memory": {},
            "alerts": [],
            "last_update": None
        }

    @staticmethod
    def _usage_status(usage_pct: float) -> str:
        """Map a usage percentage to a status label.

        Shared by the host and container fetchers so both sections use the
        same thresholds: above 80 is CRITICAL, above 60 is WARNING, above 40
        is ELEVATED, anything else is NORMAL.
        """
        if usage_pct > 80:
            return "CRITICAL"
        if usage_pct > 60:
            return "WARNING"
        if usage_pct > 40:
            return "ELEVATED"
        return "NORMAL"

    @staticmethod
    def _sample_value(result: Dict[str, Any]) -> float:
        """Return the sample of one query result entry as a float.

        Reads index 1 of the entry's ``value`` list and falls back to
        ``[0, 0]`` when the key is absent.

        Raises:
            ValueError, TypeError, IndexError: if the value is malformed.
        """
        return float(result.get('value', [0, 0])[1])

    @staticmethod
    def _matching_value(results: List[Dict[str, Any]], label: str,
                        wanted: Any, default: float) -> float:
        """Return the sample of the first entry whose metric label matches.

        Args:
            results: Query result entries to search.
            label: Name of the metric label to compare.
            wanted: Label value to look for.
            default: Value returned when no entry matches.
        """
        return next(
            (float(r.get('value', [0, 0])[1])
             for r in results
             if r.get('metric', {}).get(label) == wanted),
            default
        )

    def fetch_system_memory(self) -> Dict[str, Any]:
        """Fetch system memory metrics from Prometheus

        Returns:
            A dict keyed by instance label. Each value holds ``total_gb``,
            ``used_gb``, ``available_gb``, ``usage_pct`` and ``status``, or
            a single ``error`` key when that instance's sample could not be
            parsed. If the query itself fails, the returned dict is
            ``{"error": <message>}`` instead.
        """
        try:
            # Get total memory
            total_mem_result = self.mcp.prometheus_query("node_memory_MemTotal_bytes")
            available_mem_result = self.mcp.prometheus_query("node_memory_MemAvailable_bytes")

            # Calculate usage percentage
            usage_pct_result = self.mcp.prometheus_query(
                "(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100"
            )

            # Report a failed query instead of rendering an empty section,
            # matching how fetch_alerts reports a non-success status.
            if usage_pct_result.get('status') != 'success':
                return {
                    "error": "Prometheus query failed: "
                             f"{usage_pct_result.get('error', 'Unknown error')}"
                }

            usage_rows = usage_pct_result.get('data', {}).get('result', [])
            total_rows = total_mem_result.get('data', {}).get('result', [])
            available_rows = available_mem_result.get('data', {}).get('result', [])

            hosts = {}
            for result in usage_rows:
                instance = result.get('metric', {}).get('instance', 'unknown')
                try:
                    usage_pct = self._sample_value(result)

                    # Find matching total and available memory (0 if absent)
                    total_mem = self._matching_value(total_rows, 'instance', instance, 0)
                    available_mem = self._matching_value(available_rows, 'instance', instance, 0)

                    # Convert to GB
                    total_gb = total_mem / BYTES_PER_GB
                    available_gb = available_mem / BYTES_PER_GB

                    hosts[instance] = {
                        "total_gb": total_gb,
                        "used_gb": total_gb - available_gb,
                        "available_gb": available_gb,
                        "usage_pct": usage_pct,
                        "status": self._usage_status(usage_pct)
                    }
                except (ValueError, TypeError, IndexError) as e:
                    # A malformed sample only invalidates its own host.
                    hosts[instance] = {"error": str(e)}

            return hosts
        except Exception as e:
            # Boundary handler: the dashboard shows the message rather than
            # crashing the refresh loop.
            return {"error": str(e)}

    def fetch_container_memory(self) -> Dict[str, Any]:
        """Fetch container memory metrics from Prometheus

        Returns:
            A dict keyed by the ``container_name`` label. Each value holds
            ``memory_mb``, ``limit_mb`` (``None`` when no limit applies),
            ``usage_percent`` and ``status``, or a single ``error`` key when
            that container's sample could not be parsed. If the query itself
            fails, the returned dict is ``{"error": <message>}`` instead.
        """
        try:
            # Get container memory usage and limits
            usage_result = self.mcp.prometheus_query('container_memory_usage_bytes{container_name!=""}')
            limit_result = self.mcp.prometheus_query('container_spec_memory_limit_bytes{container_name!=""}')

            # Report a failed query instead of rendering an empty section.
            if usage_result.get('status') != 'success':
                return {
                    "error": "Prometheus query failed: "
                             f"{usage_result.get('error', 'Unknown error')}"
                }

            usage_data = usage_result.get('data', {}).get('result', [])
            limit_data = limit_result.get('data', {}).get('result', [])

            containers = {}
            for result in usage_data:
                container = result.get('metric', {}).get('container_name', 'unknown')

                # Skip pause containers and entries without a name
                if not container or container in ('POD', 'pause'):
                    continue

                try:
                    # Get memory usage in bytes
                    memory_bytes = self._sample_value(result)

                    # Find memory limit for this container; no matching
                    # series is treated as "unlimited".
                    memory_limit_bytes = self._matching_value(
                        limit_data, 'container_name', container, float('inf')
                    )

                    # Handle unlimited containers (no series, or a limit of 0)
                    if memory_limit_bytes == float('inf') or memory_limit_bytes == 0:
                        usage_percent = 0
                        limit_mb = None
                    else:
                        limit_mb = memory_limit_bytes / BYTES_PER_MB
                        usage_percent = (memory_bytes / memory_limit_bytes) * 100

                    # Store container info
                    containers[container] = {
                        "memory_mb": memory_bytes / BYTES_PER_MB,
                        "limit_mb": limit_mb,
                        "usage_percent": usage_percent,
                        "status": self._usage_status(usage_percent)
                    }
                except (ValueError, TypeError, IndexError, ZeroDivisionError) as e:
                    # IndexError is caught here as well so that one short
                    # ``value`` list does not discard every other container.
                    containers[container] = {"error": str(e)}

            return containers
        except Exception as e:
            # Boundary handler: the dashboard shows the message rather than
            # crashing the refresh loop.
            return {"error": str(e)}

    def fetch_alerts(self) -> List[Dict[str, Any]]:
        """Fetch memory-related alerts from Prometheus

        Returns:
            A list of dicts with ``name``, ``severity``, ``state``,
            ``target``, ``target_type`` and ``summary`` for every alert whose
            ``alertname`` label contains one of the memory-related terms.
            On failure the list holds a single ``{"error": <message>}`` dict.
        """
        try:
            alerts_result = self.mcp.prometheus_get_alerts()

            if alerts_result.get('status') != 'success':
                return [{"error": f"Failed to fetch alerts: {alerts_result.get('error', 'Unknown error')}"}]

            all_alerts = alerts_result.get('data', {}).get('alerts', [])
            memory_terms = ['memory', 'mem', 'swap', 'container']

            # Format memory-related alerts for display
            formatted_alerts = []
            for alert in all_alerts:
                labels = alert.get('labels', {})

                # Filter for memory-related alerts
                if 'alertname' not in labels:
                    continue
                alert_name_lower = labels.get('alertname', '').lower()
                if not any(term in alert_name_lower for term in memory_terms):
                    continue

                alert_name = labels.get('alertname', 'Unknown')

                # Get target (container or instance)
                if 'container' in alert_name.lower() or 'name' in labels:
                    target = labels.get('name', labels.get('container_name', 'unknown'))
                    target_type = "container"
                else:
                    target = labels.get('instance', 'unknown')
                    target_type = "host"

                formatted_alerts.append({
                    "name": alert_name,
                    "severity": labels.get('severity', 'unknown'),
                    "state": alert.get('state', 'unknown'),
                    "target": target,
                    "target_type": target_type,
                    "summary": alert.get('annotations', {}).get('summary', 'No summary available')
                })

            return formatted_alerts
        except Exception as e:
            # Boundary handler: the dashboard shows the message rather than
            # crashing the refresh loop.
            return [{"error": str(e)}]

    def update_data(self):
        """Update all dashboard data"""
        self.latest_data["system_memory"] = self.fetch_system_memory()
        self.latest_data["container_memory"] = self.fetch_container_memory()
        self.latest_data["alerts"] = self.fetch_alerts()
        self.latest_data["last_update"] = datetime.now()

    def _print_system_memory(self):
        """Print the per-host section from the latest fetched data."""
        print("\nSYSTEM MEMORY")
        print("-" * RULE_WIDTH)

        system_mem = self.latest_data["system_memory"]
        if "error" in system_mem:
            print(f"Error fetching system memory: {system_mem['error']}")
            return

        for instance, data in system_mem.items():
            if "error" in data:
                print(f"Host {instance}: Error - {data['error']}")
                continue

            print(f"Host: {instance}")
            print(f" Memory: {data['used_gb']:.2f} GB / {data['total_gb']:.2f} GB ({data['usage_pct']:.1f}%)")
            print(f" Available: {data['available_gb']:.2f} GB")
            print(f" Status: {data['status']}")
            print()

    def _print_container_memory(self):
        """Print the per-container section from the latest fetched data."""
        print("\nCONTAINER MEMORY")
        print("-" * RULE_WIDTH)

        container_mem = self.latest_data["container_memory"]
        if "error" in container_mem:
            print(f"Error fetching container memory: {container_mem['error']}")
            return

        # Sort containers by usage percentage (highest first)
        sorted_containers = sorted(
            [(name, data) for name, data in container_mem.items() if "error" not in data],
            key=lambda x: x[1]["usage_percent"],
            reverse=True
        )

        for container, data in sorted_containers:
            if data["limit_mb"]:
                limit_text = f"/ {data['limit_mb']:.2f} MB ({data['usage_percent']:.1f}%)"
            else:
                limit_text = "(no limit)"
            print(f"Container: {container}")
            print(f" Memory: {data['memory_mb']:.2f} MB {limit_text}")
            print(f" Status: {data['status']}")
            print()

        # Print containers with errors
        error_containers = [(name, data) for name, data in container_mem.items() if "error" in data]
        if error_containers:
            print("\nContainers with errors:")
            for container, data in error_containers:
                print(f" {container}: {data['error']}")

    def _print_alerts(self):
        """Print the firing memory alerts from the latest fetched data."""
        print("\nACTIVE ALERTS")
        print("-" * RULE_WIDTH)

        alerts = self.latest_data["alerts"]
        if not alerts:
            print("No memory-related alerts found.")
            return
        if "error" in alerts[0]:
            print(f"Error fetching alerts: {alerts[0]['error']}")
            return

        # Filter for active alerts and sort by severity; severities missing
        # from the table sort after "unknown".
        active_alerts = [a for a in alerts if a["state"].lower() == "firing"]
        severity_order = {"critical": 0, "warning": 1, "info": 2, "unknown": 3}
        sorted_alerts = sorted(
            active_alerts,
            key=lambda x: severity_order.get(x["severity"].lower(), 4)
        )

        if not sorted_alerts:
            print("No active memory alerts.")
            return

        for i, alert in enumerate(sorted_alerts):
            target_type = "Container" if alert["target_type"] == "container" else "Host"
            print(f"Alert {i+1}: {alert['name']}")
            print(f" {target_type}: {alert['target']}")
            print(f" Severity: {alert['severity'].upper()}")
            print(f" Summary: {alert['summary']}")
            print()

    def print_dashboard(self):
        """Print the memory dashboard to the console

        Refreshes all data first, then prints the system, container and
        alert sections followed by a footer with the update time and the
        configured server URLs.
        """
        self.update_data()

        print("\n" + "=" * RULE_WIDTH)
        print(f"MCP MEMORY DASHBOARD - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * RULE_WIDTH)

        self._print_system_memory()
        self._print_container_memory()
        self._print_alerts()

        print("=" * RULE_WIDTH)
        print(f"Last update: {self.latest_data['last_update'].strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"MCP Server: {self.mcp_server_url}")
        print(f"Prometheus: {self.prometheus_url}")
        print("=" * RULE_WIDTH)

    def run_dashboard(self):
        """Run the dashboard with periodic updates

        Loops until interrupted with Ctrl+C, clearing the screen and
        reprinting the dashboard every ``refresh_interval`` seconds.
        """
        try:
            while True:
                # Clear screen
                os.system('cls' if os.name == 'nt' else 'clear')

                # Update and print dashboard
                self.print_dashboard()

                # Wait for next refresh
                print(f"\nRefreshing in {self.refresh_interval} seconds... (Press Ctrl+C to exit)")
                time.sleep(self.refresh_interval)
        except KeyboardInterrupt:
            print("\nDashboard stopped.")


def main():
    """Main function to run the dashboard

    Parses the command line, builds a MemoryDashboard and either prints it
    once (``--once``) or runs the refresh loop.
    """
    parser = argparse.ArgumentParser(description="MCP Memory Dashboard")
    parser.add_argument("--mcp-url", default=os.getenv("MCP_SERVER_URL", "http://localhost:8000"),
                        help="MCP server URL")
    parser.add_argument("--prometheus-url", default=os.getenv("PROMETHEUS_URL", "http://localhost:9090"),
                        help="Prometheus server URL")
    parser.add_argument("--refresh", type=int, default=5,
                        help="Dashboard refresh interval in seconds")
    parser.add_argument("--once", action="store_true",
                        help="Print dashboard once and exit")

    args = parser.parse_args()

    # time.sleep() rejects negative values, so fail here with a usage
    # message instead of a traceback after the first render.
    if args.refresh < 0:
        parser.error("--refresh must not be negative")

    dashboard = MemoryDashboard(
        mcp_server_url=args.mcp_url,
        prometheus_url=args.prometheus_url,
        refresh_interval=args.refresh
    )

    if args.once:
        dashboard.print_dashboard()
    else:
        dashboard.run_dashboard()


if __name__ == "__main__":
    main()