Model Control Plane (MCP) Server
by dvladimirov
- scripts
#!/usr/bin/env python3
"""
Test the Prometheus integration with MCP with a focus on memory metrics and alerts
"""
import os
import sys
import json
import time
from datetime import datetime, timedelta
from langflow import MCPAIComponent
# PromQL expression for memory utilisation as a percentage; shared by the
# instant query, the range query and the health report.
_MEMORY_USAGE_PCT_QUERY = "(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100"

_BYTES_PER_MB = 1024 * 1024
_BYTES_PER_GB = 1024 * 1024 * 1024

_NO_NODE_DATA = " No results found. Node exporter might not be running."

# (threshold, label) pairs, checked top-down with a strict ">" comparison;
# anything that clears no threshold is reported as "NORMAL".
_HOST_USAGE_LEVELS = ((90, "CRITICAL"), (80, "WARNING"), (70, "ELEVATED"))
_CONTAINER_USAGE_LEVELS = ((80, "CRITICAL"), (60, "WARNING"), (40, "ELEVATED"))

# Substrings of the (lower-cased) alert name that mark an alert as memory-related.
_MEMORY_ALERT_TERMS = ('memory', 'mem', 'swap', 'container')

# (keyword, advice) pairs for firing node alerts. Order matters: the first
# keyword found in the lower-cased alert name wins.
_NODE_ALERT_ACTIONS = (
    ('high', " Recommended action: Consider freeing up memory or increasing capacity"),
    ('critical', " Recommended action: Immediately free up memory, potential system instability"),
    ('medium', " Recommended action: Monitor memory usage trends"),
    ('low', " Recommended action: Normal operations, no action needed"),
    ('swap', " Recommended action: Increase swap space or reduce memory pressure"),
)

_CONTAINER_ALERT_ACTIONS = (
    " Recommended actions:",
    " - Increase container memory limit",
    " - Optimize container application memory usage",
    " - Check for memory leaks in the container application",
    " - Consider scaling horizontally instead of vertically",
)

_CRITICAL_ADVICE = (
    " - Immediately free up memory by stopping non-critical processes",
    " - Consider adding additional memory capacity",
    " - Check for memory leaks in applications",
)
_WARNING_ADVICE = (
    " - Monitor memory usage closely",
    " - Plan for potential memory expansion",
    " - Optimize memory-intensive applications",
)
_ATTENTION_ADVICE = (
    " - Review memory usage trends",
    " - Identify memory-intensive applications",
)
_HEALTHY_ADVICE = (" - Continue regular monitoring",)

# (threshold, status text, advice lines) for the health report, checked top-down.
_HEALTH_LEVELS = (
    (90, "CRITICAL - Immediate action required", _CRITICAL_ADVICE),
    (80, "WARNING - High memory usage", _WARNING_ADVICE),
    (70, "ATTENTION - Elevated memory usage", _ATTENTION_ADVICE),
)

# Canned output shown when Prometheus has no memory alerts.
_DEMO_ALERT_LINES = (
    "\n Demo memory alerts (for testing):",
    " Alert 1: HighMemoryUsage",
    " Severity: warning",
    " State: firing",
    " Summary: High memory usage on demo-host-1",
    " Memory usage: 87.5%",
    " Recommended action: Free up memory by stopping unnecessary processes",
    "\n Alert 2: CriticalMemoryUsage",
    " Severity: critical",
    " State: firing",
    " Summary: Critical memory usage on demo-host-2",
    " Memory usage: 96.3%",
    " Recommended action: Immediately add memory or stop critical processes",
    "\n Alert 3: SwapUsageHigh",
    " Severity: warning",
    " State: pending",
    " Summary: High swap usage on demo-host-3",
    " Swap usage: 82.7%",
    " Recommended action: Increase swap space or reduce memory consumption",
)

# Canned output shown when the alerts request itself raised.
_DEMO_ALERT_LINES_ON_ERROR = (
    "\n Demo memory alerts (query failed):",
    " Alert 1: HighMemoryUsage (Demo)",
    " Severity: warning",
    " State: firing",
    " Summary: High memory usage on demo-host",
    " Recommended action: Free up memory by stopping unnecessary processes",
)

# Canned report shown when building the health report raised.
_DEMO_REPORT_LINES_ON_ERROR = (
    " Memory Health Report (demo data - error fallback):",
    " Host: demo-host-fallback",
    " Total Memory: 16.00 GB",
    " Used Memory: 13.60 GB",
    " Available Memory: 2.40 GB",
    " Memory Usage: 85.00%",
    " Status: WARNING - High memory usage",
    " Recommendations:",
) + _WARNING_ADVICE

