Model Control Plane (MCP) Server

  • scripts — includes the AI-powered memory diagnostics script shown below:
#!/usr/bin/env python3
"""
AI-powered Memory Diagnostics - Uses MCP and AI to analyze Prometheus memory alerts
and provide detailed technical recommendations.
"""
import os
import sys
import json
import argparse
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional

# Import the MCP client
from langflow import MCPAIComponent


class MemoryDiagnostics:
    """AI-powered memory diagnostics using Prometheus and MCP"""

    def __init__(self, mcp_server_url: str = "http://localhost:8000",
                 prometheus_url: str = "http://localhost:9090",
                 ai_model_id: str = None):
        """Initialize the diagnostics

        Args:
            mcp_server_url: URL of the MCP server
            prometheus_url: URL of the Prometheus server
            ai_model_id: ID of the AI model to use for analysis (or auto-select if None)
        """
        self.mcp_server_url = mcp_server_url
        self.prometheus_url = prometheus_url
        self.ai_model_id = ai_model_id
        self.mcp = MCPAIComponent(mcp_server_url=mcp_server_url)

        # Set the Prometheus URL in the environment
        os.environ["PROMETHEUS_URL"] = prometheus_url

        # Auto-select AI model if not specified
        if not self.ai_model_id:
            self._select_ai_model()

    def _select_ai_model(self):
        """Auto-select an appropriate AI model"""
        try:
            # Get available models
            models = self.mcp.list_models()
            chat_models = [
                model for model in models
                if any(cap == "chat" for cap in model.get('capabilities', []))
            ]

            # Print all available models for debugging
            print("\nAvailable chat models:")
            for model in chat_models:
                print(f" - {model['id']} ({', '.join(model.get('capabilities', []))})")
            print()

            if not chat_models:
                print("Warning: No chat-capable AI models found")
                self.ai_model_id = None
                return

            # Get preferred model from environment
            preferred_chat_model = os.getenv("OPENAI_CHAT_MODEL", "gpt-4o-mini")
            print(f"Preferred model from environment: {preferred_chat_model}")

            # Look for models in order of preference
            # First check for an exact matching model ID
            for model in chat_models:
                if model['id'].lower() == preferred_chat_model.lower():
                    self.ai_model_id = model['id']
                    print(f"Selected AI model (exact match): {self.ai_model_id}")
                    return

            # Then check for models containing the preferred name
            for model in chat_models:
                if preferred_chat_model.lower() in model['id'].lower():
                    self.ai_model_id = model['id']
                    print(f"Selected AI model (partial match): {self.ai_model_id}")
                    return

            # Then look for any OpenAI model
            for model in chat_models:
                if "openai" in model['id'].lower():
                    self.ai_model_id = model['id']
                    print(f"Selected AI model (OpenAI): {self.ai_model_id}")
                    return

            # Prefer more advanced models (GPT-4 > Claude > GPT-3.5)
            for model_name_part in ["gpt-4", "claude", "gpt-3.5", "gpt"]:
                for model in chat_models:
                    if model_name_part in model['id'].lower():
                        self.ai_model_id = model['id']
                        print(f"Selected AI model (by type): {self.ai_model_id}")
                        return

            # If no preference matches, use the first one
            self.ai_model_id = chat_models[0]['id']
            print(f"Selected AI model (first available): {self.ai_model_id}")
        except Exception as e:
            print(f"Warning: Error selecting AI model: {e}")
            print("Falling back to basic analysis without AI")
            self.ai_model_id = None

    def fetch_alerts(self) -> List[Dict[str, Any]]:
        """Fetch memory-related alerts from Prometheus"""
        try:
            alerts_result = self.mcp.prometheus_get_alerts()

            if alerts_result.get('status') == 'success':
                all_alerts = alerts_result.get('data', {}).get('alerts', [])

                # Filter for memory-related alerts
                memory_alerts = [
                    alert for alert in all_alerts
                    if 'alertname' in alert.get('labels', {}) and
                    any(term in alert.get('labels', {}).get('alertname', '').lower()
                        for term in ['memory', 'mem', 'swap', 'container'])
                ]

                # Format alerts for display
                formatted_alerts = []
                for alert in memory_alerts:
                    alert_name = alert.get('labels', {}).get('alertname', 'Unknown')
                    severity = alert.get('labels', {}).get('severity', 'unknown')
                    state = alert.get('state', 'unknown')

                    # Get target (container or instance)
                    if 'container' in alert_name.lower() or 'name' in alert.get('labels', {}):
                        target = alert.get('labels', {}).get(
                            'name', alert.get('labels', {}).get('container_name', 'unknown'))
                        target_type = "container"
                    else:
                        target = alert.get('labels', {}).get('instance', 'unknown')
                        target_type = "host"

                    summary = alert.get('annotations', {}).get('summary', 'No summary available')
                    description = alert.get('annotations', {}).get('description', '')

                    formatted_alerts.append({
                        "name": alert_name,
                        "severity": severity,
                        "state": state,
                        "target": target,
                        "target_type": target_type,
                        "summary": summary,
                        "description": description
                    })

                return formatted_alerts
            else:
                print(f"Error fetching alerts: {alerts_result.get('error', 'Unknown error')}")
                return []
        except Exception as e:
            print(f"Exception fetching alerts: {str(e)}")
            return []

    def get_memory_metrics(self) -> Dict[str, Any]:
        """Collect comprehensive memory metrics"""
        metrics = {
            "system": {},
            "containers": {}
        }

        try:
            # System memory metrics
            # Total memory
            total_mem_result = self.mcp.prometheus_query("node_memory_MemTotal_bytes")
            # Available memory
            avail_mem_result = self.mcp.prometheus_query("node_memory_MemAvailable_bytes")
            # Memory usage percent
            usage_pct_result = self.mcp.prometheus_query(
                "(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100"
            )

            # Process results
            if usage_pct_result.get('status') == 'success':
                results = usage_pct_result.get('data', {}).get('result', [])

                for result in results:
                    instance = result.get('metric', {}).get('instance', 'unknown')
                    try:
                        usage_pct = float(result.get('value', [0, 0])[1])

                        # Find matching total and available memory
                        total_result = next(
                            (r for r in total_mem_result.get('data', {}).get('result', [])
                             if r.get('metric', {}).get('instance') == instance),
                            None
                        )
                        avail_result = next(
                            (r for r in avail_mem_result.get('data', {}).get('result', [])
                             if r.get('metric', {}).get('instance') == instance),
                            None
                        )

                        if total_result and avail_result:
                            total_bytes = float(total_result.get('value', [0, 0])[1])
                            avail_bytes = float(avail_result.get('value', [0, 0])[1])

                            metrics["system"][instance] = {
                                "total_bytes": total_bytes,
                                "total_gb": total_bytes / (1024 * 1024 * 1024),
                                "available_bytes": avail_bytes,
                                "available_gb": avail_bytes / (1024 * 1024 * 1024),
                                "used_bytes": total_bytes - avail_bytes,
                                "used_gb": (total_bytes - avail_bytes) / (1024 * 1024 * 1024),
                                "usage_percent": usage_pct
                            }
                    except (ValueError, TypeError, IndexError) as e:
                        print(f"Error processing memory data for {instance}: {e}")

            # Container memory metrics
            # Memory usage
            container_mem_result = self.mcp.prometheus_query(
                'container_memory_usage_bytes{container_name!=""}')
            # Memory limits
            container_limit_result = self.mcp.prometheus_query(
                'container_spec_memory_limit_bytes{container_name!=""}')

            if container_mem_result.get('status') == 'success':
                results = container_mem_result.get('data', {}).get('result', [])

                for result in results:
                    container = result.get('metric', {}).get('container_name', 'unknown')

                    # Skip pause containers
                    if container in ['', 'POD', 'pause'] or not container:
                        continue

                    try:
                        memory_bytes = float(result.get('value', [0, 0])[1])

                        # Find matching limit
                        limit_result = next(
                            (r for r in container_limit_result.get('data', {}).get('result', [])
                             if r.get('metric', {}).get('container_name') == container),
                            None
                        )

                        limit_bytes = 0
                        usage_percent = 0

                        if limit_result:
                            limit_bytes = float(limit_result.get('value', [0, 0])[1])
                            if limit_bytes > 0:
                                usage_percent = (memory_bytes / limit_bytes) * 100

                        metrics["containers"][container] = {
                            "memory_bytes": memory_bytes,
                            "memory_mb": memory_bytes / (1024 * 1024),
                            "limit_bytes": limit_bytes if limit_bytes > 0 else None,
                            "limit_mb": limit_bytes / (1024 * 1024) if limit_bytes > 0 else None,
                            "usage_percent": usage_percent
                        }
                    except (ValueError, TypeError, IndexError) as e:
                        print(f"Error processing container memory data for {container}: {e}")

            return metrics
        except Exception as e:
            print(f"Error collecting memory metrics: {e}")
            return metrics

    def get_historical_trends(self, lookback_hours: int = 3) -> Dict[str, Any]:
        """Get historical memory usage trends"""
        trends = {
            "system": {},
            "containers": {}
        }

        try:
            # Calculate time range (use UTC so the timestamps sent to Prometheus are unambiguous)
            end_time = datetime.now(timezone.utc)
            start_time = end_time - timedelta(hours=lookback_hours)

            # Format timestamps for Prometheus (RFC 3339)
            end_str = end_time.isoformat()
            start_str = start_time.isoformat()

            # Get system memory trends
            query = "(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100"
            range_result = self.mcp.prometheus_query_range(
                query=query,
                start=start_str,
                end=end_str,
                step="5m"  # 5-minute intervals
            )

            if range_result.get('status') == 'success':
                results = range_result.get('data', {}).get('result', [])

                for result in results:
                    instance = result.get('metric', {}).get('instance', 'unknown')
                    values = result.get('values', [])

                    if values:
                        # Process trend data - get min, max, avg, trend direction
                        try:
                            timestamps = [v[0] for v in values]
                            data_points = [float(v[1]) for v in values]

                            trends["system"][instance] = {
                                "min": min(data_points),
                                "max": max(data_points),
                                "avg": sum(data_points) / len(data_points),
                                "trend": ("increasing" if data_points[-1] > data_points[0]
                                          else "decreasing" if data_points[-1] < data_points[0]
                                          else "stable"),
                                "samples": len(data_points),
                                "time_range": f"{lookback_hours} hours"
                            }
                        except (ValueError, TypeError, IndexError) as e:
                            print(f"Error processing system trend data for {instance}: {e}")

            # Get container memory trends for selected containers
            for container in list(self.get_memory_metrics()["containers"].keys())[:5]:  # Limit to 5 containers
                query = f'container_memory_usage_bytes{{container_name="{container}"}}'
                range_result = self.mcp.prometheus_query_range(
                    query=query,
                    start=start_str,
                    end=end_str,
                    step="5m"  # 5-minute intervals
                )

                if range_result.get('status') == 'success':
                    results = range_result.get('data', {}).get('result', [])

                    for result in results:
                        values = result.get('values', [])

                        if values:
                            # Process trend data
                            try:
                                timestamps = [v[0] for v in values]
                                data_points = [float(v[1]) / (1024 * 1024) for v in values]  # Convert to MB

                                trends["containers"][container] = {
                                    "min_mb": min(data_points),
                                    "max_mb": max(data_points),
                                    "avg_mb": sum(data_points) / len(data_points),
                                    "trend": ("increasing" if data_points[-1] > data_points[0]
                                              else "decreasing" if data_points[-1] < data_points[0]
                                              else "stable"),
                                    "samples": len(data_points),
                                    "time_range": f"{lookback_hours} hours"
                                }
                            except (ValueError, TypeError, IndexError) as e:
                                print(f"Error processing container trend data for {container}: {e}")

            return trends
        except Exception as e:
            print(f"Error collecting trend data: {e}")
            return trends

    def get_ai_analysis(self, alerts, metrics, trends, detailed=False) -> str:
        """Get AI analysis of memory metrics and alerts"""
        if not self.ai_model_id:
            return "No AI model available for analysis."

        try:
            # Format alerts for the prompt
            alert_text = ""
            if alerts:
                alert_text = "Current Memory Alerts:\n"
                for i, alert in enumerate(alerts):
                    alert_text += f"- {alert['name']} ({alert['severity']})\n"
                    alert_text += f"  Target: {alert['target']} ({alert['target_type']})\n"
                    alert_text += f"  State: {alert['state']}\n"
                    alert_text += f"  Summary: {alert['summary']}\n\n"
            else:
                alert_text = "No active memory alerts.\n\n"

            # Format metrics for the prompt
            metric_text = "Current Memory Metrics:\n"

            # System metrics
            for instance, data in metrics["system"].items():
                metric_text += f"Host: {instance}\n"
                metric_text += f"- Total Memory: {data.get('total_gb', 0):.2f} GB\n"
                metric_text += f"- Used Memory: {data.get('used_gb', 0):.2f} GB\n"
                metric_text += f"- Available Memory: {data.get('available_gb', 0):.2f} GB\n"
                metric_text += f"- Usage: {data.get('usage_percent', 0):.1f}%\n\n"

            # Container metrics (top 5 by usage)
            sorted_containers = sorted(
                metrics["containers"].items(),
                key=lambda x: x[1].get("usage_percent", 0),
                reverse=True
            )

            if sorted_containers:
                metric_text += "Top Containers by Memory Usage:\n"
                for container, data in sorted_containers[:5]:
                    memory_mb = data.get("memory_mb", 0)
                    limit_text = (f" / {data.get('limit_mb', 0):.2f} MB ({data.get('usage_percent', 0):.1f}%)"
                                  if data.get("limit_mb") else " (no limit)")
                    metric_text += f"- {container}: {memory_mb:.2f} MB{limit_text}\n"

            # Format trends for the prompt
            trend_text = "\nMemory Usage Trends:\n"

            # System trends
            for instance, data in trends["system"].items():
                trend_text += f"Host: {instance}\n"
                trend_text += f"- Trend: {data.get('trend', 'unknown').upper()}\n"
                trend_text += (f"- Min: {data.get('min', 0):.1f}%, "
                               f"Max: {data.get('max', 0):.1f}%, "
                               f"Avg: {data.get('avg', 0):.1f}%\n")
                trend_text += f"- Time Range: {data.get('time_range', 'unknown')}\n\n"

            # Container trends
            if trends["containers"]:
                trend_text += "Container Trends:\n"
                for container, data in trends["containers"].items():
                    trend_text += f"- {container}: {data.get('trend', 'unknown').upper()}, "
                    trend_text += f"Min: {data.get('min_mb', 0):.2f} MB, "
                    trend_text += f"Max: {data.get('max_mb', 0):.2f} MB, "
                    trend_text += f"Avg: {data.get('avg_mb', 0):.2f} MB\n"

            # Create the prompt
            if detailed:
                prompt = f"""
I need a detailed expert analysis of the memory situation in a system being monitored by Prometheus.

{alert_text}
{metric_text}
{trend_text}

Please provide:
1. A detailed technical analysis of the current memory situation
2. Specific recommendations for addressing any issues, including exact commands where appropriate
3. Potential root causes of any memory problems detected
4. Long-term strategies for memory optimization
5. If no immediate issues are detected, preventative measures and best practices

Focus on being technical, detailed, and practical.
"""
            else:
                prompt = f"""
Analyze the following memory metrics and alerts from a system being monitored by Prometheus.

{alert_text}
{metric_text}
{trend_text}

Please provide a concise analysis including:
1. Summary of the current memory situation
2. Key recommendations to address any issues
3. Brief assessment of memory usage trends

Keep the response short and actionable.
"""

            # Prepare system prompt based on whether there are alerts
            if alerts:
                system_prompt = (
                    "You are an expert systems administrator specializing in memory management and "
                    "performance troubleshooting. You provide technical, accurate, and actionable "
                    "advice for resolving memory issues."
                )
            else:
                system_prompt = (
                    "You are an expert systems administrator specializing in memory management. "
                    "You provide concise technical advice on memory optimization and preventative measures."
                )

            # Prepare messages for chat
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}
            ]

            # Get token limit based on model
            max_tokens = 1000
            if "gpt-4" in self.ai_model_id.lower():
                max_tokens = 2000

            # Handle different model types
            print(f"Querying model {self.ai_model_id}...")

            try:
                # Make the API call
                response = self.mcp.process(
                    input_type="chat",
                    model_id=self.ai_model_id,
                    messages=messages,
                    max_tokens=max_tokens,
                    temperature=0.3  # Lower temperature for more focused responses
                )

                # Extract the response content
                if isinstance(response, dict):
                    if 'choices' in response and len(response['choices']) > 0:
                        content = response['choices'][0]['message']['content']
                        return content
                    elif 'error' in response:
                        return f"AI analysis failed: {response['error']}"
                    else:
                        # Try to find content in other response formats
                        for key in ['text', 'content', 'message']:
                            if key in response:
                                return response[key]

                return "AI analysis failed: Unexpected response format"
            except Exception as inner_e:
                print(f"Error during model query: {inner_e}")

                # Try falling back to an OpenAI model if the Azure model fails
                if "azure" in self.ai_model_id.lower():
                    try:
                        print("Trying fallback to OpenAI model...")
                        fallback_model_id = f"openai-{os.getenv('OPENAI_CHAT_MODEL', 'gpt-4o-mini')}"
                        response = self.mcp.process(
                            input_type="chat",
                            model_id=fallback_model_id,
                            messages=messages,
                            max_tokens=max_tokens,
                            temperature=0.3
                        )

                        if isinstance(response, dict) and 'choices' in response:
                            content = response['choices'][0]['message']['content']
                            return content
                    except Exception as fallback_e:
                        print(f"Fallback also failed: {fallback_e}")

                return f"AI analysis failed: {inner_e}"
        except Exception as e:
            return (f"AI analysis failed: {str(e)}\n\nBasic Recommendations:\n"
                    "- For host memory issues: Check running processes with 'top' or 'htop'\n"
                    "- For container memory issues: Consider increasing container memory limits\n"
                    "- Monitor memory usage trends to identify patterns")

    def run_diagnostics(self, detailed=False, lookback_hours=3) -> None:
        """Run full memory diagnostics"""
        print("\n" + "=" * 80)
        print("AI-POWERED MEMORY DIAGNOSTICS")
        print("=" * 80)

        # Get alerts
        print("Fetching memory alerts...")
        alerts = self.fetch_alerts()
        print(f"Found {len(alerts)} memory-related alerts")

        # Get current metrics
        print("Collecting current memory metrics...")
        metrics = self.get_memory_metrics()

        # Get historical trends
        print(f"Analyzing memory trends over the past {lookback_hours} hours...")
        trends = self.get_historical_trends(lookback_hours)

        # Get AI analysis
        if self.ai_model_id:
            print(f"Requesting AI analysis using model {self.ai_model_id}...")
            analysis = self.get_ai_analysis(alerts, metrics, trends, detailed)

            print("\n" + "=" * 80)
            print("MEMORY ANALYSIS AND RECOMMENDATIONS")
            print("=" * 80 + "\n")
            print(analysis)
        else:
            print("\nAI analysis not available - no chat model found")

            # Print a basic report without AI
            print("\n" + "=" * 80)
            print("BASIC MEMORY REPORT")
            print("=" * 80 + "\n")

            # Print alerts
            if alerts:
                print("Memory Alerts:")
                for alert in alerts:
                    print(f"- {alert['name']} ({alert['severity']}) on {alert['target']}")
                    print(f"  {alert['summary']}")
            else:
                print("No memory alerts detected.")

            # Print system metrics
            for instance, data in metrics["system"].items():
                print(f"\nHost: {instance}")
                print(f"Memory Usage: {data.get('usage_percent', 0):.1f}%")
                print(f"Total: {data.get('total_gb', 0):.2f} GB")
                print(f"Used: {data.get('used_gb', 0):.2f} GB")
                print(f"Available: {data.get('available_gb', 0):.2f} GB")

            # Basic recommendations
            print("\nBasic Recommendations:")
            if any(data.get('usage_percent', 0) > 70 for data in metrics["system"].values()):
                print("- High system memory usage detected. Consider freeing up memory.")
                print("- Check for memory leaks or runaway processes.")
            else:
                print("- System memory usage appears normal.")

            # Check for high container usage
            high_containers = [
                (name, data) for name, data in metrics["containers"].items()
                if data.get('usage_percent', 0) > 70
            ]

            if high_containers:
                print("\n- High memory usage in containers:")
                for name, data in high_containers:
                    print(f"  * {name}: {data.get('usage_percent', 0):.1f}%")
                print("- Consider increasing container memory limits or optimizing applications.")

        print("\n" + "=" * 80)
        print(f"Diagnostics completed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 80)


def main():
    """Main function"""
    parser = argparse.ArgumentParser(description="AI-powered Memory Diagnostics")
    parser.add_argument("--mcp-url", default=os.getenv("MCP_SERVER_URL", "http://localhost:8000"),
                        help="MCP server URL")
    parser.add_argument("--prometheus-url", default=os.getenv("PROMETHEUS_URL", "http://localhost:9090"),
                        help="Prometheus server URL")
    parser.add_argument("--ai-model", default=None,
                        help="Specific AI model ID to use")
    parser.add_argument("--model-type", choices=["openai", "azure"], default="openai",
                        help="Type of model to use (openai or azure)")
    parser.add_argument("--detailed", action="store_true",
                        help="Generate a detailed analysis")
    parser.add_argument("--lookback", type=int, default=3,
                        help="Hours to look back for trend analysis")
    parser.add_argument("--list-models", action="store_true",
                        help="List available AI models and exit")

    args = parser.parse_args()

    # Initialize a basic client to list models if needed
    mcp = MCPAIComponent(mcp_server_url=args.mcp_url)

    # Just list models and exit if requested
    if args.list_models:
        models = mcp.list_models()
        print("\nAvailable AI Models:")
        print("-" * 60)
        for model in models:
            model_id = model.get('id', 'unknown')
            capabilities = ", ".join(model.get('capabilities', []))
            print(f"ID: {model_id}")
            print(f"Capabilities: {capabilities}")
            print("-" * 60)
        return

    # Don't try to construct a model ID; instead, initialize diagnostics without a specific model
    # and let the auto-selection logic find an available one
    diagnostics = MemoryDiagnostics(
        mcp_server_url=args.mcp_url,
        prometheus_url=args.prometheus_url,
        ai_model_id=args.ai_model  # This might be None, which is fine
    )

    # Run diagnostics
    diagnostics.run_diagnostics(detailed=args.detailed, lookback_hours=args.lookback)


if __name__ == "__main__":
    main()
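
For reference, here is a minimal sketch of driving the diagnostics programmatically instead of via the command line. The module name in the import is hypothetical (it assumes the script above is saved as ai_memory_diagnostics.py); the endpoints shown are simply the script's defaults, so adjust them to your deployment.

# Assumes the script above is saved as ai_memory_diagnostics.py (hypothetical filename)
from ai_memory_diagnostics import MemoryDiagnostics

# Point at the MCP server and Prometheus; these values are the script's built-in defaults
diagnostics = MemoryDiagnostics(
    mcp_server_url="http://localhost:8000",
    prometheus_url="http://localhost:9090",
    ai_model_id=None,  # None triggers the chat-model auto-selection logic
)

# Produce a detailed analysis over the last 6 hours of metrics
diagnostics.run_diagnostics(detailed=True, lookback_hours=6)

The equivalent CLI invocation would pass --detailed and --lookback 6, and --list-models can be used first to see which chat-capable models the MCP server exposes.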