Skip to main content
Glama
database.py‱35.8 kB
#!/usr/bin/env python3 """ đŸ—ƒïž Database Manager pour HTTP-MCP Bridge Gestion de la base de donnĂ©es SQLite pour logs, erreurs et historique utilisateur """ import sqlite3 import asyncio import json import os from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass, asdict from pathlib import Path import logging @dataclass class LogEntry: """EntrĂ©e de log pour la base de donnĂ©es""" id: Optional[int] = None timestamp: str = "" level: str = "" message: str = "" module: str = "" session_id: Optional[str] = None request_id: Optional[str] = None user_ip: Optional[str] = None extra_data: Optional[str] = None # JSON string @dataclass class RequestEntry: """EntrĂ©e de requĂȘte utilisateur pour l'historique""" id: Optional[int] = None timestamp: str = "" session_id: str = "" method: str = "" endpoint: str = "" params: str = "" # JSON string response_time_ms: int = 0 status_code: int = 0 user_ip: Optional[str] = None user_agent: Optional[str] = None @dataclass class ErrorEntry: """EntrĂ©e d'erreur pour le suivi des problĂšmes""" id: Optional[int] = None timestamp: str = "" error_type: str = "" error_message: str = "" stack_trace: Optional[str] = None session_id: Optional[str] = None request_id: Optional[str] = None context: Optional[str] = None # JSON string class DatabaseManager: """Gestionnaire de base de donnĂ©es pour le bridge""" def __init__(self, db_path: str = "bridge_data.db"): self.db_path = db_path self.connection: Optional[sqlite3.Connection] = None async def initialize(self): """Initialise la base de donnĂ©es et crĂ©e les tables""" try: self.connection = sqlite3.connect(self.db_path, check_same_thread=False) self.connection.row_factory = sqlite3.Row # CrĂ©er les tables await self._create_tables() # CrĂ©er les index pour les performances await self._create_indexes() logging.info(f"✅ Base de donnĂ©es initialisĂ©e: {self.db_path}") except Exception as e: logging.error(f"❌ Erreur initialisation BDD: {e}") raise async def _create_tables(self): """CrĂ©e les tables de la base de donnĂ©es""" # Table des logs self.connection.execute(""" CREATE TABLE IF NOT EXISTS logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, level TEXT NOT NULL, message TEXT NOT NULL, module TEXT, session_id TEXT, request_id TEXT, user_ip TEXT, extra_data TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Table des requĂȘtes utilisateur (historique) self.connection.execute(""" CREATE TABLE IF NOT EXISTS requests ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, session_id TEXT NOT NULL, method TEXT NOT NULL, endpoint TEXT NOT NULL, params TEXT, response_time_ms INTEGER DEFAULT 0, status_code INTEGER DEFAULT 0, user_ip TEXT, user_agent TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Table des erreurs self.connection.execute(""" CREATE TABLE IF NOT EXISTS errors ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, error_type TEXT NOT NULL, error_message TEXT NOT NULL, stack_trace TEXT, session_id TEXT, request_id TEXT, context TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Table des statistiques (pour le monitoring) self.connection.execute(""" CREATE TABLE IF NOT EXISTS stats ( id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT NOT NULL UNIQUE, total_requests INTEGER DEFAULT 0, total_errors INTEGER DEFAULT 0, avg_response_time_ms REAL DEFAULT 0, unique_sessions INTEGER DEFAULT 0, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Tables d'authentification # Table des utilisateurs self.connection.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, email TEXT UNIQUE NOT NULL, full_name TEXT, password_hash TEXT NOT NULL, role TEXT DEFAULT 'user', is_active BOOLEAN DEFAULT 1, failed_login_attempts INTEGER DEFAULT 0, locked_until DATETIME, last_login DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Table des sessions utilisateur self.connection.execute(""" CREATE TABLE IF NOT EXISTS user_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, access_token TEXT UNIQUE NOT NULL, refresh_token TEXT UNIQUE NOT NULL, access_token_expires DATETIME NOT NULL, refresh_token_expires DATETIME NOT NULL, user_agent TEXT, ip_address TEXT, is_active BOOLEAN DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE ) """) # Table des configurations Home Assistant self.connection.execute(""" CREATE TABLE IF NOT EXISTS ha_configs ( config_id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, name TEXT NOT NULL, url TEXT NOT NULL, token_encrypted TEXT NOT NULL, is_active BOOLEAN DEFAULT 1, last_test DATETIME, last_status TEXT DEFAULT 'unknown', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE ) """) # Table de configuration systĂšme (pour clĂ©s de chiffrement) self.connection.execute(""" CREATE TABLE IF NOT EXISTS system_config ( id INTEGER PRIMARY KEY AUTOINCREMENT, config_type TEXT UNIQUE NOT NULL, encryption_key TEXT, salt TEXT, config_data TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Table des permissions par dĂ©faut pour les outils MCP self.connection.execute(""" CREATE TABLE IF NOT EXISTS default_permissions ( id INTEGER PRIMARY KEY AUTOINCREMENT, tool_name TEXT UNIQUE NOT NULL, can_read BOOLEAN DEFAULT 1, can_write BOOLEAN DEFAULT 0, is_enabled BOOLEAN DEFAULT 1, tool_category TEXT DEFAULT 'general', description TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) # Table des permissions utilisateur pour les outils MCP self.connection.execute(""" CREATE TABLE IF NOT EXISTS user_tool_permissions ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, tool_name TEXT NOT NULL, can_read BOOLEAN DEFAULT 1, can_write BOOLEAN DEFAULT 0, is_enabled BOOLEAN DEFAULT 1, custom_settings TEXT, last_used DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE, UNIQUE(user_id, tool_name) ) """) self.connection.commit() async def _create_indexes(self): """CrĂ©e les index pour optimiser les performances""" indexes = [ "CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp)", "CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level)", "CREATE INDEX IF NOT EXISTS idx_logs_session_id ON logs(session_id)", "CREATE INDEX IF NOT EXISTS idx_requests_timestamp ON requests(timestamp)", "CREATE INDEX IF NOT EXISTS idx_requests_session_id ON requests(session_id)", "CREATE INDEX IF NOT EXISTS idx_requests_endpoint ON requests(endpoint)", "CREATE INDEX IF NOT EXISTS idx_errors_timestamp ON errors(timestamp)", "CREATE INDEX IF NOT EXISTS idx_errors_type ON errors(error_type)", "CREATE INDEX IF NOT EXISTS idx_stats_date ON stats(date)", # Index pour l'authentification "CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)", "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)", "CREATE INDEX IF NOT EXISTS idx_users_is_active ON users(is_active)", "CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON user_sessions(user_id)", "CREATE INDEX IF NOT EXISTS idx_sessions_access_token ON user_sessions(access_token)", "CREATE INDEX IF NOT EXISTS idx_sessions_refresh_token ON user_sessions(refresh_token)", "CREATE INDEX IF NOT EXISTS idx_sessions_is_active ON user_sessions(is_active)", "CREATE INDEX IF NOT EXISTS idx_sessions_expires ON user_sessions(access_token_expires)", # Index pour les configurations Home Assistant "CREATE INDEX IF NOT EXISTS idx_ha_configs_user_id ON ha_configs(user_id)", "CREATE INDEX IF NOT EXISTS idx_ha_configs_is_active ON ha_configs(is_active)", "CREATE INDEX IF NOT EXISTS idx_system_config_type ON system_config(config_type)", # Index pour les permissions "CREATE INDEX IF NOT EXISTS idx_default_permissions_tool_name ON default_permissions(tool_name)", "CREATE INDEX IF NOT EXISTS idx_default_permissions_category ON default_permissions(tool_category)", "CREATE INDEX IF NOT EXISTS idx_user_permissions_user_id ON user_tool_permissions(user_id)", "CREATE INDEX IF NOT EXISTS idx_user_permissions_tool_name ON user_tool_permissions(tool_name)", "CREATE INDEX IF NOT EXISTS idx_user_permissions_enabled ON user_tool_permissions(is_enabled)", "CREATE INDEX IF NOT EXISTS idx_user_permissions_composite ON user_tool_permissions(user_id, tool_name)" ] for index_sql in indexes: self.connection.execute(index_sql) self.connection.commit() async def insert_log(self, entry: LogEntry): """InsĂšre une entrĂ©e de log""" try: cursor = self.connection.execute(""" INSERT INTO logs (timestamp, level, message, module, session_id, request_id, user_ip, extra_data) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( entry.timestamp, entry.level, entry.message, entry.module, entry.session_id, entry.request_id, entry.user_ip, entry.extra_data )) self.connection.commit() return cursor.lastrowid except Exception as e: logging.error(f"❌ Erreur insertion log: {e}") return None async def insert_request(self, entry: RequestEntry): """InsĂšre une entrĂ©e de requĂȘte utilisateur""" try: cursor = self.connection.execute(""" INSERT INTO requests (timestamp, session_id, method, endpoint, params, response_time_ms, status_code, user_ip, user_agent) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entry.timestamp, entry.session_id, entry.method, entry.endpoint, entry.params, entry.response_time_ms, entry.status_code, entry.user_ip, entry.user_agent )) self.connection.commit() return cursor.lastrowid except Exception as e: logging.error(f"❌ Erreur insertion requĂȘte: {e}") return None async def insert_error(self, entry: ErrorEntry): """InsĂšre une entrĂ©e d'erreur""" try: cursor = self.connection.execute(""" INSERT INTO errors (timestamp, error_type, error_message, stack_trace, session_id, request_id, context) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( entry.timestamp, entry.error_type, entry.error_message, entry.stack_trace, entry.session_id, entry.request_id, entry.context )) self.connection.commit() return cursor.lastrowid except Exception as e: logging.error(f"❌ Erreur insertion erreur: {e}") return None async def cleanup_old_data(self, days_to_keep: int = 30): """Supprime les donnĂ©es anciennes (plus de X jours)""" try: cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat() # Compter les entrĂ©es Ă  supprimer logs_count = self.connection.execute( "SELECT COUNT(*) FROM logs WHERE timestamp < ?", (cutoff_date,) ).fetchone()[0] requests_count = self.connection.execute( "SELECT COUNT(*) FROM requests WHERE timestamp < ?", (cutoff_date,) ).fetchone()[0] errors_count = self.connection.execute( "SELECT COUNT(*) FROM errors WHERE timestamp < ?", (cutoff_date,) ).fetchone()[0] # Supprimer les anciennes donnĂ©es self.connection.execute("DELETE FROM logs WHERE timestamp < ?", (cutoff_date,)) self.connection.execute("DELETE FROM requests WHERE timestamp < ?", (cutoff_date,)) self.connection.execute("DELETE FROM errors WHERE timestamp < ?", (cutoff_date,)) self.connection.commit() # Optimiser la base de donnĂ©es (en dehors de la transaction) self.connection.execute("VACUUM") logging.info(f"đŸ§č Nettoyage BDD terminĂ©: {logs_count} logs, {requests_count} requĂȘtes, {errors_count} erreurs supprimĂ©es (>{days_to_keep} jours)") return { "logs_deleted": logs_count, "requests_deleted": requests_count, "errors_deleted": errors_count, "cutoff_date": cutoff_date } except Exception as e: logging.error(f"❌ Erreur nettoyage BDD: {e}") return None async def get_stats(self, days: int = 7) -> Dict[str, Any]: """RĂ©cupĂšre les statistiques des derniers jours""" try: # S'assurer que row_factory est en mode dictionnaire pour cette fonction original_row_factory = self.connection.row_factory self.connection.row_factory = sqlite3.Row cutoff_date = (datetime.now() - timedelta(days=days)).isoformat() # Statistiques des requĂȘtes (fetchone retourne sqlite3.Row) request_stats = self.connection.execute(""" SELECT COUNT(*) as total_requests, AVG(response_time_ms) as avg_response_time, COUNT(DISTINCT session_id) as unique_sessions, MIN(timestamp) as first_request, MAX(timestamp) as last_request FROM requests WHERE timestamp >= ? """, (cutoff_date,)).fetchone() # Statistiques des erreurs par type error_stats = self.connection.execute(""" SELECT error_type, COUNT(*) as count FROM errors WHERE timestamp >= ? GROUP BY error_type ORDER BY count DESC """, (cutoff_date,)).fetchall() # Top endpoints top_endpoints = self.connection.execute(""" SELECT endpoint, COUNT(*) as count, AVG(response_time_ms) as avg_time FROM requests WHERE timestamp >= ? GROUP BY endpoint ORDER BY count DESC LIMIT 10 """, (cutoff_date,)).fetchall() # Restaurer row_factory self.connection.row_factory = original_row_factory return { "period_days": days, "requests": dict(request_stats) if request_stats else {}, "errors_by_type": [dict(row) for row in error_stats], "top_endpoints": [dict(row) for row in top_endpoints], "generated_at": datetime.now().isoformat() } except Exception as e: # Restaurer row_factory en cas d'erreur self.connection.row_factory = original_row_factory logging.error(f"❌ Erreur rĂ©cupĂ©ration stats: {e}") return {} async def import_daily_logs(self, log_file_path: str): """Importe les logs d'un fichier journalier vers la BDD""" try: if not os.path.exists(log_file_path): logging.warning(f"⚠ Fichier log non trouvĂ©: {log_file_path}") return 0 imported_count = 0 with open(log_file_path, 'r', encoding='utf-8') as f: for line in f: try: # Parser la ligne de log (format: timestamp - module - level - message) parts = line.strip().split(' - ', 3) if len(parts) >= 4: timestamp_str, module, level, message = parts # CrĂ©er l'entrĂ©e de log log_entry = LogEntry( timestamp=timestamp_str, level=level, message=message, module=module ) # InsĂ©rer en BDD await self.insert_log(log_entry) imported_count += 1 except Exception as e: logging.warning(f"⚠ Erreur parsing ligne log: {e}") continue logging.info(f"đŸ“„ Import terminĂ©: {imported_count} logs importĂ©s depuis {log_file_path}") return imported_count except Exception as e: logging.error(f"❌ Erreur import logs: {e}") return 0 async def count_requests_since(self, since_time: datetime) -> int: """Compte le nombre de requĂȘtes depuis une date donnĂ©e""" try: cursor = self.connection.cursor() cursor.execute(""" SELECT COUNT(*) as count FROM requests WHERE timestamp >= ? """, (since_time.isoformat(),)) result = cursor.fetchone() return result[0] if result else 0 except Exception as e: logging.error(f"Erreur comptage requĂȘtes depuis {since_time}: {e}") return 0 async def count_requests_between(self, start_time: datetime, end_time: datetime) -> int: """Compte le nombre de requĂȘtes entre deux dates""" try: cursor = self.connection.cursor() cursor.execute(""" SELECT COUNT(*) as count FROM requests WHERE timestamp >= ? AND timestamp < ? """, (start_time.isoformat(), end_time.isoformat())) result = cursor.fetchone() return result[0] if result else 0 except Exception as e: logging.error(f"Erreur comptage requĂȘtes entre {start_time} et {end_time}: {e}") return 0 # ====== MĂ©thodes de gestion de configuration ====== async def save_user_ha_config(self, username: str, url: str, token: str, config_name: str = "default"): """Sauvegarde la configuration Home Assistant pour un utilisateur""" try: # Simple chiffrement du token (base64 pour l'instant) import base64 token_encrypted = base64.b64encode(token.encode()).decode() # InsĂ©rer ou mettre Ă  jour la configuration query = """ INSERT OR REPLACE INTO ha_configs (user_id, name, url, token_encrypted, is_active, updated_at) VALUES ( (SELECT 1), -- User ID fixe pour l'instant ?, ?, ?, 1, CURRENT_TIMESTAMP ) """ self.connection.execute(query, (config_name, url, token_encrypted)) self.connection.commit() logging.info(f"✅ Configuration Home Assistant sauvegardĂ©e pour {username}") return True except Exception as e: logging.error(f"❌ Erreur sauvegarde config HA: {e}") return False async def get_user_ha_config(self, username: str = "beroute"): """RĂ©cupĂšre la configuration Home Assistant active pour un utilisateur""" try: # S'assurer que row_factory est en mode tuple pour cette fonction original_row_factory = self.connection.row_factory self.connection.row_factory = None query = """ SELECT url, token_encrypted, name, last_test, last_status FROM ha_configs WHERE is_active = 1 ORDER BY updated_at DESC LIMIT 1 """ cursor = self.connection.execute(query) result = cursor.fetchone() # Restaurer row_factory self.connection.row_factory = original_row_factory if result: # DĂ©chiffrer le token (result est un tuple) import base64 try: token_decrypted = base64.b64decode(result[1]).decode() # index 1 = token_encrypted except: token_decrypted = "token_invalid" return { "hass_url": result[0], # index 0 = url "hass_token": token_decrypted, "config_name": result[2], # index 2 = name "last_test": result[3], # index 3 = last_test "last_status": result[4], # index 4 = last_status "source": "database" } else: logging.info("â„č Aucune configuration HA en base, utilisation des variables d'environnement") return None except Exception as e: logging.error(f"❌ Erreur rĂ©cupĂ©ration config HA: {e}") # Restaurer row_factory en cas d'erreur self.connection.row_factory = original_row_factory return None async def save_system_config(self, config_type: str, config_data: dict): """Sauvegarde une configuration systĂšme gĂ©nĂ©rale""" try: import json query = """ INSERT OR REPLACE INTO system_config (config_type, config_data, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP) """ await self.execute_async(query, (config_type, json.dumps(config_data))) logging.info(f"✅ Configuration systĂšme '{config_type}' sauvegardĂ©e") return True except Exception as e: logging.error(f"❌ Erreur sauvegarde config systĂšme: {e}") return False async def get_system_config(self, config_type: str): """RĂ©cupĂšre une configuration systĂšme""" try: query = "SELECT config_data FROM system_config WHERE config_type = ?" result = await self.fetch_one_async(query, (config_type,)) if result: import json return json.loads(result['config_data']) return None except Exception as e: logging.error(f"❌ Erreur rĂ©cupĂ©ration config systĂšme: {e}") return None async def close(self): """Ferme la connexion Ă  la base de donnĂ©es""" if self.connection: self.connection.close() self.connection = None logging.info("🔌 Connexion BDD fermĂ©e") async def fetch_one(self, query: str, params: tuple = ()): """ExĂ©cute une requĂȘte et retourne une seule ligne""" try: # Activer row_factory pour obtenir des dictionnaires self.connection.row_factory = sqlite3.Row cursor = self.connection.cursor() cursor.execute(query, params) result = cursor.fetchone() if result: # Convertir Row en dictionnaire return dict(result) return None except Exception as e: logging.error(f"❌ Erreur fetch_one: {e}") return None finally: # Remettre row_factory par dĂ©faut self.connection.row_factory = None async def fetch_all(self, query: str, params: tuple = ()): """ExĂ©cute une requĂȘte et retourne toutes les lignes""" try: # Activer row_factory pour obtenir des dictionnaires self.connection.row_factory = sqlite3.Row cursor = self.connection.cursor() cursor.execute(query, params) results = cursor.fetchall() # Convertir chaque Row en dictionnaire return [dict(row) for row in results] except Exception as e: logging.error(f"❌ Erreur fetch_all: {e}") return [] finally: # Remettre row_factory par dĂ©faut self.connection.row_factory = None async def execute(self, query: str, params: tuple = ()): """ExĂ©cute une requĂȘte sans retour""" try: cursor = self.connection.cursor() cursor.execute(query, params) self.connection.commit() # Si c'est un INSERT, retourner l'ID du dernier insert if query.strip().upper().startswith('INSERT'): return cursor.lastrowid else: return cursor.rowcount except Exception as e: logging.error(f"❌ Erreur execute: {e}") self.connection.rollback() return 0 def fetch_one_sync(self, query: str, params: tuple = ()): """ExĂ©cute une requĂȘte et retourne une seule ligne (synchrone)""" try: # Activer row_factory pour obtenir des dictionnaires self.connection.row_factory = sqlite3.Row cursor = self.connection.cursor() cursor.execute(query, params) result = cursor.fetchone() if result: # Convertir Row en dictionnaire return dict(result) return None except Exception as e: logging.error(f"❌ Erreur fetch_one_sync: {e}") return None finally: # Remettre row_factory par dĂ©faut self.connection.row_factory = None def fetch_all_sync(self, query: str, params: tuple = ()): """ExĂ©cute une requĂȘte et retourne toutes les lignes (synchrone)""" try: # Activer row_factory pour obtenir des dictionnaires self.connection.row_factory = sqlite3.Row cursor = self.connection.cursor() cursor.execute(query, params) results = cursor.fetchall() # Convertir chaque Row en dictionnaire return [dict(row) for row in results] except Exception as e: logging.error(f"❌ Erreur fetch_all_sync: {e}") return [] finally: # Remettre row_factory par dĂ©faut self.connection.row_factory = None def execute_sync(self, query: str, params: tuple = ()): """ExĂ©cute une requĂȘte sans retour (synchrone)""" try: cursor = self.connection.cursor() cursor.execute(query, params) self.connection.commit() # Si c'est un INSERT, retourner l'ID du dernier insert if query.strip().upper().startswith('INSERT'): return cursor.lastrowid else: return cursor.rowcount except Exception as e: logging.error(f"❌ Erreur execute_sync: {e}") self.connection.rollback() return 0 class DailyLogManager: """Gestionnaire des logs journaliers""" def __init__(self, logs_dir: str = "logs", db_manager: DatabaseManager = None): self.logs_dir = Path(logs_dir) self.logs_dir.mkdir(exist_ok=True) self.db_manager = db_manager self.current_log_file = None self.current_date = None def get_log_file_path(self, date: datetime = None) -> Path: """Retourne le chemin du fichier log pour une date donnĂ©e""" if date is None: date = datetime.now() filename = f"bridge_{date.strftime('%Y-%m-%d')}.log" return self.logs_dir / filename async def rotate_logs_if_needed(self): """Effectue la rotation des logs si on change de jour""" current_date = datetime.now().date() if self.current_date != current_date: # Nouveau jour = rotation nĂ©cessaire if self.current_log_file and self.current_date: # Archiver l'ancien fichier await self.archive_previous_day() # Commencer nouveau fichier self.current_date = current_date self.current_log_file = self.get_log_file_path() logging.info(f"🔄 Rotation logs: nouveau fichier {self.current_log_file}") async def archive_previous_day(self): """Archive les logs du jour prĂ©cĂ©dent en BDD et supprime le fichier""" try: if not self.db_manager: logging.warning("⚠ Pas de DB manager pour archivage") return previous_date = self.current_date - timedelta(days=1) if self.current_date else datetime.now().date() - timedelta(days=1) previous_log_file = self.get_log_file_path(datetime.combine(previous_date, datetime.min.time())) if previous_log_file.exists(): # Importer en BDD imported_count = await self.db_manager.import_daily_logs(str(previous_log_file)) # Supprimer le fichier aprĂšs import rĂ©ussi if imported_count > 0: previous_log_file.unlink() logging.info(f"đŸ—ƒïž Logs archivĂ©s et fichier supprimĂ©: {previous_log_file}") else: logging.warning(f"⚠ Aucun log importĂ© depuis {previous_log_file}") except Exception as e: logging.error(f"❌ Erreur archivage logs: {e}") def get_current_log_file(self) -> Path: """Retourne le fichier log actuel""" if not self.current_log_file: self.current_date = datetime.now().date() self.current_log_file = self.get_log_file_path() return self.current_log_file # Instance globale db_manager = DatabaseManager() log_manager = DailyLogManager(db_manager=db_manager) async def setup_database(): """Initialise le systĂšme de base de donnĂ©es""" await db_manager.initialize() await log_manager.rotate_logs_if_needed() async def cleanup_old_data_task(): """TĂąche de nettoyage automatique des anciennes donnĂ©es""" while True: try: # Attendre jusqu'Ă  2h du matin now = datetime.now() next_cleanup = now.replace(hour=2, minute=0, second=0, microsecond=0) if now >= next_cleanup: next_cleanup += timedelta(days=1) wait_seconds = (next_cleanup - now).total_seconds() await asyncio.sleep(wait_seconds) # Effectuer le nettoyage logging.info("đŸ§č DĂ©but du nettoyage automatique des donnĂ©es anciennes") result = await db_manager.cleanup_old_data(days_to_keep=30) if result: logging.info(f"✅ Nettoyage terminĂ©: {result}") # Rotation des logs await log_manager.rotate_logs_if_needed() except Exception as e: logging.error(f"❌ Erreur tĂąche nettoyage: {e}") await asyncio.sleep(3600) # RĂ©essayer dans 1h en cas d'erreur if __name__ == "__main__": # Test du systĂšme de base de donnĂ©es async def test_database(): await setup_database() # Test insertion log log_entry = LogEntry( timestamp=datetime.now().isoformat(), level="INFO", message="Test log message", module="test_module", session_id="test-session-123" ) log_id = await db_manager.insert_log(log_entry) print(f"Log insĂ©rĂ© avec ID: {log_id}") # Test statistiques stats = await db_manager.get_stats(days=7) print(f"Statistiques: {json.dumps(stats, indent=2)}") await db_manager.close() asyncio.run(test_database())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jonathan97480/McpHomeAssistant'

If you have feedback or need assistance with the MCP directory API, please join our Discord server