_DEMO_HOSTS = (
    {"name": "demo-host-1", "total_gb": 16.0, "usage_pct": 85.6, "status": "WARNING"},
    {"name": "demo-host-2", "total_gb": 32.0, "usage_pct": 62.4, "status": "HEALTHY"},
    {"name": "demo-host-3", "total_gb": 8.0, "usage_pct": 93.2, "status": "CRITICAL"},
)


def _print_lines(lines):
    """Print each string in *lines* on its own line."""
    for line in lines:
        print(line)


def _sample_value(result):
    """Return the numeric value of an instant-query sample (``[timestamp, value]``)."""
    return float(result.get('value', [0, 0])[1])


def _instance_label(result, default='unknown'):
    """Return the sample's ``instance`` label, or *default* when it has none."""
    return result.get('metric', {}).get('instance', default)


def _value_for_instance(results, instance, default=0):
    """Return the value of the first sample in *results* labelled with *instance*."""
    return next(
        (_sample_value(r) for r in results
         if r.get('metric', {}).get('instance') == instance),
        default,
    )


def _usage_level(value, levels):
    """Map a usage percentage to the label of the first threshold it exceeds."""
    for threshold, label in levels:
        if value > threshold:
            return label
    return "NORMAL"


def _check_basic_query(mcp):
    """Section 1: run the ``up`` query and print up to five returned series."""
    print("\n1. Testing basic Prometheus query...")
    try:
        query_result = mcp.prometheus_query("up")
        print(f"Query status: {query_result.get('status')}")
        data = query_result.get('data', {})
        results = data.get('result', [])
        print(f"Result type: {data.get('resultType')}")
        print(f"Found {len(results)} series")
        for number, result in enumerate(results[:5], start=1):
            print(f" Series {number}:")
            print(f" Metric: {result.get('metric')}")
            print(f" Value: {result.get('value')}")
    except Exception as e:
        print(f"Query failed: {e}")


def _check_current_memory_usage(mcp):
    """Section 2.1: print current memory usage and a severity label for up to three hosts."""
    try:
        print("\n2.1 Current memory usage percentage:")
        query_result = mcp.prometheus_query(_MEMORY_USAGE_PCT_QUERY)
        print(f"Query: {_MEMORY_USAGE_PCT_QUERY}")
        print(f"Status: {query_result.get('status')}")
        results = query_result.get('data', {}).get('result', [])
        if not results:
            print(_NO_NODE_DATA)
            return
        for number, result in enumerate(results[:3], start=1):
            print(f" Host {number}: {_instance_label(result)}")
            try:
                value = _sample_value(result)
                print(f" Memory usage: {value:.2f}%")
                print(f" Status: {_usage_level(value, _HOST_USAGE_LEVELS)}")
            except (ValueError, TypeError, IndexError):
                print(" Unable to parse value")
    except Exception as e:
        print(f" Memory usage query failed: {e}")


def _check_total_memory(mcp):
    """Section 2.2: print total memory in GB for up to three hosts."""
    try:
        print("\n2.2 Total memory capacity:")
        query_result = mcp.prometheus_query("node_memory_MemTotal_bytes")
        results = query_result.get('data', {}).get('result', [])
        if not results:
            print(_NO_NODE_DATA)
            return
        for number, result in enumerate(results[:3], start=1):
            print(f" Host {number}: {_instance_label(result)}")
            try:
                gb_value = _sample_value(result) / _BYTES_PER_GB
                print(f" Total Memory: {gb_value:.2f} GB")
            except (ValueError, TypeError, IndexError):
                print(" Unable to parse value")
    except Exception as e:
        print(f" Total memory query failed: {e}")


