# Model Control Plane (MCP) Server
# by dvladimirov
# - scripts
#!/usr/bin/env python3
"""
MCP Memory Alerting - Handle and notify about Prometheus memory alerts
"""
import os
import sys
import time
import json
import argparse
import smtplib
import subprocess
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
# Import the MCP client
from langflow import MCPAIComponent
class MemoryAlertHandler:
    """Poll Prometheus (through the MCP client) for memory alerts and notify.

    Each polling cycle fetches the alert list, keeps alerts whose name
    mentions one of ``_MEMORY_TERMS``, enriches firing ones with current
    memory metrics and fans them out to the enabled channels in
    ``notification_config`` (log file, email, Slack, desktop notification).
    Alert ids that were already notified are remembered in
    ``notified_alerts`` until the alert stops firing.
    """

    # Substrings of the (lower-cased) alert name that mark an alert as relevant.
    _MEMORY_TERMS = ("memory", "mem", "swap", "container")
    # Labels that may carry a container's name, in lookup order.
    # NOTE(review): "container" is the label newer cAdvisor/Kubernetes setups
    # are expected to use instead of "container_name" - confirm for your stack.
    _CONTAINER_LABELS = ("name", "container_name", "container")
    # Upper bound (seconds) for SMTP / HTTP / helper-process calls so that a
    # stalled endpoint cannot block the polling loop forever.
    _NETWORK_TIMEOUT = 10

    def __init__(self, mcp_server_url: str = "http://localhost:8000",
                 prometheus_url: str = "http://localhost:9090",
                 check_interval: int = 60):
        """Initialize the alert handler.

        Args:
            mcp_server_url: URL of the MCP server
            prometheus_url: URL of the Prometheus server
            check_interval: Alert check interval in seconds
        """
        self.mcp_server_url = mcp_server_url
        self.prometheus_url = prometheus_url
        self.check_interval = check_interval
        self.mcp = MCPAIComponent(mcp_server_url=mcp_server_url)
        # Set the Prometheus URL in the environment
        os.environ["PROMETHEUS_URL"] = prometheus_url
        # Keep track of already notified alerts
        self.notified_alerts = set()
        # Configuration for notification methods
        self.notification_config = {
            "email": {
                "enabled": False,
                "smtp_server": "smtp.example.com",
                "smtp_port": 587,
                "username": "",
                "password": "",
                "from_addr": "alerts@example.com",
                "to_addrs": ["admin@example.com"],
            },
            "slack": {
                "enabled": False,
                "webhook_url": "",
            },
            "log": {
                "enabled": True,
                "file": "memory_alerts.log",
            },
            "system": {
                "enabled": True,  # For desktop notifications
            },
        }

    # ------------------------------------------------------------------
    # Fetching and normalising alerts
    # ------------------------------------------------------------------
    def fetch_alerts(self) -> List[Dict[str, Any]]:
        """Fetch memory-related alerts from Prometheus.

        Returns:
            A list of normalised alert dicts (see ``_format_alert``); an empty
            list when there are none or when the fetch failed (the failure is
            printed).
        """
        alerts = self._try_fetch_alerts()
        return [] if alerts is None else alerts

    def _try_fetch_alerts(self) -> Optional[List[Dict[str, Any]]]:
        """Like ``fetch_alerts`` but return None when the fetch failed.

        Callers that must tell "no alerts" apart from "could not ask" (the
        polling loop) use this variant.
        """
        try:
            alerts_result = self.mcp.prometheus_get_alerts()
            if alerts_result.get('status') != 'success':
                print(f"Error fetching alerts: {alerts_result.get('error', 'Unknown error')}")
                return None
            all_alerts = (alerts_result.get('data') or {}).get('alerts') or []
            return [
                self._format_alert(alert)
                for alert in all_alerts
                if self._is_memory_alert(alert)
            ]
        except Exception as e:
            print(f"Exception fetching alerts: {str(e)}")
            return None

    @classmethod
    def _is_memory_alert(cls, alert: Dict[str, Any]) -> bool:
        """Return True when the alert's name contains one of the memory terms."""
        labels = alert.get('labels') or {}
        alert_name = str(labels.get('alertname', '')).lower()
        return any(term in alert_name for term in cls._MEMORY_TERMS)

    @classmethod
    def _format_alert(cls, alert: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten one raw alert into the dict used by the rest of the class.

        Besides the display fields, the result records ``target_label``: the
        label the target value was read from, so follow-up metric queries can
        filter on that same label.
        """
        # `or {}` also covers an explicit null for labels/annotations.
        labels = alert.get('labels') or {}
        annotations = alert.get('annotations') or {}
        alert_name = labels.get('alertname', 'Unknown')
        severity = labels.get('severity', 'unknown')

        # Get target (container or instance)
        if 'container' in alert_name.lower() or 'name' in labels:
            target_type = "container"
            target_label = next(
                (label for label in cls._CONTAINER_LABELS if label in labels),
                'container_name',
            )
        else:
            target_type = "host"
            target_label = 'instance'
        target = labels.get(target_label, 'unknown')

        # Prometheus' own alerts endpoint reports `activeAt`; `startsAt` is
        # the Alertmanager field name. Accept whichever is present.
        starts_at = alert.get('startsAt') or alert.get('activeAt') or ''

        return {
            # Unique alert ID used for de-duplication
            "id": f"{alert_name}_{target}_{severity}",
            "name": alert_name,
            "severity": severity,
            "state": alert.get('state', 'unknown'),
            "target": target,
            "target_type": target_type,
            "target_label": target_label,
            "summary": annotations.get('summary', 'No summary available'),
            "description": annotations.get('description', ''),
            "starts_at": cls._format_timestamp(starts_at),
        }

    @staticmethod
    def _format_timestamp(raw: Any) -> str:
        """Render an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM:SS UTC``.

        Returns "Unknown" for an empty value and the raw value unchanged when
        it cannot be parsed.
        """
        if not raw:
            return "Unknown"
        from datetime import timezone  # local: the file only imports datetime/timedelta

        try:
            text = raw.replace('Z', '+00:00')
            # fromisoformat() before Python 3.11 only accepts 3 or 6 fractional
            # digits; normalise e.g. nanosecond precision to microseconds.
            head, dot, tail = text.partition('.')
            if dot:
                digit_count = len(tail) - len(tail.lstrip('0123456789'))
                fraction = tail[:digit_count][:6].ljust(6, '0')
                text = f"{head}.{fraction}{tail[digit_count:]}"
            parsed = datetime.fromisoformat(text)
        except (ValueError, TypeError, AttributeError):
            return raw
        # The output is labelled UTC, so convert offset-bearing timestamps.
        # Naive timestamps are taken as already being UTC.
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc)
        return parsed.strftime('%Y-%m-%d %H:%M:%S UTC')

    # ------------------------------------------------------------------
    # Memory metrics
    # ------------------------------------------------------------------
    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape a value for use inside a double-quoted PromQL label matcher."""
        return (
            str(value)
            .replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
        )

    def _query_scalar(self, query: str) -> Optional[float]:
        """Run an instant query and return the first sample's value, if any."""
        result = self.mcp.prometheus_query(query)
        if result.get('status') != 'success':
            return None
        samples = (result.get('data') or {}).get('result') or []
        if not samples:
            return None
        return float(samples[0].get('value', [0, '0'])[1])

    def get_memory_info(self, alert: Dict[str, Any]) -> Dict[str, Any]:
        """Get additional memory information for an alert target.

        Args:
            alert: A normalised alert dict; ``target`` and ``target_type`` are
                required, ``target_label`` is optional.

        Returns:
            For hosts: ``usage_percent`` and ``total_gb``. For containers:
            ``usage_mb``, ``limit_mb`` and ``usage_percent``. Keys whose query
            returned no data are omitted; ``{}`` is returned on any error.
        """
        try:
            target = self._escape_label_value(alert["target"])
            target_type = alert["target_type"]
            memory_info: Dict[str, Any] = {}

            if target_type == "host":
                # Query host memory metrics
                usage_pct = self._query_scalar(
                    f'(1 - node_memory_MemAvailable_bytes{{instance="{target}"}} / node_memory_MemTotal_bytes{{instance="{target}"}}) * 100'
                )
                if usage_pct is not None:
                    memory_info["usage_percent"] = usage_pct
                # Get total memory
                total_bytes = self._query_scalar(
                    f'node_memory_MemTotal_bytes{{instance="{target}"}}'
                )
                if total_bytes is not None:
                    memory_info["total_gb"] = total_bytes / (1024 * 1024 * 1024)

            elif target_type == "container":
                # Filter on the label the target name was actually read from;
                # anything unexpected falls back to the historical default.
                label = alert.get("target_label")
                if label not in self._CONTAINER_LABELS:
                    label = "container_name"
                # Query container memory metrics
                usage_bytes = self._query_scalar(
                    f'container_memory_usage_bytes{{{label}="{target}"}}'
                )
                if usage_bytes is not None:
                    memory_info["usage_mb"] = usage_bytes / (1024 * 1024)
                # Get memory limit
                limit_bytes = self._query_scalar(
                    f'container_spec_memory_limit_bytes{{{label}="{target}"}}'
                )
                # A zero limit is skipped: no limit to report, and it would
                # divide by zero below.
                if limit_bytes is not None and limit_bytes > 0:
                    memory_info["limit_mb"] = limit_bytes / (1024 * 1024)
                    if "usage_mb" in memory_info:
                        memory_info["usage_percent"] = (
                            memory_info["usage_mb"] * 1024 * 1024 / limit_bytes
                        ) * 100
            return memory_info
        except Exception as e:
            print(f"Error getting memory info: {e}")
            return {}

    # ------------------------------------------------------------------
    # Message formatting
    # ------------------------------------------------------------------
    def format_alert_message(self, alert: Dict[str, Any], memory_info: Dict[str, Any]) -> str:
        """Format an alert for notification.

        Args:
            alert: A normalised alert dict.
            memory_info: Output of ``get_memory_info`` (may be empty).

        Returns:
            A multi-line plain-text message.
        """
        message = [
            f"MEMORY ALERT: {alert['name']}",
            f"Severity: {alert['severity'].upper()}",
            f"Target: {alert['target']} ({alert['target_type']})",
            f"Status: {alert['state'].upper()}",
            f"Started: {alert['starts_at']}",
            f"Summary: {alert['summary']}",
        ]
        if alert['description']:
            message.append(f"Description: {alert['description']}")

        # Add memory info
        message.append("\nCurrent Memory Details:")
        details = []
        if alert['target_type'] == 'host':
            if 'usage_percent' in memory_info:
                details.append(f"Usage: {memory_info['usage_percent']:.1f}%")
            if 'total_gb' in memory_info:
                details.append(f"Total Memory: {memory_info['total_gb']:.2f} GB")
        elif alert['target_type'] == 'container':
            if 'usage_mb' in memory_info:
                details.append(f"Usage: {memory_info['usage_mb']:.2f} MB")
            if 'limit_mb' in memory_info:
                details.append(f"Limit: {memory_info['limit_mb']:.2f} MB")
            if 'usage_percent' in memory_info:
                details.append(f"Percent: {memory_info['usage_percent']:.1f}%")
        # Never leave a section header with nothing underneath it.
        message.extend(details or ["- No current metrics available"])

        # Add recommendations (chosen from keywords in the alert name)
        message.append("\nRecommended Actions:")
        name_lower = alert['name'].lower()
        actions = []
        if 'high' in name_lower or 'critical' in name_lower:
            if alert['target_type'] == 'host':
                actions.append("- Investigate and stop unnecessary processes")
                actions.append("- Consider adding more memory to the system")
                actions.append("- Check for memory leaks in applications")
            else:  # container
                actions.append("- Increase container memory limit")
                actions.append("- Optimize container application memory usage")
                actions.append("- Consider scaling horizontally with more containers")
        elif 'medium' in name_lower:
            actions.append("- Monitor the situation for further degradation")
            actions.append("- Plan for potential memory expansion")
        message.extend(actions or ["- Review the alert details in Prometheus"])

        # rstrip avoids "//alerts" when the configured URL ends with a slash.
        message.append(f"\nPrometheus URL: {self.prometheus_url.rstrip('/')}/alerts")
        return "\n".join(message)

    # ------------------------------------------------------------------
    # Notification channels
    # ------------------------------------------------------------------
    def send_email_notification(self, alert: Dict[str, Any], message: str) -> bool:
        """Send an email notification for an alert.

        Returns:
            True when the message was handed to the SMTP server, False when
            email is disabled or sending failed (the error is printed).
        """
        config = self.notification_config["email"]
        if not config["enabled"]:
            return False
        try:
            msg = MIMEMultipart()
            msg["From"] = config["from_addr"]
            msg["To"] = ", ".join(config["to_addrs"])
            msg["Subject"] = f"Memory Alert: {alert['name']} - {alert['severity'].upper()}"
            msg.attach(MIMEText(message, "plain"))
            # The context manager closes the connection even when STARTTLS,
            # login or sending raises.
            with smtplib.SMTP(config["smtp_server"], config["smtp_port"],
                              timeout=self._NETWORK_TIMEOUT) as server:
                server.starttls()
                if config["username"] and config["password"]:
                    server.login(config["username"], config["password"])
                server.send_message(msg)
            return True
        except Exception as e:
            print(f"Error sending email: {e}")
            return False

    def send_slack_notification(self, alert: Dict[str, Any], message: str) -> bool:
        """Send a Slack notification for an alert.

        Returns:
            True when the webhook answered HTTP 200, False when Slack is
            disabled, no webhook is configured, or the request failed.
        """
        config = self.notification_config["slack"]
        if not config["enabled"] or not config["webhook_url"]:
            return False
        try:
            import requests  # optional dependency, only needed for Slack

            # Color based on severity
            color = {
                "critical": "#FF0000",  # Red
                "warning": "#FFA500",   # Orange
                "info": "#0000FF",      # Blue
            }.get(alert["severity"].lower(), "#808080")  # Default gray
            payload = {
                "attachments": [
                    {
                        "fallback": f"Memory Alert: {alert['name']}",
                        "color": color,
                        "title": f"Memory Alert: {alert['name']}",
                        "text": message.replace("\n", "\n>"),
                        "footer": "MCP Memory Alerting",
                        "ts": int(time.time()),
                    }
                ]
            }
            # Without a timeout a stalled webhook would hang the polling loop.
            response = requests.post(
                config["webhook_url"],
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=self._NETWORK_TIMEOUT,
            )
            return response.status_code == 200
        except Exception as e:
            print(f"Error sending Slack notification: {e}")
            return False

    @staticmethod
    def _build_notification_command(platform: str, title: str, summary: str) -> Optional[List[str]]:
        """Build the argv that shows a desktop notification on ``platform``.

        Alert text originates from Prometheus labels/annotations, so it is
        escaped before being embedded in AppleScript or PowerShell source.

        Returns:
            The command as an argument list, or None for unsupported platforms.
        """
        if platform.startswith("linux"):
            # Linux: notify-send takes title and body as plain arguments
            return ["notify-send", title, summary]

        if platform == "darwin":
            # macOS: osascript; escape for an AppleScript string literal
            def quote(text: str) -> str:
                return text.replace("\\", "\\\\").replace('"', '\\"')

            apple_script = (
                f'display notification "{quote(summary)}" with title "{quote(title)}"'
            )
            return ["osascript", "-e", apple_script]

        if platform == "win32":
            # Windows: PowerShell toast notification
            from xml.sax.saxutils import escape

            # Escaping quotes as well keeps the text from closing the
            # here-string below (its terminator starts with a single quote).
            entities = {'"': "&quot;", "'": "&apos;"}
            script_lines = [
                "[Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications, ContentType = WindowsRuntime] | Out-Null",
                "[Windows.Data.Xml.Dom.XmlDocument, Windows.Data.Xml.Dom.XmlDocument, ContentType = WindowsRuntime] | Out-Null",
                "$app_id = 'MCP.MemoryAlertHandler'",
                # Single-quoted here-string: PowerShell does not expand
                # $variables or $(...) inside it. The terminator must start
                # at column 0, hence the explicit line list.
                "$content = @'",
                "<toast>",
                "<visual>",
                '<binding template="ToastText02">',
                f'<text id="1">{escape(title, entities)}</text>',
                f'<text id="2">{escape(summary, entities)}</text>',
                "</binding>",
                "</visual>",
                "</toast>",
                "'@",
                "$xml = New-Object Windows.Data.Xml.Dom.XmlDocument",
                "$xml.LoadXml($content)",
                "[Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier($app_id).Show($xml)",
            ]
            return ["powershell", "-Command", "\n".join(script_lines)]

        return None

    def send_system_notification(self, alert: Dict[str, Any], message: str) -> bool:
        """Send a system notification (desktop notification).

        Args:
            alert: A normalised alert dict.
            message: Full alert text; not used by this channel, which shows
                only a short title and summary.

        Returns:
            True when the platform's notification command exited with status
            0; False when the channel is disabled, the platform is not
            supported, or the command failed or could not be run.
        """
        config = self.notification_config["system"]
        if not config["enabled"]:
            return False
        try:
            title = f"Memory Alert: {alert['name']} - {alert['severity'].upper()}"
            summary = f"{alert['target']} ({alert['target_type']}): {alert['summary']}"
            command = self._build_notification_command(sys.platform, title, summary)
            if command is None:
                return False
            # Report success only when the helper actually succeeded.
            return subprocess.call(command, timeout=self._NETWORK_TIMEOUT) == 0
        except Exception as e:
            print(f"Error sending system notification: {e}")
            return False

    def log_alert(self, alert: Dict[str, Any], message: str) -> bool:
        """Log an alert to a file.

        Appends a timestamped header followed by ``message`` to the configured
        log file.

        Returns:
            True on success, False when logging is disabled or the write failed.
        """
        config = self.notification_config["log"]
        if not config["enabled"]:
            return False
        try:
            log_file = config["file"]
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Explicit encoding: alert text may contain non-ASCII characters
            # and the platform default is not always UTF-8.
            with open(log_file, "a", encoding="utf-8") as f:
                f.write(f"=== ALERT LOGGED AT {timestamp} ===\n")
                f.write(message)
                f.write("\n\n")
            return True
        except Exception as e:
            print(f"Error logging alert: {e}")
            return False

    # ------------------------------------------------------------------
    # Processing loop
    # ------------------------------------------------------------------
    def process_alerts(self, dry_run: bool = False,
                       alerts: Optional[List[Dict[str, Any]]] = None) -> None:
        """Process and handle alerts.

        Args:
            dry_run: Print the notifications instead of sending them; alerts
                are then not marked as notified.
            alerts: Already-fetched normalised alerts. When None (the
                default) the alerts are fetched here.
        """
        if alerts is None:
            alerts = self.fetch_alerts()
        # Only process alerts in FIRING state
        firing_alerts = [a for a in alerts if a["state"].lower() == "firing"]
        if not firing_alerts:
            print(f"No firing alerts found at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            return
        print(f"Found {len(firing_alerts)} firing alerts")

        for alert in firing_alerts:
            alert_id = alert["id"]
            # Skip if we've already notified about this alert
            if alert_id in self.notified_alerts:
                continue
            # Get additional memory information
            memory_info = self.get_memory_info(alert)
            # Format alert message
            message = self.format_alert_message(alert, memory_info)
            print(f"\nProcessing alert: {alert['name']} ({alert['severity']})")
            print(f"Target: {alert['target']} ({alert['target_type']})")

            if dry_run:
                print("DRY RUN - Would send notifications:")
                print(message)
                continue

            # Send notifications based on configured methods
            notification_sent = False
            # Log to file
            if self.log_alert(alert, message):
                print(" - Alert logged to file")
                notification_sent = True
            # Send email
            if self.notification_config["email"]["enabled"]:
                if self.send_email_notification(alert, message):
                    print(" - Email notification sent")
                    notification_sent = True
            # Send Slack notification
            if self.notification_config["slack"]["enabled"]:
                if self.send_slack_notification(alert, message):
                    print(" - Slack notification sent")
                    notification_sent = True
            # Send system notification
            if self.notification_config["system"]["enabled"]:
                if self.send_system_notification(alert, message):
                    print(" - System notification sent")
                    notification_sent = True

            if notification_sent:
                # Add to notified set
                self.notified_alerts.add(alert_id)
            else:
                print(" - No notifications sent")

    def run_alerting(self, dry_run: bool = False) -> None:
        """Run the alerting system until interrupted with Ctrl+C.

        Args:
            dry_run: Forwarded to ``process_alerts`` on every cycle.
        """
        print(f"Starting memory alert monitoring at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"MCP Server: {self.mcp_server_url}")
        print(f"Prometheus: {self.prometheus_url}")
        print(f"Check interval: {self.check_interval} seconds")
        try:
            while True:
                # One fetch per cycle serves both processing and pruning.
                alerts = self._try_fetch_alerts()
                # A failed fetch skips the cycle entirely: treating it as "no
                # alerts" would wipe the notified set and re-notify everything
                # once the server is reachable again.
                if alerts is not None:
                    self.process_alerts(dry_run=dry_run, alerts=alerts)
                    # Forget alerts that are no longer firing so that a later
                    # recurrence notifies again. Alerts that keep firing are
                    # not re-notified.
                    firing_ids = {
                        a["id"] for a in alerts if a["state"].lower() == "firing"
                    }
                    for alert_id in sorted(self.notified_alerts - firing_ids):
                        self.notified_alerts.discard(alert_id)
                        print(f"Alert {alert_id} resolved, removed from notification list")
                time.sleep(self.check_interval)
        except KeyboardInterrupt:
            print("\nAlert monitoring stopped.")
def main():
    """Parse the command line, configure a MemoryAlertHandler and run it.

    With ``--once`` the alerts are processed a single time; otherwise the
    handler polls until interrupted. ``--dry-run`` is honoured in both modes.
    """
    parser = argparse.ArgumentParser(description="MCP Memory Alerting")
    parser.add_argument("--mcp-url", default=os.getenv("MCP_SERVER_URL", "http://localhost:8000"),
                        help="MCP server URL")
    parser.add_argument("--prometheus-url", default=os.getenv("PROMETHEUS_URL", "http://localhost:9090"),
                        help="Prometheus server URL")
    parser.add_argument("--interval", type=int, default=60,
                        help="Alert check interval in seconds")
    parser.add_argument("--log-file", default="memory_alerts.log",
                        help="Log file path")
    parser.add_argument("--email", action="store_true",
                        help="Enable email notifications")
    parser.add_argument("--email-to",
                        help="Email recipients (comma-separated)")
    parser.add_argument("--slack", action="store_true",
                        help="Enable Slack notifications")
    parser.add_argument("--slack-webhook",
                        help="Slack webhook URL")
    parser.add_argument("--dry-run", action="store_true",
                        help="Don't send notifications, just print them")
    parser.add_argument("--once", action="store_true",
                        help="Process alerts once and exit")
    args = parser.parse_args()

    # A negative interval makes time.sleep() raise and zero would busy-loop
    # against the server, so reject both up front.
    if args.interval <= 0:
        parser.error("--interval must be a positive number of seconds")

    alert_handler = MemoryAlertHandler(
        mcp_server_url=args.mcp_url,
        prometheus_url=args.prometheus_url,
        check_interval=args.interval
    )

    # Configure notification methods
    alert_handler.notification_config["log"]["file"] = args.log_file
    if args.email:
        alert_handler.notification_config["email"]["enabled"] = True
    if args.email_to:
        # Tolerate "a@x.org, b@x.org" and stray commas.
        recipients = [addr.strip() for addr in args.email_to.split(",") if addr.strip()]
        if recipients:
            alert_handler.notification_config["email"]["to_addrs"] = recipients
    if args.slack:
        alert_handler.notification_config["slack"]["enabled"] = True
        if not args.slack_webhook:
            # Without a webhook the Slack channel silently does nothing.
            print("Warning: --slack given without --slack-webhook; "
                  "Slack notifications will not be sent", file=sys.stderr)
    if args.slack_webhook:
        alert_handler.notification_config["slack"]["webhook_url"] = args.slack_webhook

    if args.once:
        alert_handler.process_alerts(dry_run=args.dry_run)
    elif args.dry_run:
        # Poll here so that --dry-run never sends real notifications in
        # continuous mode either; only process_alerts(dry_run=...) is relied on.
        try:
            while True:
                alert_handler.process_alerts(dry_run=True)
                time.sleep(args.interval)
        except KeyboardInterrupt:
            print("\nAlert monitoring stopped.")
    else:
        alert_handler.run_alerting()


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()