Skip to main content
Glama

MCP AI Monitor

by MedusaSH
monitor_ai.py16.4 kB
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Module de surveillance en temps réel avec IA pour MCP_AI_Monitor. Détecte les anomalies dans l'utilisation CPU/RAM et envoie des alertes. """ import os import time import joblib import psutil import numpy as np import pandas as pd from datetime import datetime from plyer import notification from colorama import init, Fore, Style, Back # Initialiser colorama init(autoreset=True) # Constantes MODEL_DIR = "model" MODEL_FILE = os.path.join(MODEL_DIR, "mcp_model.pkl") FEATURE_NAMES_FILE = os.path.join(MODEL_DIR, "feature_names.pkl") HISTORY_SIZE = 20 # Nombre de points de données à conserver dans l'historique INTERVAL = 5 # secondes ALERT_COOLDOWN = 60 # secondes entre les alertes # Seuils pour les ressources système CPU_THRESHOLD = 80 # Seuil d'utilisation CPU considéré comme élevé (%) RAM_THRESHOLD = 80 # Seuil d'utilisation RAM considéré comme élevé (%) # Périodes d'apprentissage et d'adaptation LEARNING_PERIOD = 10 # Nombre de points de données à collecter avant d'activer les alertes ADAPTATION_FACTOR = 0.8 # Facteur pour adapter les seuils en fonction de l'utilisation normale class MCPMonitor: """Classe principale pour la surveillance du système avec IA.""" def __init__(self): """Initialise le moniteur MCP.""" self.model = self._load_model() self.feature_names = self._load_feature_names() self.last_alert_time = 0 self.data_history = [] self.learning_data = [] self.anomaly_scores = [] # Pour la détection d'anomalies adaptative self.anomaly_threshold = -0.15 # Seuil de base pour les scores d'anomalies (moins négatif = plus sensible) self.baseline_cpu = None self.baseline_ram = None self.is_learning = True self.known_apps = {} # Pour mémoriser les applications connues et leur impact def _load_model(self): """Charge le modèle IA depuis le fichier.""" if not os.path.exists(MODEL_FILE): print(f"{Fore.RED}Erreur: Le modèle {MODEL_FILE} n'existe pas. Exécutez train_model.py d'abord.") raise FileNotFoundError(f"Le modèle {MODEL_FILE} n'existe pas. Exécutez train_model.py d'abord.") model = joblib.load(MODEL_FILE) print(f"{Fore.GREEN}Modèle chargé depuis: {MODEL_FILE}") return model def _load_feature_names(self): """Charge les noms des caractéristiques depuis le fichier.""" if not os.path.exists(FEATURE_NAMES_FILE): print(f"{Fore.YELLOW}Attention: Fichier de noms de caractéristiques non trouvé. Utilisation de noms par défaut.") return ["cpu_percent", "ram_percent"] feature_names = joblib.load(FEATURE_NAMES_FILE) print(f"{Fore.GREEN}Noms des caractéristiques chargés depuis: {FEATURE_NAMES_FILE}") return feature_names def collect_current_data(self): """Collecte les données système actuelles.""" cpu_percent = psutil.cpu_percent(interval=1) ram_percent = psutil.virtual_memory().percent timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Récupérer plus d'informations pour un diagnostic plus détaillé cpu_per_core = psutil.cpu_percent(interval=None, percpu=True) ram_used = psutil.virtual_memory().used / (1024 * 1024 * 1024) # En Go ram_total = psutil.virtual_memory().total / (1024 * 1024 * 1024) # En Go # Récupérer les processus actifs (pour identifier les applications ouvertes) active_processes = [] for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']): try: pinfo = proc.info if pinfo['cpu_percent'] > 1.0 or pinfo['memory_percent'] > 1.0: # Filtrer les processus actifs active_processes.append(pinfo) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass # Trier par utilisation CPU active_processes.sort(key=lambda x: x['cpu_percent'], reverse=True) data = { "timestamp": timestamp, "cpu_percent": cpu_percent, "ram_percent": ram_percent, "cpu_per_core": cpu_per_core, "ram_used": round(ram_used, 2), "ram_total": round(ram_total, 2), "active_processes": active_processes[:5] # Top 5 processus } # Ajouter à l'historique et limiter sa taille self.data_history.append(data) if len(self.data_history) > HISTORY_SIZE: self.data_history.pop(0) # Si nous sommes en phase d'apprentissage, ajoutons à learning_data if len(self.learning_data) < LEARNING_PERIOD: self.learning_data.append(data) # Si nous avons assez de données d'apprentissage, calculer les valeurs de référence if len(self.learning_data) == LEARNING_PERIOD: self._calculate_baselines() self.is_learning = False print(f"{Fore.CYAN}Phase d'apprentissage terminée. Référence CPU: {self.baseline_cpu:.1f}%, RAM: {self.baseline_ram:.1f}%") return data def _calculate_baselines(self): """Calcule les valeurs de référence à partir des données d'apprentissage.""" if not self.learning_data: return # Calculer la moyenne et l'écart-type de l'utilisation CPU et RAM cpu_values = [d["cpu_percent"] for d in self.learning_data] ram_values = [d["ram_percent"] for d in self.learning_data] self.baseline_cpu = np.mean(cpu_values) self.baseline_ram = np.mean(ram_values) self.cpu_std = np.std(cpu_values) if len(cpu_values) > 1 else 5.0 self.ram_std = np.std(ram_values) if len(ram_values) > 1 else 5.0 # Ajuster le seuil d'anomalie en fonction des données d'apprentissage # Plus la variance est élevée, plus le seuil doit être bas variance_factor = min(1.0, max(0.1, 0.5 / (self.cpu_std / 10.0))) self.anomaly_threshold = -0.15 * variance_factor def is_application_launch(self, data): """Détecte si un changement soudain est dû au lancement d'une application connue.""" if not data.get("active_processes"): return False, None # Vérifier si un nouveau processus avec une utilisation CPU importante est apparu for proc in data["active_processes"]: proc_name = proc["name"] if proc_name not in self.known_apps and proc["cpu_percent"] > 10: # C'est potentiellement une nouvelle application return True, proc_name return False, None def learn_application_impact(self, app_name, impact_cpu, impact_ram): """Mémorise l'impact d'une application sur les ressources système.""" self.known_apps[app_name] = { "cpu_impact": impact_cpu, "ram_impact": impact_ram, "first_seen": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } print(f"{Fore.CYAN}Application mémorisée: {app_name} (Impact CPU: {impact_cpu:.1f}%, RAM: {impact_ram:.1f}%)") def detect_anomaly(self, data): """Détecte si les données actuelles représentent une anomalie.""" # En phase d'apprentissage, pas d'alerte d'anomalie if self.is_learning: return False, 0.0 # Créer un DataFrame avec les noms de caractéristiques features_df = pd.DataFrame([[data["cpu_percent"], data["ram_percent"]]], columns=self.feature_names) # Faire la prédiction prediction = self.model.predict(features_df)[0] score = self.model.score_samples(features_df)[0] # Ajouter le score à l'historique self.anomaly_scores.append(score) if len(self.anomaly_scores) > HISTORY_SIZE: self.anomaly_scores.pop(0) # Récupérer les seuils adaptés aux valeurs de référence cpu_anomaly_threshold = min(CPU_THRESHOLD, self.baseline_cpu * 2) if self.baseline_cpu else CPU_THRESHOLD ram_anomaly_threshold = min(RAM_THRESHOLD, self.baseline_ram * 2) if self.baseline_ram else RAM_THRESHOLD # Détecter si un lancement d'application a eu lieu app_launch, app_name = self.is_application_launch(data) # Si c'est un lancement d'application, calculer l'impact if app_launch and len(self.data_history) > 1: prev_data = self.data_history[-2] cpu_impact = data["cpu_percent"] - prev_data["cpu_percent"] ram_impact = data["ram_percent"] - prev_data["ram_percent"] if cpu_impact > 10 or ram_impact > 5: # Impact significatif self.learn_application_impact(app_name, cpu_impact, ram_impact) # On ne considère pas cela comme une anomalie return False, score # Critères pour une vraie anomalie: # 1. Le modèle prédit une anomalie (score < seuil) ET # 2. Soit l'utilisation CPU est anormalement élevée, soit l'utilisation RAM est anormalement élevée is_cpu_high = data["cpu_percent"] > cpu_anomaly_threshold is_ram_high = data["ram_percent"] > ram_anomaly_threshold # Les seuils sont dynamiquement adaptés en fonction des valeurs de référence is_cpu_spike = False is_ram_spike = False if self.baseline_cpu is not None and self.cpu_std is not None: is_cpu_spike = data["cpu_percent"] > (self.baseline_cpu + 2 * self.cpu_std) if self.baseline_ram is not None and self.ram_std is not None: is_ram_spike = data["ram_percent"] > (self.baseline_ram + 2 * self.ram_std) # Une anomalie est soit un pic dans les ressources, soit une valeur absolue élevée resources_anomaly = is_cpu_high or is_ram_high or is_cpu_spike or is_ram_spike # Anomalie si 1) le score est faible ET 2) ressources anormales is_anomaly = (score < self.anomaly_threshold) and resources_anomaly return is_anomaly, score def get_trend_info(self): """Calcule les tendances à partir de l'historique des données.""" if len(self.data_history) < 2: return "Pas assez de données pour calculer la tendance." current = self.data_history[-1] previous = self.data_history[0] cpu_change = current["cpu_percent"] - previous["cpu_percent"] ram_change = current["ram_percent"] - previous["ram_percent"] duration = min(len(self.data_history) * INTERVAL, HISTORY_SIZE * INTERVAL) trend_info = f"Tendance (sur {duration}s):\n" trend_info += f"CPU: {'↑' if cpu_change > 0 else '↓'} {abs(cpu_change):.1f}%\n" trend_info += f"RAM: {'↑' if ram_change > 0 else '↓'} {abs(ram_change):.1f}%" return trend_info def format_score_explanation(self, score): """Fournit une explication du score d'anomalie.""" if score > -0.05: return f"{Fore.GREEN}Normal" elif score > -0.15: return f"{Fore.CYAN}Légèrement inhabituel" elif score > -0.3: return f"{Fore.YELLOW}Inhabituel" elif score > -0.5: return f"{Fore.MAGENTA}Très inhabituel" else: return f"{Fore.RED}Extrêmement anormal" def send_alert(self, data, score): """Envoie une alerte en cas d'anomalie détectée.""" current_time = time.time() # Vérification du délai de refroidissement if current_time - self.last_alert_time < ALERT_COOLDOWN: return # Récupérer les informations supplémentaires trend_info = self.get_trend_info() top_processes = self.get_top_processes() title = "MCP_AI_Monitor: Anomalie Détectée" message = f"CPU: {data['cpu_percent']}%, RAM: {data['ram_percent']}%\n" message += f"Score d'anomalie: {score:.4f}\n" message += f"{trend_info}\n\n" message += f"Processus les plus gourmands:\n{top_processes}" # Version courte pour la notification short_message = f"CPU: {data['cpu_percent']}%, RAM: {data['ram_percent']}%\nScore: {score:.4f}" # Envoi de la notification notification.notify( title=title, message=short_message, app_name="MCP_AI_Monitor", timeout=10 ) print(f"\n{Back.RED}{Fore.WHITE}[ALERTE] {title}{Style.RESET_ALL}") print(f"{Fore.RED}[ALERTE] {message}") self.last_alert_time = current_time def get_top_processes(self, n=3): """Récupère les processus qui consomment le plus de ressources.""" processes = [] for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']): try: pinfo = proc.info processes.append(pinfo) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass # Trier par utilisation CPU puis RAM processes.sort(key=lambda x: (x['cpu_percent'], x['memory_percent']), reverse=True) result = "" for i, proc in enumerate(processes[:n]): result += f"{proc['name']} (PID: {proc['pid']}): CPU {proc['cpu_percent']:.1f}%, MEM {proc['memory_percent']:.1f}%\n" return result def start_monitoring(self): """Démarre la surveillance en temps réel.""" print(f"{Fore.CYAN}MCP_AI_Monitor: Surveillance en temps réel démarrée...") print(f"{Fore.CYAN}Intervalle: {INTERVAL} secondes") print(f"{Fore.YELLOW}Phase d'apprentissage: {Fore.WHITE}{LEARNING_PERIOD} échantillons") print(f"{Fore.YELLOW}Appuyez sur Ctrl+C pour arrêter.") try: while True: # Collecte des données data = self.collect_current_data() # Détection d'anomalies is_anomaly, score = self.detect_anomaly(data) # Formater le status avec code couleur if self.is_learning: status = f"{Fore.BLUE}APPRENTISSAGE" elif is_anomaly: status = f"{Back.RED}{Fore.WHITE}ANOMALIE" else: status = f"{Fore.GREEN}normal" # Formater le score score_explanation = self.format_score_explanation(score) # Affichage des données avec couleurs print(f"[{Fore.CYAN}{data['timestamp']}{Style.RESET_ALL}] " f"CPU: {Fore.YELLOW}{data['cpu_percent']}%{Style.RESET_ALL}, " f"RAM: {Fore.MAGENTA}{data['ram_percent']}%{Style.RESET_ALL} - " f"{status}{Style.RESET_ALL} " f"(score: {score:.4f} - {score_explanation}{Style.RESET_ALL})") # Envoi d'alerte si anomalie if is_anomaly: self.send_alert(data, score) time.sleep(INTERVAL) except KeyboardInterrupt: print(f"\n{Fore.YELLOW}Surveillance arrêtée.{Style.RESET_ALL}") def main(): """Fonction principale pour la surveillance IA.""" try: monitor = MCPMonitor() monitor.start_monitoring() except Exception as e: print(f"{Fore.RED}Erreur lors de la surveillance: {str(e)}") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MedusaSH/MCP_AI_Monitor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server