def _check_memory_usage_over_time(mcp):
    """Section 2.3: query the last 15 minutes of memory usage and print statistics."""
    # Local import: the file's import block only brings in datetime/timedelta.
    from datetime import timezone

    print("\n2.3 Memory usage over time (last 15 minutes):")
    try:
        # The timestamps end in "Z" (UTC), so they must be derived from UTC.
        # A naive local datetime.now() would shift the window by the host's
        # UTC offset.
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(minutes=15)
        end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")

        range_result = mcp.prometheus_query_range(
            query=_MEMORY_USAGE_PCT_QUERY,
            start=start_str,
            end=end_str,
            step="60s",  # 1-minute intervals
        )
        print(f"Query: {_MEMORY_USAGE_PCT_QUERY}")
        print(f"Status: {range_result.get('status')}")
        results = range_result.get('data', {}).get('result', [])
        if not results:
            print(_NO_NODE_DATA)
            return
        for number, result in enumerate(results[:2], start=1):
            print(f" Host {number}: {_instance_label(result)}")
            values = result.get('values', [])
            if not values:
                print(" No data points found in the time range")
                continue
            print(f" Data points: {len(values)}")
            try:
                value_points = [float(v[1]) for v in values]
                avg_value = sum(value_points) / len(value_points)
                print(f" Average memory usage: {avg_value:.2f}%")
                print(f" Minimum memory usage: {min(value_points):.2f}%")
                print(f" Maximum memory usage: {max(value_points):.2f}%")
                if len(value_points) > 1:
                    first_value = value_points[0]
                    last_value = value_points[-1]
                    # A change of more than 10% between the first and last
                    # sample counts as a trend.
                    if last_value > first_value * 1.1:
                        trend = "INCREASING"
                    elif last_value < first_value * 0.9:
                        trend = "DECREASING"
                    else:
                        trend = "STABLE"
                    print(f" Memory usage trend: {trend}")
            except (ValueError, TypeError, IndexError) as e:
                print(f" Error calculating statistics: {e}")
    except Exception as e:
        print(f" Memory range query failed: {e}")


def _is_memory_alert(alert):
    """Return True when the alert's ``alertname`` label contains a memory-related term."""
    labels = alert.get('labels', {})
    if 'alertname' not in labels:
        return False
    name = labels.get('alertname', '').lower()
    return any(term in name for term in _MEMORY_ALERT_TERMS)


def _print_node_alerts(node_memory_alerts):
    """Print every node-level alert and, for firing ones, the matching advice line."""
    print("\n Node Memory Alerts:")
    for number, alert in enumerate(node_memory_alerts, start=1):
        labels = alert.get('labels', {})
        alert_name = labels.get('alertname', 'Unknown')
        state = alert.get('state', 'unknown')
        summary = alert.get('annotations', {}).get('summary', 'No summary available')
        print(f" Alert {number}: {alert_name}")
        print(f" Instance: {labels.get('instance', 'unknown')}")
        print(f" Severity: {labels.get('severity', 'unknown')}")
        print(f" State: {state}")
        print(f" Summary: {summary}")
        if state.lower() == 'firing':
            lowered_name = alert_name.lower()
            for keyword, advice in _NODE_ALERT_ACTIONS:
                if keyword in lowered_name:
                    print(advice)
                    break


def _print_container_alerts(container_memory_alerts):
    """Print every container-level alert and, for firing ones, the generic container advice."""
    print("\n Container Memory Alerts:")
    for number, alert in enumerate(container_memory_alerts, start=1):
        labels = alert.get('labels', {})
        state = alert.get('state', 'unknown')
        # Prefer the "name" label and fall back to "container_name".
        container_name = labels.get('name', labels.get('container_name', 'unknown'))
        summary = alert.get('annotations', {}).get('summary', 'No summary available')
        print(f" Alert {number}: {labels.get('alertname', 'Unknown')}")
        print(f" Container: {container_name}")
        print(f" Severity: {labels.get('severity', 'unknown')}")
        print(f" State: {state}")
        print(f" Summary: {summary}")
        if state.lower() == 'firing':
            _print_lines(_CONTAINER_ALERT_ACTIONS)


def _print_container_memory_usage(mcp):
    """Print per-container memory usage against its configured limit."""
    try:
        print("\n Current Container Memory Usage:")
        mem_usage_result = mcp.prometheus_query('container_memory_usage_bytes{container_name!=""}')
        mem_limit_result = mcp.prometheus_query('container_spec_memory_limit_bytes{container_name!=""}')
        usage_data = mem_usage_result.get('data', {}).get('result', [])
        limit_data = mem_limit_result.get('data', {}).get('result', [])
        if not (usage_data and limit_data):
            print(" No container memory data available")
            return
        for result in usage_data:
            container = result.get('metric', {}).get('container_name', 'unknown')
            # Skip pause containers and empty names.
            if not container or container in ('POD', 'pause'):
                continue
            try:
                memory_bytes = _sample_value(result)
                memory_mb = memory_bytes / _BYTES_PER_MB
                # No matching limit sample is treated as "unlimited" (inf).
                memory_limit_bytes = next(
                    (_sample_value(r) for r in limit_data
                     if r.get('metric', {}).get('container_name') == container),
                    float('inf'),
                )
                if memory_limit_bytes == float('inf') or memory_limit_bytes == 0:
                    print(f" Container '{container}': {memory_mb:.2f} MB (no limit set)")
                    continue
                memory_limit_mb = memory_limit_bytes / _BYTES_PER_MB
                usage_percent = (memory_bytes / memory_limit_bytes) * 100
                status = _usage_level(usage_percent, _CONTAINER_USAGE_LEVELS)
                print(f" Container '{container}': {memory_mb:.2f} MB / {memory_limit_mb:.2f} MB ({usage_percent:.1f}%) - {status}")
            except (ValueError, TypeError, ZeroDivisionError) as e:
                print(f" Container '{container}': Error processing data - {e}")
    except Exception as e:
        print(f" Error querying container memory: {e}")


def _check_memory_alerts(mcp):
    """Section 3: list memory-related alerts, AI advice for them, and container usage."""
    print("\n3. Checking memory alerts...")
    try:
        alerts_result = mcp.prometheus_get_alerts()
        all_alerts = alerts_result.get('data', {}).get('alerts', [])
        memory_alerts = [alert for alert in all_alerts if _is_memory_alert(alert)]
        print(f"Found {len(memory_alerts)} memory-related alerts out of {len(all_alerts)} total alerts")

        if not memory_alerts:
            print(" No memory-related alerts found.")
            _print_lines(_DEMO_ALERT_LINES)
            return

        # An alert is container-scoped when its name mentions "container" or
        # it carries a "name" label; everything else is treated as node-level.
        node_memory_alerts = []
        container_memory_alerts = []
        for alert in memory_alerts:
            labels = alert.get('labels', {})
            alert_name = labels.get('alertname', 'Unknown')
            if 'container' in alert_name.lower() or 'name' in labels:
                container_memory_alerts.append(alert)
            else:
                node_memory_alerts.append(alert)

        # memory_alerts is non-empty here, so at least one group has entries.
        ai_recommendations = get_ai_recommendations(mcp, node_memory_alerts, container_memory_alerts)
        print("\n=== AI-ASSISTED ANALYSIS AND RECOMMENDATIONS ===")
        print(ai_recommendations)
        print("===================================================")

        if node_memory_alerts:
            _print_node_alerts(node_memory_alerts)
        if container_memory_alerts:
            _print_container_alerts(container_memory_alerts)
            _print_container_memory_usage(mcp)
    except Exception as e:
        print(f" Alerts query failed: {e}")
        _print_lines(_DEMO_ALERT_LINES_ON_ERROR)


def _print_demo_health_report():
    """Print the hard-coded health report used when Prometheus returned no data."""
    print(" Memory Health Report (demo data):")
    for host in _DEMO_HOSTS:
        total_gb = host["total_gb"]
        usage_pct = host["usage_pct"]
        status = host["status"]
        used_gb = total_gb * (usage_pct / 100)
        print(f"\n Host: {host['name']}")
        print(f" Total Memory: {total_gb:.2f} GB")
        print(f" Used Memory: {used_gb:.2f} GB")
        print(f" Available Memory: {total_gb - used_gb:.2f} GB")
        print(f" Memory Usage: {usage_pct:.2f}%")
        print(f" Status: {status}")
        print(" Recommendations:")
        if status == "CRITICAL":
            _print_lines(_CRITICAL_ADVICE)
        elif status == "WARNING":
            _print_lines(_WARNING_ADVICE)
        else:
            _print_lines(_HEALTHY_ADVICE)


def _print_memory_health_report(mcp):
    """Section 4: print a per-host memory report, or demo data when none is available."""
    print("\n4. Memory health report:")
    try:
        mem_total_result = mcp.prometheus_query("node_memory_MemTotal_bytes")
        mem_available_result = mcp.prometheus_query("node_memory_MemAvailable_bytes")
        mem_usage_pct_result = mcp.prometheus_query(_MEMORY_USAGE_PCT_QUERY)
        mem_total_data = mem_total_result.get('data', {}).get('result', [])
        mem_available_data = mem_available_result.get('data', {}).get('result', [])
        mem_usage_pct_data = mem_usage_pct_result.get('data', {}).get('result', [])

        if not (mem_total_data and mem_available_data and mem_usage_pct_data):
            _print_demo_health_report()
            return

        print(" Memory Health Report (actual data):")
        for index, result in enumerate(mem_usage_pct_data[:3]):
            instance = _instance_label(result, f'host-{index + 1}')
            try:
                mem_usage_pct = _sample_value(result)
                # Pair this host's percentage with its total/available
                # samples via the shared "instance" label (0 when absent).
                mem_total_gb = _value_for_instance(mem_total_data, instance) / _BYTES_PER_GB
                mem_available_gb = _value_for_instance(mem_available_data, instance) / _BYTES_PER_GB
                mem_used_gb = mem_total_gb - mem_available_gb
                print(f"\n Host: {instance}")
                print(f" Total Memory: {mem_total_gb:.2f} GB")
                print(f" Used Memory: {mem_used_gb:.2f} GB")
                print(f" Available Memory: {mem_available_gb:.2f} GB")
                print(f" Memory Usage: {mem_usage_pct:.2f}%")

                status, advice = "HEALTHY - Normal memory usage", _HEALTHY_ADVICE
                for threshold, level_status, level_advice in _HEALTH_LEVELS:
                    if mem_usage_pct > threshold:
                        status, advice = level_status, level_advice
                        break
                print(f" Status: {status}")
                print(" Recommendations:")
                _print_lines(advice)
            except (ValueError, TypeError, IndexError) as e:
                print(f" Error processing data: {e}")
    except Exception as e:
        print(f" Memory report generation failed: {e}")
        _print_lines(_DEMO_REPORT_LINES_ON_ERROR)


def test_prometheus(prometheus_url=None):
    """Exercise the Prometheus integration of MCP, focusing on memory metrics and alerts.

    Runs, in order: a basic ``up`` query, instant and range memory queries,
    a memory-alert check (with AI recommendations when alerts exist) and a
    memory health report. Everything is reported via ``print``. Each section
    catches its own exceptions and prints a failure line (or demo data), so
    one failing section does not stop the others. Errors raised while
    constructing ``MCPAIComponent`` are not caught.

    Args:
        prometheus_url: Prometheus base URL. When ``None``, the
            ``PROMETHEUS_URL`` environment variable is used, falling back to
            ``http://localhost:9090``.

    Returns:
        None.
    """
    if prometheus_url is None:
        prometheus_url = os.getenv("PROMETHEUS_URL", "http://localhost:9090")
    print(f"Testing Prometheus integration with URL: {prometheus_url}")

    mcp_server_url = os.getenv("MCP_SERVER_URL", "http://localhost:8000")
    print(f"Connecting to MCP server at: {mcp_server_url}")
    mcp = MCPAIComponent(mcp_server_url=mcp_server_url)

    # NOTE(review): this only changes the environment of the current process;
    # an MCP server that is already running elsewhere will not see it.
    # Confirm how the URL is meant to reach the server.
    os.environ["PROMETHEUS_URL"] = prometheus_url

    _check_basic_query(mcp)

    print("\n2. Testing memory metric queries...")
    _check_current_memory_usage(mcp)
    _check_total_memory(mcp)
    _check_memory_usage_over_time(mcp)

    _check_memory_alerts(mcp)
    _print_memory_health_report(mcp)

    print("\nPrometheus memory monitoring test completed.")
# Static advice appended to every failure message so the caller always gets
# something actionable even when no model could be queried.
_AI_FALLBACK_ADVICE = (
    "\n\nFallback Recommendations:\n"
    "- For host memory issues: Check running processes with 'top' or 'htop'\n"
    "- For container memory issues: Consider increasing container memory limits\n"
    "- Monitor memory usage trends to identify patterns\n"
    "- Consider implementing memory limits or quotas"
)


def _ai_failure_message(error):
    """Build the failure text (error plus static fallback advice) returned to the caller."""
    return f"AI recommendation failed: {error!s}{_AI_FALLBACK_ADVICE}"


def _select_chat_model(chat_models, preferred_model):
    """Pick a model id: exact match, then partial match, then any OpenAI model, then the first."""
    preferred_lower = preferred_model.lower()
    strategies = (
        ("exact match", lambda model_id: model_id == preferred_model),
        ("partial match", lambda model_id: preferred_lower in model_id.lower()),
        ("OpenAI", lambda model_id: "openai" in model_id.lower()),
    )
    for label, matches in strategies:
        for model in chat_models:
            if matches(model['id']):
                print(f"Using {label} model: {model['id']}")
                return model['id']
    model_id = chat_models[0]['id']
    print(f"Using first available model: {model_id}")
    return model_id


def _format_alert_section(title, alerts, target_label, target_of):
    """Render *alerts* as a titled text section for the prompt; empty string when there are none.

    *target_of* receives the alert's labels dict and returns the value shown
    after *target_label* (the instance or the container name).
    """
    if not alerts:
        return ""
    parts = [f"{title}\n"]
    for number, alert in enumerate(alerts, start=1):
        labels = alert.get('labels', {})
        summary = alert.get('annotations', {}).get('summary', 'No summary available')
        parts.append(
            f"- Alert {number}: {labels.get('alertname', 'Unknown')}\n"
            f" {target_label}: {target_of(labels)}\n"
            f" Severity: {labels.get('severity', 'unknown')}\n"
            f" State: {alert.get('state', 'unknown')}\n"
            f" Summary: {summary}\n\n"
        )
    return "".join(parts)


def get_ai_recommendations(mcp, node_alerts, container_alerts):
    """Get AI-powered recommendations for memory alerts.

    Args:
        mcp: Client object providing ``list_models()`` and
            ``process(input_type=..., model_id=..., messages=..., max_tokens=...)``.
        node_alerts: List of host-level alert dicts (``labels``, ``state``,
            ``annotations``); ``None`` is treated as empty.
        container_alerts: List of container-level alert dicts; ``None`` is
            treated as empty.

    Returns:
        str: The model's reply text, or a human-readable message when there
        is nothing to analyze, no chat-capable model is available, the reply
        cannot be parsed, or any step raised (in which case static fallback
        advice is appended). This function never raises.
    """
    node_alerts = node_alerts or []
    container_alerts = container_alerts or []
    if not node_alerts and not container_alerts:
        # Nothing to ask about: avoid sending an empty prompt to the model.
        return "No memory alerts were provided; nothing to analyze."

    try:
        available_models = mcp.list_models()
        # Tolerate entries whose "capabilities" is None and skip entries
        # without an id, so one malformed model does not abort the request.
        chat_models = [
            model for model in available_models
            if model.get('id')
            and any(cap == "chat" for cap in (model.get('capabilities') or []))
        ]
        print("\nAvailable chat models for recommendations:")
        for model in chat_models:
            print(f" - {model['id']}")
        if not chat_models:
            return "No AI models with chat capability available for recommendations."

        preferred_model = os.getenv("OPENAI_CHAT_MODEL", "gpt-4o-mini")
        print(f"Preferred model from environment: {preferred_model}")
        model_id = _select_chat_model(chat_models, preferred_model)

        node_alert_text = _format_alert_section(
            "Host Memory Alerts:", node_alerts, "Instance",
            lambda labels: labels.get('instance', 'unknown'),
        )
        container_alert_text = _format_alert_section(
            "Container Memory Alerts:", container_alerts, "Container",
            lambda labels: labels.get('name', labels.get('container_name', 'unknown')),
        )

        # The prompt text is kept flush-left so the string sent to the model
        # carries no indentation.
        prompt = f"""
I need expert advice on handling the following memory alerts from Prometheus monitoring:
{node_alert_text}
{container_alert_text}
Please provide:
1. A brief analysis of what these alerts indicate
2. Detailed technical recommendations for addressing each issue
3. Potential long-term solutions to prevent these issues from recurring
4. Any relevant commands or actions that might help diagnose or fix the problems
Keep your response focused, technical, and actionable.
"""
        print(f"\nQuerying AI model ({model_id}) for recommendations...")
        messages = [
            {"role": "system", "content": "You are an expert systems administrator specializing in memory management, performance optimization, and containerization technologies."},
            {"role": "user", "content": prompt}
        ]
        try:
            response = mcp.process(
                input_type="chat",
                model_id=model_id,
                messages=messages,
                max_tokens=1000
            )
            if isinstance(response, dict) and response.get('choices'):
                return response['choices'][0]['message']['content']
            print(f"Unexpected response format: {response}")
            return "AI recommendation failed: Could not parse response from the model."
        except Exception as e:
            print(f"Error querying AI model: {e}")
            return _ai_failure_message(e)
    except Exception as e:
        return _ai_failure_message(e)
if __name__ == "__main__":
    # An optional first command-line argument supplies the Prometheus URL;
    # without it test_prometheus() receives None.
    cli_args = sys.argv[1:]
    prometheus_url = cli_args[0] if cli_args else None
    test_prometheus(prometheus_url)