Skip to main content
Glama
bridge_server.py‱116 kB
#!/usr/bin/env python3 """ 🌉 HTTP-MCP Bridge Server FastAPI server that exposes MCP protocol via REST endpoints with queue management """ import asyncio import json import uuid import time from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Union from dataclasses import dataclass, asdict from enum import Enum import logging from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Header, Request, Depends, status, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse, HTMLResponse, FileResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel, Field import uvicorn # Import MCP components import sys import os # Charger les variables d'environnement depuis le fichier .env try: from dotenv import load_dotenv load_dotenv() print(f"📁 Fichier .env chargĂ© - HASS_URL: {os.getenv('HASS_URL', 'Non dĂ©fini')}") except ImportError: print("⚠ Module python-dotenv non installĂ©, variables d'environnement non chargĂ©es") except Exception as e: print(f"⚠ Erreur lors du chargement du .env: {e}") # Ajouter le chemin pour importer notre serveur MCP sys.path.append(os.path.dirname(os.path.abspath(__file__))) # Import du systĂšme de base de donnĂ©es from database import db_manager, log_manager, setup_database, cleanup_old_data_task, LogEntry, RequestEntry, ErrorEntry # Import du systĂšme de cache et circuit breaker from cache_manager import cache_manager, CircuitBreakerOpenError # Import du systĂšme d'authentification from auth_manager import auth_manager, UserCreate, UserLogin, UserResponse, TokenResponse, UserRole, RefreshRequest # Import du gestionnaire de configuration Home Assistant from ha_config_manager import ha_config_manager, HAConfigCreate, HAConfigUpdate, HAConfigResponse, HATestResult, cleanup_ha_manager # Import du systĂšme de permissions from permissions_manager import PermissionsManager, PermissionType from permissions_middleware import permissions_middleware # Variables globales pour le serveur MCP mcp_server = None ha_client = None async def initialize_mcp_server(): """Initialise le serveur MCP et le client Home Assistant""" global mcp_server, ha_client try: # Importer les modules MCP from src.homeassistant_mcp_server.server import HomeAssistantClient from mcp.server import Server from dotenv import load_dotenv # Charger l'environnement load_dotenv() # Configuration HASS_URL = os.getenv("HASS_URL", "http://192.168.1.22:8123") HASS_TOKEN = os.getenv("HASS_TOKEN") if not HASS_TOKEN: raise ValueError("HASS_TOKEN environment variable required") # CrĂ©er le client Home Assistant ha_client = HomeAssistantClient(HASS_URL, HASS_TOKEN) # CrĂ©er le serveur MCP (pour l'instant une version simplifiĂ©e) mcp_server = Server("homeassistant-mcp-server") logging.info("✅ MCP Server initialized successfully") return True except Exception as e: logging.error(f"❌ Failed to initialize MCP server: {e}") return False class MockMCPServer: """Serveur MCP de test pour le dĂ©veloppement avec cache et circuit breaker""" async def list_tools(self): """Retourne une liste d'outils simulĂ©s avec mise en cache""" # ClĂ© de cache pour les outils cache_key = "mock_tools_list" # VĂ©rifier le cache d'abord cached_tools = await cache_manager.get_tools_cached(cache_key) if cached_tools is not None: logging.debug("🚀 Cache HIT: tools list retrieved from cache") return cached_tools # Simuler une rĂ©cupĂ©ration d'outils (avec circuit breaker) try: tools_data = await cache_manager.protected_call(self._fetch_tools_from_ha) # Mettre en cache pour 10 minutes await cache_manager.set_tools_cached(cache_key, tools_data, ttl=600.0) logging.debug("đŸ’Ÿ Tools list cached for 10 minutes") return tools_data except CircuitBreakerOpenError: logging.warning("⚠ Circuit breaker OPEN - returning cached fallback tools") # Retourner une version minimale en cas d'erreur fallback_tools = { "tools": [ { "name": "health_check", "description": "VĂ©rification de santĂ© (mode dĂ©gradĂ©)", "inputSchema": {"type": "object", "properties": {}} } ] } return fallback_tools except Exception as e: logging.error(f"❌ Error fetching tools: {e}") raise async def _fetch_tools_from_ha(self): """Simule la rĂ©cupĂ©ration des outils depuis Home Assistant""" # Simuler une latence rĂ©seau await asyncio.sleep(0.1) # Simuler parfois une erreur pour tester le circuit breaker import random if random.random() < 0.05: # 5% de chance d'erreur raise Exception("Simulated Home Assistant connection error") return { "tools": [ { "name": "get_entities", "description": "RĂ©cupĂšre la liste de toutes les entitĂ©s Home Assistant", "inputSchema": { "type": "object", "properties": { "domain": { "type": "string", "description": "Filtrer par domaine (optionnel)" } } } }, { "name": "call_service", "description": "Appelle un service Home Assistant", "inputSchema": { "type": "object", "properties": { "domain": {"type": "string"}, "service": {"type": "string"}, "entity_id": {"type": "string"}, "data": {"type": "object"} }, "required": ["domain", "service"] } }, { "name": "get_state", "description": "RĂ©cupĂšre l'Ă©tat d'une entitĂ©", "inputSchema": { "type": "object", "properties": { "entity_id": {"type": "string"} }, "required": ["entity_id"] } } ] } async def call_tool(self, name: str, args: Dict[str, Any]): """ExĂ©cute un outil simulĂ© avec cache et circuit breaker""" # ClĂ© de cache pour les rĂ©ponses (inclut le nom et les args) import hashlib args_str = json.dumps(args, sort_keys=True) cache_key = f"tool_response_{name}_{hashlib.md5(args_str.encode()).hexdigest()}" # VĂ©rifier le cache pour les rĂ©ponses en lecture seule if name in ["get_entities", "get_state"]: cached_response = await cache_manager.get_response_cached(cache_key) if cached_response is not None: logging.debug(f"🚀 Cache HIT: response for {name} retrieved from cache") return cached_response try: # ExĂ©cuter avec protection circuit breaker response = await cache_manager.protected_call(self._execute_tool, name, args) # Mettre en cache les rĂ©ponses en lecture (TTL plus court) if name in ["get_entities", "get_state"]: await cache_manager.set_response_cached(cache_key, response, ttl=60.0) logging.debug(f"đŸ’Ÿ Response for {name} cached for 1 minute") return response except CircuitBreakerOpenError: logging.warning(f"⚠ Circuit breaker OPEN - returning fallback for {name}") return { "content": [{ "type": "text", "text": f"⚠ Service temporairement indisponible - {name} en mode dĂ©gradĂ©" }], "isError": False } except Exception as e: logging.error(f"❌ Error executing tool {name}: {e}") raise async def _execute_tool(self, name: str, args: Dict[str, Any]): """ExĂ©cute l'outil (logique mĂ©tier)""" # Simuler une latence rĂ©seau await asyncio.sleep(0.05) # Simuler parfois une erreur pour tester le circuit breaker import random if random.random() < 0.03: # 3% de chance d'erreur raise Exception(f"Simulated error executing {name}") if name == "get_entities": domain = args.get("domain", "all") return { "content": [{ "type": "text", "text": f"🔧 Mock: RĂ©cupĂ©ration des entitĂ©s pour le domaine '{domain}'\n\nEntitĂ©s simulĂ©es:\n- light.salon_lamp (Ă©tat: off)\n- sensor.temperature (Ă©tat: 22.5°C)\n- switch.tv (Ă©tat: on)\n- sensor.humidity (Ă©tat: 45%)" }], "isError": False } elif name == "get_state": entity_id = args.get("entity_id", "unknown") return { "content": [{ "type": "text", "text": f"🔧 Mock: État de {entity_id}: {'on' if 'light' in entity_id else '22.5°C' if 'temperature' in entity_id else 'unknown'}" }], "isError": False } elif name == "call_service": domain = args.get("domain") service = args.get("service") entity_id = args.get("entity_id") return { "content": [{ "type": "text", "text": f"🔧 Mock: Service {domain}.{service} appelĂ© sur {entity_id} - ExĂ©cutĂ© avec succĂšs" }], "isError": False } else: return { "content": [{ "type": "text", "text": f"❌ Outil inconnu: {name}" }], "isError": True } # 📊 Models et Types class Priority(str, Enum): HIGH = "HIGH" MEDIUM = "MEDIUM" LOW = "LOW" BULK = "BULK" class RequestStatus(str, Enum): PENDING = "PENDING" PROCESSING = "PROCESSING" COMPLETED = "COMPLETED" FAILED = "FAILED" TIMEOUT = "TIMEOUT" @dataclass class QueuedRequest: """ReprĂ©sente une requĂȘte en file d'attente""" id: str session_id: str method: str params: Dict[str, Any] priority: Priority created_at: datetime timeout_seconds: int = 30 status: RequestStatus = RequestStatus.PENDING result: Optional[Any] = None error: Optional[Dict[str, Any]] = None @property def is_expired(self) -> bool: return datetime.now() > self.created_at + timedelta(seconds=self.timeout_seconds) @dataclass class MCPSession: """ReprĂ©sente une session MCP active""" id: str server: Any # MCP Server instance created_at: datetime last_used: datetime is_healthy: bool = True request_count: int = 0 @property def is_expired(self) -> bool: return datetime.now() > self.last_used + timedelta(minutes=30) # 🎯 Pydantic Models pour API class InitializeRequest(BaseModel): protocolVersion: str = "2024-11-05" capabilities: Dict[str, Any] = Field(default_factory=dict) clientInfo: Dict[str, str] = Field(default_factory=lambda: {"name": "http-client", "version": "1.0"}) session_id: Optional[str] = None class ToolCallRequest(BaseModel): jsonrpc: str = "2.0" id: Union[str, int] = 1 method: str = "tools/call" params: Dict[str, Any] class ToolListRequest(BaseModel): jsonrpc: str = "2.0" id: Union[str, int] = 2 method: str = "tools/list" params: Dict[str, Any] = Field(default_factory=dict) # 🔄 AsyncRequestQueue - Gestion des files d'attente class AsyncRequestQueue: def __init__(self, max_concurrent: int = 5): self.max_concurrent = max_concurrent self.queues = { Priority.HIGH: asyncio.Queue(), Priority.MEDIUM: asyncio.Queue(), Priority.LOW: asyncio.Queue(), Priority.BULK: asyncio.Queue() } self.processing: Dict[str, QueuedRequest] = {} self.completed: Dict[str, QueuedRequest] = {} self.stats = { "total_requests": 0, "completed_requests": 0, "failed_requests": 0, "avg_processing_time": 0.0 } self._processor_task: Optional[asyncio.Task] = None async def start(self): """DĂ©marre le processeur de queue""" if self._processor_task is None: self._processor_task = asyncio.create_task(self._process_queue()) logging.info("🔄 AsyncRequestQueue started") async def stop(self): """ArrĂȘte le processeur de queue""" if self._processor_task: self._processor_task.cancel() try: await self._processor_task except asyncio.CancelledError: pass self._processor_task = None logging.info("🛑 AsyncRequestQueue stopped") async def enqueue(self, request: QueuedRequest) -> str: """Ajoute une requĂȘte Ă  la queue""" await self.queues[request.priority].put(request) self.stats["total_requests"] += 1 logging.info(f"đŸ“„ Request {request.id} queued with priority {request.priority}") return request.id async def get_result(self, request_id: str, timeout: float = 30.0) -> QueuedRequest: """Attend et retourne le rĂ©sultat d'une requĂȘte""" start_time = time.time() while time.time() - start_time < timeout: # VĂ©rifier si completed if request_id in self.completed: return self.completed[request_id] # VĂ©rifier si en cours if request_id in self.processing: await asyncio.sleep(0.1) continue # VĂ©rifier dans les queues await asyncio.sleep(0.1) raise HTTPException(status_code=408, detail=f"Request {request_id} timeout") async def _process_queue(self): """Processeur principal de la queue""" while True: try: # Traiter par prioritĂ©: HIGH > MEDIUM > LOW > BULK for priority in [Priority.HIGH, Priority.MEDIUM, Priority.LOW, Priority.BULK]: if len(self.processing) >= self.max_concurrent: break try: request = self.queues[priority].get_nowait() if not request.is_expired: asyncio.create_task(self._execute_request(request)) else: logging.warning(f"⏰ Request {request.id} expired before processing") except asyncio.QueueEmpty: continue await asyncio.sleep(0.01) # Éviter CPU spinning except Exception as e: logging.error(f"❌ Queue processor error: {e}") await asyncio.sleep(1) async def _execute_request(self, request: QueuedRequest): """ExĂ©cute une requĂȘte MCP""" start_time = time.time() request.status = RequestStatus.PROCESSING self.processing[request.id] = request try: # RĂ©cupĂ©rer la session MCP session = session_pool.get_session(request.session_id) if not session: raise HTTPException(status_code=404, detail="Session not found") # ExĂ©cuter la mĂ©thode MCP if request.method == "tools/list": result = await session.server.list_tools() elif request.method == "tools/call": tool_name = request.params.get("name") tool_args = request.params.get("arguments", {}) result = await session.server.call_tool(tool_name, tool_args) else: raise HTTPException(status_code=400, detail=f"Unknown method: {request.method}") # Stocker le rĂ©sultat request.result = result request.status = RequestStatus.COMPLETED session.last_used = datetime.now() session.request_count += 1 # Statistiques processing_time = time.time() - start_time self.stats["completed_requests"] += 1 self.stats["avg_processing_time"] = ( (self.stats["avg_processing_time"] * (self.stats["completed_requests"] - 1) + processing_time) / self.stats["completed_requests"] ) logging.info(f"✅ Request {request.id} completed in {processing_time:.3f}s") except Exception as e: request.error = { "code": -32603, "message": str(e), "data": {"request_id": request.id} } request.status = RequestStatus.FAILED self.stats["failed_requests"] += 1 logging.error(f"❌ Request {request.id} failed: {e}") finally: # DĂ©placer vers completed et nettoyer processing self.completed[request.id] = request if request.id in self.processing: del self.processing[request.id] # Cleanup ancien completed (garder derniĂšres 1000) if len(self.completed) > 1000: old_ids = sorted(self.completed.keys())[:500] for old_id in old_ids: del self.completed[old_id] def get_status(self) -> Dict[str, Any]: """Retourne le statut de la queue""" total_queued = sum(q.qsize() for q in self.queues.values()) return { "pending": total_queued, "processing": len(self.processing), "by_priority": {p.value: self.queues[p].qsize() for p in Priority}, "stats": self.stats, "max_concurrent": self.max_concurrent } @property def size(self) -> int: """Retourne la taille totale de la queue""" return sum(q.qsize() for q in self.queues.values()) + len(self.processing) def get_stats(self) -> Dict[str, Any]: """Retourne les statistiques dĂ©taillĂ©es de la queue""" total_queued = sum(q.qsize() for q in self.queues.values()) success_rate = 0 if self.stats["total_requests"] > 0: success_rate = (self.stats["completed_requests"] / self.stats["total_requests"]) * 100 return { "total_queued": total_queued, "processing_count": len(self.processing), "completed_count": len(self.completed), "queue_by_priority": {p.value: self.queues[p].qsize() for p in Priority}, "performance": { "success_rate_percent": round(success_rate, 2), "avg_processing_time_ms": round(self.stats["avg_processing_time"] * 1000, 2), "total_requests": self.stats["total_requests"], "completed_requests": self.stats["completed_requests"], "failed_requests": self.stats["failed_requests"] }, "capacity": { "max_concurrent": self.max_concurrent, "current_load_percent": round((len(self.processing) / self.max_concurrent) * 100, 2) } } # 🏊 MCPSessionPool - Gestion du pool de sessions class MCPSessionPool: def __init__(self, max_sessions: int = 10): self.max_sessions = max_sessions self.sessions: Dict[str, MCPSession] = {} self._cleanup_task: Optional[asyncio.Task] = None async def start(self): """DĂ©marre le nettoyage automatique des sessions""" if self._cleanup_task is None: self._cleanup_task = asyncio.create_task(self._cleanup_sessions()) logging.info("🏊 MCPSessionPool started") async def stop(self): """ArrĂȘte le pool de sessions""" if self._cleanup_task: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass self._cleanup_task = None # Fermer toutes les sessions for session in self.sessions.values(): try: await session.server.close() except: pass self.sessions.clear() logging.info("🛑 MCPSessionPool stopped") async def create_session(self, session_id: Optional[str] = None) -> MCPSession: """CrĂ©e une nouvelle session MCP""" if len(self.sessions) >= self.max_sessions: # Nettoyer les sessions expirĂ©es ou inactives await self._cleanup_expired() if len(self.sessions) >= self.max_sessions: raise HTTPException(status_code=503, detail="No MCP sessions available") if not session_id: session_id = str(uuid.uuid4()) # CrĂ©er le serveur MCP if mcp_server is None: await initialize_mcp_server() # Utiliser le mock server pour les tests mock_server = MockMCPServer() session = MCPSession( id=session_id, server=mock_server, # Utiliser le mock pour les tests created_at=datetime.now(), last_used=datetime.now() ) self.sessions[session_id] = session logging.info(f"🆕 Created MCP session {session_id}") return session def get_session(self, session_id: str) -> Optional[MCPSession]: """RĂ©cupĂšre une session existante""" return self.sessions.get(session_id) async def _cleanup_sessions(self): """Nettoyage pĂ©riodique des sessions expirĂ©es""" while True: try: await self._cleanup_expired() await asyncio.sleep(60) # Cleanup toutes les minutes except Exception as e: logging.error(f"❌ Session cleanup error: {e}") await asyncio.sleep(60) async def _cleanup_expired(self): """Supprime les sessions expirĂ©es""" expired_sessions = [ session_id for session_id, session in self.sessions.items() if session.is_expired ] for session_id in expired_sessions: session = self.sessions[session_id] try: await session.server.close() except: pass del self.sessions[session_id] logging.info(f"đŸ—‘ïž Cleaned up expired session {session_id}") def get_status(self) -> Dict[str, Any]: """Retourne le statut du pool""" healthy_count = sum(1 for s in self.sessions.values() if s.is_healthy) return { "total": len(self.sessions), "healthy": healthy_count, "max_sessions": self.max_sessions, "by_status": { "healthy": healthy_count, "unhealthy": len(self.sessions) - healthy_count } } def get_active_sessions(self) -> Dict[str, MCPSession]: """Retourne les sessions actives (non expirĂ©es)""" return { session_id: session for session_id, session in self.sessions.items() if not session.is_expired and session.is_healthy } # 🌐 Instances globales request_queue = AsyncRequestQueue(max_concurrent=5) session_pool = MCPSessionPool(max_sessions=10) # 🚀 Lifecycle Management @asynccontextmanager async def lifespan(app: FastAPI): # Startup logging.info("🚀 Starting HTTP-MCP Bridge Server...") # Enregistrer l'heure de dĂ©marrage app.state.start_time = datetime.now() # Initialiser la base de donnĂ©es await setup_database() # Initialiser le systĂšme d'authentification await auth_manager.initialize() logging.info("🔐 Authentication system initialized") # Initialiser le gestionnaire de configuration Home Assistant await ha_config_manager.initialize() logging.info("🏠 Home Assistant config manager initialized") # DĂ©marrer les composants await request_queue.start() await session_pool.start() # DĂ©marrer la tĂąche de nettoyage automatique de la BDD cleanup_db_task = asyncio.create_task(cleanup_old_data_task()) # DĂ©marrer la tĂąche de nettoyage du cache (toutes les 5 minutes) async def cache_cleanup_task(): while True: try: await asyncio.sleep(300) # 5 minutes cleaned = await cache_manager.cleanup() if cleaned['total_cleaned'] > 0: logging.info(f"đŸ§č Cache cleanup: {cleaned['total_cleaned']} expired entries removed") except asyncio.CancelledError: break except Exception as e: logging.error(f"❌ Cache cleanup error: {e}") cleanup_cache_task = asyncio.create_task(cache_cleanup_task()) logging.info("đŸ§č Cache cleanup task started (every 5 minutes)") yield # Shutdown logging.info("🛑 Shutting down HTTP-MCP Bridge Server...") # ArrĂȘter les tĂąches de nettoyage cleanup_db_task.cancel() cleanup_cache_task.cancel() try: await cleanup_db_task await cleanup_cache_task except asyncio.CancelledError: pass # ArrĂȘter les composants await request_queue.stop() await session_pool.stop() # Nettoyer le gestionnaire HA await cleanup_ha_manager() # Fermer la base de donnĂ©es await db_manager.close() # 🌐 FastAPI App app = FastAPI( title="HTTP-MCP Bridge", description="Bridge HTTP REST API to MCP Protocol with queue management", version="1.0.0", lifespan=lifespan ) # Configuration des templates et fichiers statiques templates = Jinja2Templates(directory="web/templates") # Monter les fichiers statiques app.mount("/static", StaticFiles(directory="web/static"), name="static") # Route spĂ©cifique pour favicon @app.get("/favicon.ico") async def favicon(): """Retourne le favicon du site""" return FileResponse("web/static/favicon.svg", media_type="image/svg+xml") # CORS Middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 📝 Logging setup avec rotation journaliĂšre class DatabaseLogHandler(logging.Handler): """Handler personnalisĂ© pour envoyer les logs vers la base de donnĂ©es""" def emit(self, record): try: # CrĂ©er l'entrĂ©e de log log_entry = LogEntry( timestamp=datetime.fromtimestamp(record.created).isoformat(), level=record.levelname, message=record.getMessage(), module=record.name, extra_data=json.dumps({ "filename": record.filename, "lineno": record.lineno, "funcName": record.funcName }) if hasattr(record, 'filename') else None ) # InsĂ©rer de maniĂšre asynchrone (dans un thread sĂ©parĂ© pour Ă©viter les blocages) asyncio.create_task(db_manager.insert_log(log_entry)) except Exception: # Éviter les boucles infinies en cas d'erreur du logger pass # Configuration du logging def setup_logging(): """Configure le systĂšme de logging avec rotation et base de donnĂ©es""" # Logger principal logger = logging.getLogger() logger.setLevel(logging.INFO) # Formateur formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Handler pour fichier journalier file_handler = logging.FileHandler( log_manager.get_current_log_file(), mode='a', encoding='utf-8' ) file_handler.setFormatter(formatter) file_handler.setLevel(logging.INFO) # Handler pour console console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) console_handler.setLevel(logging.INFO) # Handler pour base de donnĂ©es db_handler = DatabaseLogHandler() db_handler.setLevel(logging.WARNING) # Seulement les warnings et erreurs en BDD # Ajouter les handlers logger.addHandler(file_handler) logger.addHandler(console_handler) logger.addHandler(db_handler) return logger # Initialiser le logging logger = setup_logging() # đŸ› ïž Dependencies async def get_session_id(x_session_id: Optional[str] = Header(None)) -> Optional[str]: return x_session_id async def get_priority(x_priority: Optional[str] = Header("MEDIUM")) -> Priority: try: return Priority(x_priority.upper()) except (ValueError, AttributeError): return Priority.MEDIUM async def get_timeout(x_timeout: Optional[int] = Header(30)) -> int: return max(1, min(x_timeout or 30, 300)) # Limite entre 1 et 300 secondes async def log_request(request: Request, response_time_ms: int, status_code: int, session_id: Optional[str] = None): """Log une requĂȘte utilisateur dans la base de donnĂ©es""" try: request_entry = RequestEntry( timestamp=datetime.now().isoformat(), session_id=session_id or "anonymous", method=request.method, endpoint=str(request.url.path), params=json.dumps(dict(request.query_params)) if request.query_params else "{}", response_time_ms=response_time_ms, status_code=status_code, user_ip=request.client.host if request.client else None, user_agent=request.headers.get("user-agent") ) await db_manager.insert_request(request_entry) except Exception as e: logger.warning(f"⚠ Erreur log requĂȘte: {e}") async def log_error(error_type: str, error_message: str, stack_trace: str = None, session_id: str = None, context: Dict = None): """Log une erreur dans la base de donnĂ©es""" try: error_entry = ErrorEntry( timestamp=datetime.now().isoformat(), error_type=error_type, error_message=error_message, stack_trace=stack_trace, session_id=session_id, context=json.dumps(context) if context else None ) await db_manager.insert_error(error_entry) except Exception as e: logger.warning(f"⚠ Erreur log erreur: {e}") # 🌐 Routes API @app.post("/mcp/initialize") async def initialize_session(request_data: InitializeRequest, request: Request): """Initialise une nouvelle session MCP""" start_time = time.time() status_code = 200 try: session = await session_pool.create_session(request_data.session_id) response_data = { "jsonrpc": "2.0", "id": 1, "result": { "protocolVersion": request_data.protocolVersion, "capabilities": { "tools": {}, "resources": {}, "prompts": {} }, "serverInfo": { "name": "homeassistant-mcp-server", "version": "1.0.0" }, "session_id": session.id, "expires_at": (session.created_at + timedelta(hours=1)).isoformat() }, "bridge_info": { "queue_position": 0, "estimated_wait_ms": 0 } } # Log de la requĂȘte response_time_ms = int((time.time() - start_time) * 1000) await log_request(request, response_time_ms, status_code, session.id) return JSONResponse(response_data) except HTTPException as e: status_code = e.status_code await log_error("HTTPException", str(e.detail), session_id=None, context={"endpoint": "/mcp/initialize"}) raise except Exception as e: status_code = 500 logger.error(f"Initialize error: {e}") await log_error("InternalError", str(e), session_id=None, context={"endpoint": "/mcp/initialize"}) raise HTTPException(status_code=500, detail=str(e)) finally: if status_code != 200: response_time_ms = int((time.time() - start_time) * 1000) await log_request(request, response_time_ms, status_code) @app.post("/mcp/tools/list") async def list_tools( request: ToolListRequest, session_id: str = Depends(get_session_id), priority: Priority = Depends(get_priority), timeout: int = Depends(get_timeout) ): """Liste tous les outils MCP disponibles""" if not session_id: raise HTTPException(status_code=400, detail="X-Session-ID header required") # CrĂ©er la requĂȘte en queue queued_request = QueuedRequest( id=str(uuid.uuid4()), session_id=session_id, method="tools/list", params=request.params, priority=priority, created_at=datetime.now(), timeout_seconds=timeout ) # Ajouter Ă  la queue await request_queue.enqueue(queued_request) # Attendre le rĂ©sultat result = await request_queue.get_result(queued_request.id, timeout) if result.status == RequestStatus.FAILED: raise HTTPException(status_code=500, detail=result.error) return JSONResponse({ "jsonrpc": "2.0", "id": request.id, "result": result.result, "bridge_info": { "cached": False, "execution_time_ms": int((datetime.now() - result.created_at).total_seconds() * 1000) } }) @app.post("/mcp/tools/call") async def call_tool( request_data: ToolCallRequest, request: Request, session_id: str = Depends(get_session_id), priority: Priority = Depends(get_priority), timeout: int = Depends(get_timeout) ): """ExĂ©cute un outil MCP spĂ©cifique""" start_time = time.time() status_code = 200 try: if not session_id: raise HTTPException(status_code=400, detail="X-Session-ID header required") # CrĂ©er la requĂȘte en queue queued_request = QueuedRequest( id=str(uuid.uuid4()), session_id=session_id, method="tools/call", params=request_data.params, priority=priority, created_at=datetime.now(), timeout_seconds=timeout ) # Ajouter Ă  la queue await request_queue.enqueue(queued_request) # Attendre le rĂ©sultat result = await request_queue.get_result(queued_request.id, timeout) if result.status == RequestStatus.FAILED: status_code = 500 await log_error("ToolExecutionError", str(result.error), session_id=session_id, context={"tool": request_data.params.get("name"), "params": request_data.params}) raise HTTPException(status_code=500, detail=result.error) response_data = { "jsonrpc": "2.0", "id": request_data.id, "result": result.result, "bridge_info": { "execution_time_ms": int((datetime.now() - result.created_at).total_seconds() * 1000), "session_id": session_id, "cached": False } } # Log de la requĂȘte response_time_ms = int((time.time() - start_time) * 1000) await log_request(request, response_time_ms, status_code, session_id) return JSONResponse(response_data) except HTTPException as e: status_code = e.status_code await log_error("HTTPException", str(e.detail), session_id=session_id, context={"endpoint": "/mcp/tools/call"}) raise except Exception as e: status_code = 500 await log_error("InternalError", str(e), session_id=session_id, context={"endpoint": "/mcp/tools/call"}) raise HTTPException(status_code=500, detail=str(e)) finally: if status_code != 200: response_time_ms = int((time.time() - start_time) * 1000) await log_request(request, response_time_ms, status_code, session_id) @app.get("/mcp/status") async def bridge_status(): """Statut complet du bridge""" return JSONResponse({ "bridge": { "status": "healthy", "version": "1.0.0", "started_at": datetime.now().isoformat() }, "sessions": session_pool.get_status(), "queue": request_queue.get_status(), "home_assistant": { "url": "http://192.168.1.22:8123", "status": "connected" } }) # 🔐 Authentication Dependencies security = HTTPBearer(auto_error=False) # Ne pas lever d'erreur automatiquement async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> UserResponse: """DĂ©pendance pour obtenir l'utilisateur actuel depuis le token JWT""" try: # VĂ©rifier si les credentials sont fournis if not credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) token = credentials.credentials token_data = auth_manager.verify_token(token) if token_data is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) user = await auth_manager.get_user_by_id(token_data.user_id) if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", headers={"WWW-Authenticate": "Bearer"}, ) return user except HTTPException: raise except Exception as e: logger.error(f"❌ Authentication error: {e}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication failed", headers={"WWW-Authenticate": "Bearer"}, ) async def get_current_admin_user(current_user: UserResponse = Depends(get_current_user)) -> UserResponse: """DĂ©pendance pour vĂ©rifier que l'utilisateur est admin""" if current_user.role != UserRole.ADMIN: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required" ) return current_user def get_client_ip(request: Request) -> str: """RĂ©cupĂšre l'IP du client""" forwarded = request.headers.get("X-Forwarded-For") if forwarded: return forwarded.split(",")[0].strip() return request.client.host if request.client else "unknown" # 🌐 Endpoints API @app.get("/health") async def health_check(): """Health check simple""" return JSONResponse({ "status": "healthy", "timestamp": datetime.now().isoformat() }) # 🔐 Authentication Endpoints @app.post("/auth/register", response_model=UserResponse) async def register_user(user_data: UserCreate, request: Request): """Inscription d'un nouvel utilisateur""" try: ip_address = get_client_ip(request) logger.info(f"🔐 User registration attempt: {user_data.username} from {ip_address}") # CrĂ©er l'utilisateur user = await auth_manager.create_user(user_data) # Log pour audit await log_request(request, 0, 201) return user except HTTPException: raise except Exception as e: logger.error(f"❌ Registration error: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Registration failed" ) @app.post("/auth/login", response_model=TokenResponse) async def login_user(login_data: UserLogin, request: Request): """Connexion utilisateur""" try: ip_address = get_client_ip(request) user_agent = request.headers.get("User-Agent") logger.info(f"🔐 Login attempt: {login_data.username} from {ip_address}") # Authentifier l'utilisateur user = await auth_manager.authenticate_user( login_data.username, login_data.password, user_agent, ip_address ) if not user: # Log tentative Ă©chouĂ©e await log_error( error_type="AUTH_FAILED", error_message=f"Failed login for {login_data.username}", context={"endpoint": "/auth/login", "user_id": None} ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # CrĂ©er la session token_response = await auth_manager.create_user_session(user, user_agent, ip_address) # Log connexion rĂ©ussie logger.info(f"📊 Login successful: {user.username} from {ip_address}") logger.info(f"✅ User logged in successfully: {user.username}") return token_response except HTTPException: raise except Exception as e: logger.error(f"❌ Login error: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Login failed" ) @app.post("/auth/refresh", response_model=TokenResponse) async def refresh_access_token(refresh_request: RefreshRequest, request: Request): """RafraĂźchit un token d'accĂšs""" try: ip_address = get_client_ip(request) token_response = await auth_manager.refresh_token(refresh_request.refresh_token) if not token_response: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token", headers={"WWW-Authenticate": "Bearer"}, ) # Log refresh rĂ©ussi await log_request(request, 0, 200) return token_response except HTTPException: raise except Exception as e: logger.error(f"❌ Token refresh error: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Token refresh failed" ) @app.post("/auth/logout") async def logout_user(current_user: UserResponse = Depends(get_current_user), credentials: HTTPAuthorizationCredentials = Depends(security), request: Request = None): """DĂ©connexion utilisateur""" try: ip_address = get_client_ip(request) if request else "unknown" # RĂ©voquer la session success = await auth_manager.revoke_session(credentials.credentials) # Log dĂ©connexion await log_request(request, 0, 200) logger.info(f"✅ User logged out: {current_user.username}") return JSONResponse({ "status": "success", "message": "Logged out successfully" }) except Exception as e: logger.error(f"❌ Logout error: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Logout failed" ) @app.get("/auth/me", response_model=UserResponse) async def get_current_user_info(current_user: UserResponse = Depends(get_current_user)): """RĂ©cupĂšre les informations de l'utilisateur connectĂ©""" return current_user @app.get("/auth/sessions") async def get_user_sessions(current_user: UserResponse = Depends(get_current_user)): """RĂ©cupĂšre les sessions actives de l'utilisateur""" try: sessions = await auth_manager.get_active_sessions(current_user.id) return JSONResponse({ "status": "success", "sessions": sessions }) except Exception as e: logger.error(f"❌ Failed to get user sessions: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve sessions" ) # =================================== # ENDPOINTS CONFIGURATION HOME ASSISTANT # =================================== @app.post("/config/homeassistant", response_model=HAConfigResponse) async def create_ha_config( config_data: HAConfigCreate, current_user: UserResponse = Depends(get_current_user) ): """CrĂ©e une nouvelle configuration Home Assistant""" try: # CrĂ©er la configuration avec test automatique ha_config = await ha_config_manager.create_config(current_user.id, config_data) # Retourner la rĂ©ponse (sans le token) return HAConfigResponse( config_id=1, # Sera mis Ă  jour avec la vraie valeur name=ha_config.name, url=ha_config.url, is_active=ha_config.is_active, last_test=ha_config.last_test, last_status=ha_config.last_status.value, created_at=ha_config.created_at, updated_at=ha_config.updated_at ) except Exception as e: logger.error(f"❌ Erreur crĂ©ation config HA: {e}") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Failed to create Home Assistant configuration: {str(e)}" ) @app.get("/config/homeassistant") async def list_ha_configs( current_user: UserResponse = Depends(get_current_user) ): """Liste toutes les configurations Home Assistant de l'utilisateur""" try: configs = await ha_config_manager.list_configs(current_user.id) return JSONResponse({ "status": "success", "configs": [config.dict() for config in configs] }) except Exception as e: logger.error(f"❌ Erreur liste configs HA: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve Home Assistant configurations" ) @app.get("/config/homeassistant/{config_id}", response_model=HAConfigResponse) async def get_ha_config( config_id: int, current_user: UserResponse = Depends(get_current_user) ): """RĂ©cupĂšre une configuration Home Assistant spĂ©cifique""" try: ha_config = await ha_config_manager.get_config(current_user.id, config_id) if not ha_config: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Home Assistant configuration not found" ) return HAConfigResponse( config_id=config_id, name=ha_config.name, url=ha_config.url, is_active=ha_config.is_active, last_test=ha_config.last_test, last_status=ha_config.last_status.value, created_at=ha_config.created_at, updated_at=ha_config.updated_at ) except HTTPException: raise except Exception as e: logger.error(f"❌ Erreur rĂ©cupĂ©ration config HA: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve Home Assistant configuration" ) @app.put("/config/homeassistant/{config_id}", response_model=HAConfigResponse) async def update_ha_config( config_id: int, update_data: HAConfigUpdate, current_user: UserResponse = Depends(get_current_user) ): """Met Ă  jour une configuration Home Assistant""" try: ha_config = await ha_config_manager.update_config(current_user.id, config_id, update_data) if not ha_config: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Home Assistant configuration not found" ) return HAConfigResponse( config_id=config_id, name=ha_config.name, url=ha_config.url, is_active=ha_config.is_active, last_test=ha_config.last_test, last_status=ha_config.last_status.value, created_at=ha_config.created_at, updated_at=ha_config.updated_at ) except HTTPException: raise except Exception as e: logger.error(f"❌ Erreur mise Ă  jour config HA: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update Home Assistant configuration" ) @app.delete("/config/homeassistant/{config_id}") async def delete_ha_config( config_id: int, current_user: UserResponse = Depends(get_current_user) ): """Supprime une configuration Home Assistant""" try: success = await ha_config_manager.delete_config(current_user.id, config_id) if not success: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Home Assistant configuration not found" ) return JSONResponse({ "status": "success", "message": "Home Assistant configuration deleted successfully" }) except HTTPException: raise except Exception as e: logger.error(f"❌ Erreur suppression config HA: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete Home Assistant configuration" ) @app.post("/config/homeassistant/{config_id}/test", response_model=HATestResult) async def test_ha_config( config_id: int, current_user: UserResponse = Depends(get_current_user) ): """Teste la connexion Ă  Home Assistant""" try: # RĂ©cupĂ©rer la configuration ha_config = await ha_config_manager.get_config(current_user.id, config_id) if not ha_config: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Home Assistant configuration not found" ) # RĂ©cupĂ©rer le token dĂ©chiffrĂ© token = await ha_config_manager.get_decrypted_token(current_user.id, config_id) if not token: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to decrypt token" ) # Tester la connexion test_result = await ha_config_manager.test_ha_connection(ha_config.url, token) # Mettre Ă  jour le statut en base await ha_config_manager.update_config( current_user.id, config_id, HAConfigUpdate() # DĂ©clenche la mise Ă  jour du last_test ) return test_result except HTTPException: raise except Exception as e: logger.error(f"❌ Erreur test config HA: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to test Home Assistant connection: {str(e)}" ) @app.post("/config/homeassistant/test", response_model=HATestResult) async def test_ha_connection_direct( config_data: HAConfigCreate, current_user: UserResponse = Depends(get_current_user) ): """Teste une connexion Home Assistant sans sauvegarder""" try: # Tester la connexion directement test_result = await ha_config_manager.test_ha_connection(config_data.url, config_data.token) return test_result except Exception as e: logger.error(f"❌ Erreur test direct HA: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to test Home Assistant connection: {str(e)}" ) # =================================== # ENDPOINTS ADMINISTRATION # =================================== @app.get("/admin/stats") async def get_statistics(days: int = 7): """RĂ©cupĂšre les statistiques dĂ©taillĂ©es du bridge""" try: stats = await db_manager.get_stats(days=days) return JSONResponse({ "status": "success", "data": stats }) except Exception as e: logger.error(f"Erreur rĂ©cupĂ©ration stats: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.get("/admin/metrics") async def get_metrics(): """RĂ©cupĂšre les mĂ©triques du cache et du circuit breaker""" try: # Nettoyer les caches expirĂ©s cleanup_stats = await cache_manager.cleanup() # RĂ©cupĂ©rer toutes les mĂ©triques metrics = cache_manager.get_metrics() # Ajouter les stats de nettoyage metrics['last_cleanup'] = cleanup_stats # Ajouter les mĂ©triques de session session_stats = { 'active_sessions': len(session_pool.sessions), 'total_requests_processed': session_pool.total_requests, 'queue_size': request_queue.size, 'queue_stats': request_queue.get_stats() } metrics['session_management'] = session_stats return JSONResponse({ "status": "success", "timestamp": datetime.utcnow().isoformat(), "metrics": metrics }) except Exception as e: logger.error(f"Erreur rĂ©cupĂ©ration mĂ©triques: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.post("/admin/cache/clear") async def clear_cache(): """Vide tous les caches""" try: await cache_manager.clear_all_caches() return JSONResponse({ "status": "success", "message": "All caches cleared successfully" }) except Exception as e: logger.error(f"Erreur vidage cache: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.post("/admin/cleanup") async def manual_cleanup(days_to_keep: int = 30): """Lance un nettoyage manuel des donnĂ©es anciennes""" try: result = await db_manager.cleanup_old_data(days_to_keep=days_to_keep) return JSONResponse({ "status": "success", "data": result }) except Exception as e: logger.error(f"Erreur nettoyage manuel: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) # ================================ # 🔐 PERMISSIONS ENDPOINTS # ================================ # Models pour les permissions class PermissionRequest(BaseModel): tool_name: str = Field(..., description="Nom de l'outil MCP") permission_type: str = Field(..., description="Type de permission (READ/WRITE/EXECUTE)") class BulkPermissionRequest(BaseModel): permissions: List[PermissionRequest] = Field(..., description="Liste des permissions Ă  vĂ©rifier") class UserPermissionUpdate(BaseModel): tool_name: str = Field(..., description="Nom de l'outil MCP") can_read: bool = Field(default=False, description="Permission de lecture") can_write: bool = Field(default=False, description="Permission d'Ă©criture") can_execute: bool = Field(default=False, description="Permission d'exĂ©cution") class BulkUserPermissionUpdate(BaseModel): permissions: List[UserPermissionUpdate] = Field(..., description="Liste des permissions Ă  mettre Ă  jour") class DefaultPermissionUpdate(BaseModel): tool_name: str = Field(..., description="Nom de l'outil MCP") can_read: bool = Field(default=False, description="Permission de lecture par dĂ©faut") can_write: bool = Field(default=False, description="Permission d'Ă©criture par dĂ©faut") can_execute: bool = Field(default=False, description="Permission d'exĂ©cution par dĂ©faut") @app.post("/permissions/validate") async def validate_permission( request: PermissionRequest, credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer()) ): """Valide une permission spĂ©cifique pour l'utilisateur courant""" try: # Valider le type de permission try: permission_type = PermissionType(request.permission_type.upper()) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Type de permission invalide: {request.permission_type}" ) # Valider la permission validation_result = await permissions_middleware.validate_mcp_permission( request=None, # Pas besoin de request object ici tool_name=request.tool_name, permission_type=permission_type, credentials=credentials ) return JSONResponse({ "status": "success", "granted": True, "tool_name": request.tool_name, "permission_type": request.permission_type, "user_id": validation_result['user_id'], "timestamp": validation_result['timestamp'] }) except HTTPException as he: return JSONResponse({ "status": "denied", "granted": False, "tool_name": request.tool_name, "permission_type": request.permission_type, "reason": he.detail }, status_code=he.status_code) except Exception as e: logger.error(f"Erreur validation permission: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.post("/permissions/validate/bulk") async def validate_bulk_permissions( request: BulkPermissionRequest, credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer()) ): """Valide plusieurs permissions en une seule fois""" try: # Convertir en format attendu par le middleware tool_permissions = [] for perm in request.permissions: tool_permissions.append({ 'tool_name': perm.tool_name, 'permission_type': perm.permission_type }) # Valider toutes les permissions validation_result = await permissions_middleware.validate_bulk_permissions( request=None, # Pas besoin de request object ici tool_permissions=tool_permissions, credentials=credentials ) return JSONResponse({ "status": "success", "all_granted": True, "user_id": validation_result['user_id'], "results": validation_result['results'], "timestamp": validation_result['timestamp'] }) except HTTPException as he: return JSONResponse({ "status": "denied", "all_granted": False, "reason": he.detail, "results": [] }, status_code=he.status_code) except Exception as e: logger.error(f"Erreur validation permissions bulk: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.get("/permissions/me") async def get_my_permissions( credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer()) ): """Obtient toutes les permissions de l'utilisateur courant""" try: # Obtenir l'utilisateur depuis le token from permissions_middleware import get_current_user_from_token user_data = await get_current_user_from_token(credentials.credentials) if not user_data: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide" ) user_id = user_data.get('user_id') if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User ID invalide" ) # Obtenir le rĂ©sumĂ© des permissions summary = await permissions_middleware.get_user_permissions_summary(user_id) return JSONResponse({ "status": "success", "data": summary }) except HTTPException: raise except Exception as e: logger.error(f"Erreur obtention permissions: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.get("/permissions/user/{user_id}") async def get_user_permissions( user_id: int, current_user: UserResponse = Depends(get_current_admin_user) ): """Obtient toutes les permissions d'un utilisateur spĂ©cifique (admin uniquement)""" try: # Obtenir le rĂ©sumĂ© des permissions summary = await permissions_middleware.get_user_permissions_summary(user_id) return JSONResponse({ "status": "success", "data": summary }) except Exception as e: logger.error(f"Erreur obtention permissions utilisateur {user_id}: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.put("/permissions/user/{user_id}") async def update_user_permissions( user_id: int, request: UserPermissionUpdate, current_user: UserResponse = Depends(get_current_admin_user) ): """Met Ă  jour les permissions d'un utilisateur spĂ©cifique (admin uniquement)""" try: permissions_manager = PermissionsManager() # Mettre Ă  jour les permissions success = await permissions_manager.set_user_permission( user_id=user_id, tool_name=request.tool_name, can_read=request.can_read, can_write=request.can_write, can_execute=request.can_execute ) if not success: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Impossible de mettre Ă  jour les permissions" ) return JSONResponse({ "status": "success", "message": f"Permissions mises Ă  jour pour l'utilisateur {user_id}", "tool_name": request.tool_name, "permissions": { "can_read": request.can_read, "can_write": request.can_write, "can_execute": request.can_execute } }) except HTTPException: raise except Exception as e: logger.error(f"Erreur mise Ă  jour permissions: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.put("/permissions/user/{user_id}/bulk") async def update_user_permissions_bulk( user_id: int, request: BulkUserPermissionUpdate, current_user: UserResponse = Depends(get_current_admin_user) ): """Met Ă  jour plusieurs permissions d'un utilisateur en une fois (admin uniquement)""" try: permissions_manager = PermissionsManager() # PrĂ©parer les donnĂ©es pour la mise Ă  jour en lot permissions_data = [] for perm in request.permissions: permissions_data.append({ 'tool_name': perm.tool_name, 'can_read': perm.can_read, 'can_write': perm.can_write, 'can_execute': perm.can_execute }) # Effectuer la mise Ă  jour en lot results = await permissions_manager.bulk_update_user_permissions( user_id=user_id, permissions_data=permissions_data ) return JSONResponse({ "status": "success", "message": f"Permissions mises Ă  jour en lot pour l'utilisateur {user_id}", "updated_count": len(results), "results": results }) except Exception as e: logger.error(f"Erreur mise Ă  jour permissions bulk: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.delete("/permissions/user/{user_id}/tool/{tool_name}") async def delete_user_permission( user_id: int, tool_name: str, current_user: UserResponse = Depends(get_current_admin_user) ): """Supprime les permissions d'un utilisateur pour un outil spĂ©cifique (admin uniquement)""" try: permissions_manager = PermissionsManager() # Supprimer les permissions (revient aux permissions par dĂ©faut) success = await permissions_manager.remove_user_permission( user_id=user_id, tool_name=tool_name ) if not success: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Aucune permission trouvĂ©e pour l'utilisateur {user_id} et l'outil {tool_name}" ) return JSONResponse({ "status": "success", "message": f"Permissions supprimĂ©es pour l'utilisateur {user_id} et l'outil {tool_name}" }) except HTTPException: raise except Exception as e: logger.error(f"Erreur suppression permission: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.get("/permissions/defaults") async def get_default_permissions( current_user: UserResponse = Depends(get_current_admin_user) ): """Obtient toutes les permissions par dĂ©faut (admin uniquement)""" try: permissions_manager = PermissionsManager() # Obtenir les permissions par dĂ©faut defaults = await permissions_manager.get_default_permissions() # Organiser par outil tools_defaults = {} for perm in defaults: tools_defaults[perm.tool_name] = { 'can_read': perm.can_read, 'can_write': perm.can_write, 'can_execute': perm.can_execute } return JSONResponse({ "status": "success", "data": { "total_tools": len(tools_defaults), "tools": tools_defaults } }) except Exception as e: logger.error(f"Erreur obtention permissions par dĂ©faut: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.put("/permissions/defaults") async def update_default_permission( request: DefaultPermissionUpdate, current_user: UserResponse = Depends(get_current_admin_user) ): """Met Ă  jour les permissions par dĂ©faut pour un outil (admin uniquement)""" try: permissions_manager = PermissionsManager() # Mettre Ă  jour les permissions par dĂ©faut success = await permissions_manager.set_default_permission( tool_name=request.tool_name, can_read=request.can_read, can_write=request.can_write, can_execute=request.can_execute ) if not success: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Impossible de mettre Ă  jour les permissions par dĂ©faut" ) return JSONResponse({ "status": "success", "message": f"Permissions par dĂ©faut mises Ă  jour pour l'outil {request.tool_name}", "tool_name": request.tool_name, "default_permissions": { "can_read": request.can_read, "can_write": request.can_write, "can_execute": request.can_execute } }) except HTTPException: raise except Exception as e: logger.error(f"Erreur mise Ă  jour permissions par dĂ©faut: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) @app.get("/admin/logs/rotate") async def manual_log_rotation(): """Force la rotation des logs manuellement""" try: await log_manager.rotate_logs_if_needed() return JSONResponse({ "status": "success", "message": "Log rotation completed", "current_log_file": str(log_manager.get_current_log_file()) }) except Exception as e: logger.error(f"Erreur rotation logs: {e}") return JSONResponse({ "status": "error", "message": str(e) }, status_code=500) # 🌐 Routes Web Interface @app.get("/", response_class=HTMLResponse) async def root(): """Redirection vers le dashboard""" return HTMLResponse(""" <html> <head> <meta http-equiv="refresh" content="0; url=/login"> </head> <body> <p>Redirection vers le dashboard...</p> </body> </html> """) @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request): """Page de connexion""" return templates.TemplateResponse("login.html", {"request": request}) @app.get("/register", response_class=HTMLResponse) async def register_page(request: Request): """Page d'inscription""" return templates.TemplateResponse("register.html", {"request": request}) @app.get("/dashboard", response_class=HTMLResponse) async def dashboard_page(request: Request): """Page principale du dashboard""" return templates.TemplateResponse("dashboard.html", {"request": request}) @app.get("/permissions", response_class=HTMLResponse) async def permissions_page(request: Request): """Page de gestion des permissions""" return templates.TemplateResponse("dashboard.html", {"request": request}) @app.get("/config", response_class=HTMLResponse) async def config_page(request: Request): """Page de configuration""" return templates.TemplateResponse("dashboard.html", {"request": request}) @app.get("/tools", response_class=HTMLResponse) async def tools_page(request: Request): """Page des outils MCP""" return templates.TemplateResponse("dashboard.html", {"request": request}) @app.get("/logs", response_class=HTMLResponse) async def logs_page(request: Request): """Page des logs""" return templates.TemplateResponse("dashboard.html", {"request": request}) @app.get("/admin", response_class=HTMLResponse) async def admin_page(request: Request): """Page d'administration""" return templates.TemplateResponse("dashboard.html", {"request": request}) # API pour charger les templates de contenu @app.get("/api/templates/dashboard-overview", response_class=HTMLResponse) async def get_dashboard_overview(): """Retourne le template de vue d'ensemble du dashboard""" try: with open("web/templates/dashboard_overview.html", "r", encoding="utf-8") as f: return HTMLResponse(f.read()) except FileNotFoundError: raise HTTPException(status_code=404, detail="Template non trouvĂ©") @app.get("/api/templates/permissions", response_class=HTMLResponse) async def get_permissions_template(): """Retourne le template de gestion des permissions""" try: with open("web/templates/permissions.html", "r", encoding="utf-8") as f: return HTMLResponse(f.read()) except FileNotFoundError: raise HTTPException(status_code=404, detail="Template non trouvĂ©") @app.get("/api/templates/config", response_class=HTMLResponse) async def get_config_template(): """Retourne le template de configuration""" try: with open("web/templates/config.html", "r", encoding="utf-8") as f: return HTMLResponse(f.read()) except FileNotFoundError: raise HTTPException(status_code=404, detail="Template non trouvĂ©") @app.get("/api/templates/tools", response_class=HTMLResponse) async def get_tools_template(): """Retourne le template des outils MCP""" try: with open("web/templates/tools.html", "r", encoding="utf-8") as f: return HTMLResponse(f.read()) except FileNotFoundError: raise HTTPException(status_code=404, detail="Template non trouvĂ©") @app.get("/api/templates/logs", response_class=HTMLResponse) async def get_logs_template(): """Retourne le template des logs""" try: with open("web/templates/logs.html", "r", encoding="utf-8") as f: return HTMLResponse(f.read()) except FileNotFoundError: raise HTTPException(status_code=404, detail="Template non trouvĂ©") @app.get("/api/templates/admin", response_class=HTMLResponse) async def get_admin_template(): """Retourne le template d'administration""" try: with open("web/templates/admin.html", "r", encoding="utf-8") as f: return HTMLResponse(f.read()) except FileNotFoundError: raise HTTPException(status_code=404, detail="Template non trouvĂ©") # API pour les mĂ©triques du dashboard @app.get("/api/metrics") async def get_dashboard_metrics(): """Retourne les mĂ©triques pour le dashboard""" try: # Compter les connexions actives active_connections = len(session_pool.get_active_sessions()) if session_pool else 0 # Compter les outils MCP disponibles (utiliser la fonction get_tools existante) tools_data = await get_tools() total_tools = len(tools_data) if tools_data else 0 # Calculer l'uptime if hasattr(app.state, 'start_time'): uptime = int((datetime.now() - app.state.start_time).total_seconds()) else: uptime = 0 # Simuler des requĂȘtes par heure basĂ©es sur l'activitĂ© rĂ©elle current_hour = datetime.now().hour requests_last_hour = max(1, (current_hour + active_connections + total_tools) % 10) # GĂ©nĂ©rer des donnĂ©es d'activitĂ© pour les derniĂšres 24h avec simulation rĂ©aliste activity_data = [] for i in range(24): hour_start = datetime.now().replace(minute=0, second=0, microsecond=0) - timedelta(hours=23-i) # Simuler une activitĂ© variable selon l'heure base_requests = max(0, (hour_start.hour % 12) - 3) requests_count = base_requests + (i % 3) # Variation pour rendre rĂ©aliste activity_data.append({ "hour": hour_start.strftime("%H:%M"), "requests": requests_count }) return { "active_connections": active_connections, "total_tools": total_tools, "requests_per_hour": requests_last_hour, "uptime": uptime, "activity_data": activity_data } except Exception as e: logger.error(f"Erreur rĂ©cupĂ©ration mĂ©triques: {e}") # Valeurs de fallback avec des donnĂ©es rĂ©alistes return { "active_connections": 1, "total_tools": 3, "requests_per_hour": 5, "uptime": 300, "activity_data": [{"hour": f"{i:02d}:00", "requests": max(0, (i % 12) - 3)} for i in range(24)] } @app.get("/api/connections/recent") async def get_recent_connections(): """Retourne les connexions rĂ©centes""" try: # RĂ©cupĂ©rer les sessions actives connections = [] active_sessions = session_pool.get_active_sessions() if session_pool else {} for session_id, session in active_sessions.items(): connections.append({ "client_ip": getattr(session, 'client_ip', '127.0.0.1'), "connected_at": getattr(session, 'created_at', datetime.now()).isoformat(), "active": True, "requests_count": getattr(session, 'request_count', 1) }) # Si pas de connexions actives, ajouter des donnĂ©es d'exemple if not connections: now = datetime.now() connections = [ { "client_ip": "127.0.0.1", "connected_at": (now - timedelta(minutes=5)).isoformat(), "active": True, "requests_count": 12 }, { "client_ip": "192.168.1.100", "connected_at": (now - timedelta(minutes=15)).isoformat(), "active": False, "requests_count": 8 }, { "client_ip": "192.168.1.50", "connected_at": (now - timedelta(hours=1)).isoformat(), "active": False, "requests_count": 23 } ] # Trier par date de connexion (plus rĂ©cents en premier) connections.sort(key=lambda x: x["connected_at"], reverse=True) return connections[:10] # Limiter Ă  10 connexions except Exception as e: logger.error(f"Erreur rĂ©cupĂ©ration connexions: {e}") # DonnĂ©es de fallback now = datetime.now() return [ { "client_ip": "127.0.0.1", "connected_at": (now - timedelta(minutes=2)).isoformat(), "active": True, "requests_count": 5 } ] # Configuration endpoints @app.get("/api/config") async def get_config(): """Retourne la configuration actuelle du systĂšme""" try: # 1. Essayer de rĂ©cupĂ©rer depuis la base de donnĂ©es en prioritĂ© db_config = await db_manager.get_user_ha_config("beroute") logger.info(f"🔍 Configuration BDD rĂ©cupĂ©rĂ©e: {db_config}") if db_config: # Configuration trouvĂ©e en base de donnĂ©es hass_url = db_config["hass_url"] hass_token = db_config["hass_token"] source = "database" logger.info(f"✅ Utilisation configuration BDD: {hass_url}") else: # Fallback sur les variables d'environnement hass_url = os.getenv("HASS_URL", os.getenv("HOMEASSISTANT_URL", "")) hass_token = os.getenv("HASS_TOKEN", os.getenv("HOMEASSISTANT_TOKEN", "")) source = "environment" logger.info(f"⚠ Fallback sur environnement: {hass_url}") # Format attendu par le frontend (clĂ©s directes) config = { "hass_url": hass_url, "hass_token": hass_token, # Retourner le vrai token pour le formulaire "source": source, "homeassistant": { "url": hass_url, "token": hass_token, "timeout": int(os.getenv("HOMEASSISTANT_TIMEOUT", "10")), "retries": int(os.getenv("HOMEASSISTANT_RETRIES", "3")), "ssl_verify": os.getenv("HOMEASSISTANT_SSL_VERIFY", "true").lower() == "true", "connected": bool(hass_token and hass_url) }, "server": { "host": os.getenv("SERVER_HOST", "0.0.0.0"), "port": int(os.getenv("SERVER_PORT", "8080")), "max_sessions": int(os.getenv("MAX_SESSIONS", "10")), "session_timeout": int(os.getenv("SESSION_TIMEOUT", "30")) }, "database": { "file": os.getenv("DATABASE_FILE", "bridge_data.db"), "log_retention": int(os.getenv("LOG_RETENTION_DAYS", "30")), "auto_cleanup": os.getenv("AUTO_CLEANUP", "true").lower() == "true", "auto_backup": os.getenv("AUTO_BACKUP", "false").lower() == "true" }, "cache": { "ttl": int(os.getenv("CACHE_TTL", "300")), "max_entries": int(os.getenv("CACHE_MAX_ENTRIES", "1000")), "circuit_threshold": int(os.getenv("CIRCUIT_BREAKER_THRESHOLD", "5")) } } return config except Exception as e: logger.error(f"Erreur rĂ©cupĂ©ration configuration: {e}") # Configuration d'urgence return { "hass_url": "", "hass_token": "", "source": "fallback", "homeassistant": {"url": "", "token": ""}, "server": {"host": "0.0.0.0", "port": 8080}, "database": {"file": "bridge_data.db"}, "cache": {"ttl": 300} } @app.post("/api/config") async def update_config(config_data: dict): """Met Ă  jour la configuration du systĂšme""" try: config_updated = False # Validation et mise Ă  jour des donnĂ©es de configuration if "homeassistant" in config_data: ha_config = config_data["homeassistant"] url = ha_config.get("url", "").rstrip("/") token = ha_config.get("token", "") if url and token and token != "***": # Sauvegarder en base de donnĂ©es success = await db_manager.save_user_ha_config("beroute", url, token, "user_config") if success: logger.info(f"✅ Configuration Home Assistant sauvegardĂ©e en BDD pour beroute") config_updated = True else: logger.error("❌ Échec sauvegarde configuration HA en BDD") # Également gĂ©rer les clĂ©s directes (format alternatif) if "hass_url" in config_data and "hass_token" in config_data: url = config_data["hass_url"].rstrip("/") if config_data["hass_url"] else "" token = config_data["hass_token"] if config_data["hass_token"] else "" if url and token and token != "***": success = await db_manager.save_user_ha_config("beroute", url, token, "direct_config") if success: logger.info(f"✅ Configuration Home Assistant (format direct) sauvegardĂ©e en BDD") config_updated = True else: logger.error("❌ Échec sauvegarde configuration HA (format direct) en BDD") # Fallback : sauvegarder aussi dans les variables d'environnement pour compatibilitĂ© if config_updated: try: from pathlib import Path # Mettre Ă  jour les variables d'environnement en cours if "homeassistant" in config_data: ha_config = config_data["homeassistant"] if "url" in ha_config: os.environ["HASS_URL"] = ha_config["url"] if "token" in ha_config and ha_config["token"] != "***": os.environ["HASS_TOKEN"] = ha_config["token"] # Sauvegarder dans le fichier .env si il existe env_file = Path(".env") if env_file.exists(): lines = [] env_content = env_file.read_text(encoding='utf-8') for line in env_content.split('\n'): if line.startswith('HASS_URL='): lines.append(f'HASS_URL={os.environ.get("HASS_URL", "")}') elif line.startswith('HASS_TOKEN='): lines.append(f'HASS_TOKEN={os.environ.get("HASS_TOKEN", "")}') else: lines.append(line) env_file.write_text('\n'.join(lines), encoding='utf-8') logger.info("✅ Fichier .env mis Ă  jour Ă©galement") except Exception as env_error: logger.warning(f"⚠ Erreur mise Ă  jour .env: {env_error}") if config_updated: return {"status": "success", "message": "Configuration sauvegardĂ©e avec succĂšs en base de donnĂ©es"} else: return {"status": "warning", "message": "Aucune configuration valide Ă  sauvegarder"} except Exception as e: logger.error(f"❌ Erreur update_config: {e}") raise HTTPException(status_code=400, detail=f"Erreur lors de la sauvegarde: {str(e)}") @app.put("/api/config") async def update_config_put(config_data: dict): """Met Ă  jour la configuration du systĂšme (mĂ©thode PUT)""" # Utilise la mĂȘme logique que POST return await update_config(config_data) @app.post("/api/config/test") async def test_config(config_data: dict): """Teste une configuration""" try: if config_data.get("type") == "homeassistant": url = config_data.get("url") token = config_data.get("token") # Test de connexion Ă  Home Assistant import aiohttp async with aiohttp.ClientSession() as session: headers = {"Authorization": f"Bearer {token}"} async with session.get(f"{url}/api/", headers=headers) as response: if response.status == 200: return {"status": "success", "message": "Connexion rĂ©ussie"} else: return {"status": "error", "message": "Échec de la connexion"} return {"status": "success", "message": "Test rĂ©ussi"} except Exception as e: return {"status": "error", "message": str(e)} @app.get("/api/config/homeassistant-status") async def get_homeassistant_status(): """Retourne le statut de la connexion Home Assistant""" try: # RĂ©cupĂ©rer la configuration Home Assistant depuis les variables d'environnement ou la configuration import os hass_url = os.getenv("HASS_URL", "http://192.168.1.22:8123") hass_token = os.getenv("HASS_TOKEN", "") if not hass_token: return { "status": "not_configured", "message": "Token Home Assistant non configurĂ©", "url": hass_url, "connected": False } # Test de connexion Ă  Home Assistant import aiohttp import asyncio try: async with aiohttp.ClientSession() as session: headers = {"Authorization": f"Bearer {hass_token}"} async with session.get(f"{hass_url}/api/", headers=headers, timeout=5) as response: if response.status == 200: data = await response.json() return { "status": "connected", "message": "Connexion Home Assistant active", "url": hass_url, "connected": True, "version": data.get("version", "unknown") } else: return { "status": "error", "message": f"Erreur de connexion: {response.status}", "url": hass_url, "connected": False } except asyncio.TimeoutError: return { "status": "timeout", "message": "Timeout de connexion Ă  Home Assistant", "url": hass_url, "connected": False } except Exception as conn_error: return { "status": "error", "message": f"Erreur de connexion: {str(conn_error)}", "url": hass_url, "connected": False } except Exception as e: return { "status": "error", "message": f"Erreur interne: {str(e)}", "connected": False } @app.get("/api/homeassistant/diagnosis") async def diagnose_homeassistant(): """Diagnostic complet de la connexion Home Assistant""" try: # RĂ©cupĂ©rer la configuration depuis la base de donnĂ©es en prioritĂ© db_config = await db_manager.get_user_ha_config("beroute") if db_config: hass_url = db_config["hass_url"] hass_token = db_config["hass_token"] config_source = "database" else: # Fallback sur les variables d'environnement hass_url = os.getenv("HASS_URL", os.getenv("HOMEASSISTANT_URL", "")) hass_token = os.getenv("HASS_TOKEN", os.getenv("HOMEASSISTANT_TOKEN", "")) config_source = "environment" diagnosis = { "config": { "url_configured": bool(hass_url), "token_configured": bool(hass_token and hass_token != "test_token"), "url": hass_url if hass_url else "Non configurĂ©", "token_type": "test" if hass_token == "test_token" else ("valide" if hass_token else "manquant"), "source": config_source }, "connectivity": { "accessible": False, "authenticated": False, "api_version": None, "error": None }, "recommendations": [] } if not hass_url: diagnosis["recommendations"].append("Configurer l'URL de Home Assistant") return diagnosis if not hass_token or hass_token == "test_token": diagnosis["recommendations"].append("Configurer un token d'accĂšs valide") # Test de connectivitĂ© try: import aiohttp import asyncio async with aiohttp.ClientSession() as session: # Test sans authentification try: async with session.get(f"{hass_url}/api/", timeout=5) as response: diagnosis["connectivity"]["accessible"] = True if response.status == 401: diagnosis["connectivity"]["error"] = "Authentification requise (normal)" elif response.status == 200: data = await response.json() diagnosis["connectivity"]["api_version"] = data.get("version") except Exception as e: diagnosis["connectivity"]["accessible"] = False diagnosis["connectivity"]["error"] = f"Serveur inaccessible: {str(e)}" diagnosis["recommendations"].append("VĂ©rifier que Home Assistant fonctionne") return diagnosis # Test avec authentification si token disponible if hass_token and hass_token != "test_token": try: headers = {"Authorization": f"Bearer {hass_token}"} async with session.get(f"{hass_url}/api/", headers=headers, timeout=5) as response: if response.status == 200: data = await response.json() diagnosis["connectivity"]["authenticated"] = True diagnosis["connectivity"]["api_version"] = data.get("version") else: diagnosis["connectivity"]["error"] = f"Authentification Ă©chouĂ©e: {response.status}" diagnosis["recommendations"].append("VĂ©rifier la validitĂ© du token d'accĂšs") except Exception as e: diagnosis["connectivity"]["error"] = f"Erreur d'authentification: {str(e)}" except ImportError: diagnosis["connectivity"]["error"] = "Module aiohttp non disponible" diagnosis["recommendations"].append("Installer aiohttp: pip install aiohttp") except Exception as e: diagnosis["connectivity"]["error"] = f"Erreur test: {str(e)}" # Recommandations finales if diagnosis["connectivity"]["accessible"] and not diagnosis["connectivity"]["authenticated"]: diagnosis["recommendations"].append("CrĂ©er un token d'accĂšs dans Home Assistant: Profil → SĂ©curitĂ© → Tokens d'accĂšs Ă  long terme") if not diagnosis["recommendations"]: diagnosis["recommendations"].append("Configuration correcte ✅") return diagnosis except Exception as e: return { "error": f"Erreur diagnostic: {str(e)}", "recommendations": ["Contacter le support technique"] } # Endpoint de test de configuration Home Assistant @app.post("/api/config/test-homeassistant") async def test_homeassistant_config(config: dict): """Teste la connexion Ă  Home Assistant avec une configuration donnĂ©e""" try: import aiohttp import asyncio url = config.get("url", "").rstrip("/") token = config.get("token", "") if not url or not token: return { "success": False, "message": "URL et token requis" } try: async with aiohttp.ClientSession() as session: headers = {"Authorization": f"Bearer {token}"} async with session.get(f"{url}/api/", headers=headers, timeout=10) as response: if response.status == 200: data = await response.json() return { "success": True, "message": f"Connexion rĂ©ussie! Version HA: {data.get('version', 'inconnu')}", "version": data.get("version"), "url": url } else: return { "success": False, "message": f"Erreur HTTP {response.status}: {await response.text()}" } except asyncio.TimeoutError: return { "success": False, "message": "Timeout de connexion (10s)" } except Exception as conn_error: return { "success": False, "message": f"Erreur de connexion: {str(conn_error)}" } except Exception as e: return { "success": False, "message": f"Erreur: {str(e)}" } # Outils MCP endpoints @app.get("/api/tools") async def get_tools(): """Retourne la liste des outils MCP disponibles""" try: # Essayer de rĂ©cupĂ©rer les vrais outils MCP depuis une session active active_sessions = session_pool.get_active_sessions() if active_sessions: # Prendre la premiĂšre session active session_id = list(active_sessions.keys())[0] # CrĂ©er une requĂȘte pour lister les outils queued_request = QueuedRequest( id=str(uuid.uuid4()), session_id=session_id, method="tools/list", params={}, priority=Priority.HIGH, created_at=datetime.now(), timeout_seconds=10 ) # ExĂ©cuter la requĂȘte await request_queue.enqueue(queued_request) result = await request_queue.get_result(queued_request.id, 10) if result.status == RequestStatus.COMPLETED and result.result and 'tools' in result.result: # Retourner directement le tableau d'outils return result.result['tools'] except Exception as e: logger.warning(f"Impossible de rĂ©cupĂ©rer les outils MCP: {e}") # Fallback: retourner des outils d'exemple si MCP n'est pas disponible tools = [ { "id": "light_control", "name": "ContrĂŽle d'Ă©clairage", "description": "Gestion des lumiĂšres Home Assistant", "category": "homeassistant", "status": "active", "last_used": "2024-01-15T10:30:00Z", "usage_count": 45 }, { "id": "sensor_read", "name": "Lecture de capteurs", "description": "Lecture des valeurs de capteurs", "category": "sensors", "status": "active", "last_used": "2024-01-15T09:15:00Z", "usage_count": 128 }, { "id": "automation_trigger", "name": "DĂ©clenchement d'automations", "description": "DĂ©clenche des automations Home Assistant", "category": "automation", "status": "inactive", "last_used": "2024-01-14T15:45:00Z", "usage_count": 23 } ] return tools @app.post("/api/tools/health-check") async def health_check_tool(request: dict): """VĂ©rifie la santĂ© d'un outil MCP""" try: tool_name = request.get('tool_name', '') # Simulation du health check d'outil # Dans une vraie implĂ©mentation, ceci testerait la connectivitĂ© avec l'outil MCP # Simuler des rĂ©sultats variables pour la dĂ©monstration import time current_time = int(time.time()) is_healthy = (current_time % 4) != 0 # 75% de succĂšs basĂ© sur le temps if is_healthy: response_time = 50 + (current_time % 150) # Entre 50 et 200ms return { "status": "success", "tool_name": tool_name, "healthy": True, "response_time": response_time, "message": f"Outil {tool_name} opĂ©rationnel" } else: return { "status": "error", "tool_name": tool_name, "healthy": False, "response_time": None, "message": f"Outil {tool_name} non disponible" } except Exception as e: return { "status": "error", "healthy": False, "message": f"Erreur lors du test: {str(e)}" } @app.post("/api/tools/{tool_id}/test") async def test_tool(tool_id: str, test_data: dict = None): """Teste un outil MCP""" try: # Simulation du test d'outil await asyncio.sleep(1) # Simulation d'un dĂ©lai result = { "tool_id": tool_id, "status": "success", "result": { "execution_time": "0.245s", "output": f"Test de l'outil {tool_id} rĂ©ussi", "parameters": test_data or {} }, "timestamp": datetime.now().isoformat() } return result except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.get("/api/tools/statistics") async def get_tools_statistics(): """Retourne les statistiques d'utilisation des outils""" stats = { "total_tools": 15, "active_tools": 12, "total_executions": 1247, "avg_execution_time": "0.156s", "success_rate": 98.7, "most_used": { "tool_id": "sensor_read", "name": "Lecture de capteurs", "usage_count": 128 } } return stats # Logs endpoints @app.get("/api/logs") async def get_logs( page: int = 1, limit: int = 50, level: str = None, category: str = None, search: str = None, start_date: str = None, end_date: str = None ): """Retourne les logs du systĂšme avec pagination et filtrage""" # Simulation de logs logs = [] for i in range(limit): log_entry = { "id": f"log_{page}_{i}", "timestamp": (datetime.now() - timedelta(minutes=i*5)).isoformat(), "level": ["INFO", "ERROR", "WARNING", "DEBUG"][i % 4], "category": ["homeassistant", "mcp", "database", "auth"][i % 4], "message": f"Message de log d'exemple {i+1}", "details": f"DĂ©tails supplĂ©mentaires pour le log {i+1}" } logs.append(log_entry) # Filtrage simulĂ© if level: logs = [log for log in logs if log["level"] == level.upper()] if category: logs = [log for log in logs if log["category"] == category] if search: logs = [log for log in logs if search.lower() in log["message"].lower()] return { "logs": logs, "pagination": { "page": page, "limit": limit, "total": 1500, "pages": 30 } } @app.get("/api/logs/export") async def export_logs(format: str = "json"): """Exporte les logs du systĂšme""" from fastapi.responses import Response # Simulation d'export if format == "csv": # Retourner un fichier CSV content = "timestamp,level,category,message\n" for i in range(100): timestamp = (datetime.now() - timedelta(minutes=i*5)).isoformat() level = ["INFO", "ERROR", "WARNING", "DEBUG"][i % 4] category = ["homeassistant", "mcp", "database", "auth"][i % 4] message = f"Message de log d'exemple {i+1}" content += f"{timestamp},{level},{category},{message}\n" return Response( content=content, media_type="text/csv", headers={"Content-Disposition": "attachment; filename=logs.csv"} ) else: # Export JSON par dĂ©faut logs = [] for i in range(100): log_entry = { "timestamp": (datetime.now() - timedelta(minutes=i*5)).isoformat(), "level": ["INFO", "ERROR", "WARNING", "DEBUG"][i % 4], "category": ["homeassistant", "mcp", "database", "auth"][i % 4], "message": f"Message de log d'exemple {i+1}" } logs.append(log_entry) return {"logs": logs} @app.delete("/api/logs/clear") async def clear_logs(): """Supprime tous les logs du systĂšme""" try: # Ici on supprimerait les logs de la base de donnĂ©es # Pour la dĂ©mo, on simule juste la suppression logs_cleared = 150 # Nombre simulĂ© de logs supprimĂ©s return { "status": "success", "message": f"{logs_cleared} logs supprimĂ©s avec succĂšs", "cleared_count": logs_cleared, "timestamp": datetime.now().isoformat() } except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur lors de la suppression des logs: {str(e)}") # Users and Permissions endpoints @app.get("/api/users") async def get_users(): """Retourne la liste des utilisateurs pour le PermissionsManager""" try: # Simulation d'utilisateurs pour la dĂ©mo users = [ { "id": 1, "username": "admin", "email": "admin@localhost", "role": "administrator", "status": "active", "created_at": "2024-01-01T00:00:00Z", "last_login": "2024-12-20T10:30:00Z" }, { "id": 2, "username": "user1", "email": "user1@localhost", "role": "user", "status": "active", "created_at": "2024-01-15T00:00:00Z", "last_login": "2024-12-19T14:20:00Z" }, { "id": 3, "username": "demo", "email": "demo@localhost", "role": "viewer", "status": "inactive", "created_at": "2024-02-01T00:00:00Z", "last_login": "2024-12-18T09:15:00Z" } ] return { "status": "success", "users": users, "total": len(users), "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Erreur lors de la rĂ©cupĂ©ration des utilisateurs: {e}") raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") @app.get("/api/permissions") async def get_permissions(): """Retourne la liste des permissions pour le PermissionsManager""" try: # Simulation de permissions pour la dĂ©mo permissions = [ { "id": 1, "name": "read_entities", "description": "Lire les entitĂ©s Home Assistant", "category": "homeassistant", "enabled": True }, { "id": 2, "name": "write_entities", "description": "Modifier les entitĂ©s Home Assistant", "category": "homeassistant", "enabled": True }, { "id": 3, "name": "call_services", "description": "Appeler les services Home Assistant", "category": "homeassistant", "enabled": True }, { "id": 4, "name": "read_logs", "description": "Consulter les logs du systĂšme", "category": "system", "enabled": True }, { "id": 5, "name": "manage_config", "description": "GĂ©rer la configuration du systĂšme", "category": "system", "enabled": False }, { "id": 6, "name": "admin_access", "description": "AccĂšs aux fonctions d'administration", "category": "admin", "enabled": False } ] return { "status": "success", "permissions": permissions, "total": len(permissions), "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Erreur lors de la rĂ©cupĂ©ration des permissions: {e}") raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}") # Administration endpoints @app.get("/api/admin/users") async def get_admin_users(page: int = 1, limit: int = 20): """Retourne la liste des utilisateurs pour l'administration""" # Simulation d'utilisateurs users = [] for i in range(limit): user = { "id": f"user_{i+1}", "username": f"utilisateur{i+1}", "email": f"user{i+1}@example.com", "role": ["admin", "user", "moderator"][i % 3], "last_login": (datetime.now() - timedelta(days=i)).isoformat(), "status": "active" if i % 4 != 0 else "inactive", "created_at": (datetime.now() - timedelta(days=30+i)).isoformat() } users.append(user) return { "users": users, "pagination": { "page": page, "limit": limit, "total": 156, "pages": 8 } } @app.post("/api/admin/users") async def create_user(user_data: dict): """CrĂ©e un nouvel utilisateur""" try: # Validation des donnĂ©es utilisateur required_fields = ["username", "email", "password", "role"] for field in required_fields: if field not in user_data: raise HTTPException(status_code=400, detail=f"Champ {field} requis") # Simulation de crĂ©ation d'utilisateur new_user = { "id": f"user_{len(user_data)+1}", "username": user_data["username"], "email": user_data["email"], "role": user_data["role"], "created_at": datetime.now().isoformat(), "status": "active" } return {"status": "success", "user": new_user} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.put("/api/admin/users/{user_id}") async def update_user(user_id: str, user_data: dict): """Met Ă  jour un utilisateur""" try: # Simulation de mise Ă  jour updated_user = { "id": user_id, "username": user_data.get("username", f"user_{user_id}"), "email": user_data.get("email", f"user_{user_id}@example.com"), "role": user_data.get("role", "user"), "updated_at": datetime.now().isoformat() } return {"status": "success", "user": updated_user} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.delete("/api/admin/users/{user_id}") async def delete_user(user_id: str): """Supprime un utilisateur""" try: # Simulation de suppression return {"status": "success", "message": f"Utilisateur {user_id} supprimĂ©"} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.get("/api/admin/system/metrics") async def get_system_metrics(): """Retourne les mĂ©triques systĂšme pour l'administration""" import psutil try: metrics = { "cpu_usage": psutil.cpu_percent(interval=1), "memory": { "total": psutil.virtual_memory().total, "available": psutil.virtual_memory().available, "percent": psutil.virtual_memory().percent }, "disk": { "total": psutil.disk_usage('/').total, "free": psutil.disk_usage('/').free, "percent": psutil.disk_usage('/').percent }, "network": { "bytes_sent": psutil.net_io_counters().bytes_sent, "bytes_recv": psutil.net_io_counters().bytes_recv } } return metrics except Exception as e: # Si psutil n'est pas disponible, retourner des donnĂ©es simulĂ©es return { "cpu_usage": 45.2, "memory": { "total": 8589934592, "available": 4294967296, "percent": 50.0 }, "disk": { "total": 1073741824000, "free": 536870912000, "percent": 50.0 }, "network": { "bytes_sent": 1048576, "bytes_recv": 2097152 } } @app.post("/api/admin/maintenance/{action}") async def maintenance_action(action: str): """ExĂ©cute une action de maintenance""" try: if action == "restart": # Simulation de redĂ©marrage return {"status": "success", "message": "RedĂ©marrage programmĂ©"} elif action == "backup": # Simulation de sauvegarde return {"status": "success", "message": "Sauvegarde créée"} elif action == "cleanup": # Simulation de nettoyage return {"status": "success", "message": "Nettoyage effectuĂ©", "freed_space": "245MB"} else: raise HTTPException(status_code=400, detail="Action non reconnue") except Exception as e: raise HTTPException(status_code=400, detail=str(e)) # WebSocket endpoint pour les connexions en temps rĂ©el @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() # Envoyer un message de bienvenue en JSON welcome_message = { "type": "welcome", "message": "Connexion WebSocket Ă©tablie", "timestamp": time.time() } await websocket.send_text(json.dumps(welcome_message)) try: while True: # Attendre les messages du client data = await websocket.receive_text() try: # Essayer de parser le message comme JSON message = json.loads(data) # PrĂ©parer la rĂ©ponse en JSON response = { "type": "response", "original_message": message, "timestamp": time.time(), "status": "received" } # Traiter diffĂ©rents types de messages if message.get("type") == "ping": response["type"] = "pong" elif message.get("type") == "status_request": response["type"] = "status" response["data"] = { "server": "running", "connections": 1, "uptime": time.time() } await websocket.send_text(json.dumps(response)) except json.JSONDecodeError: # Si ce n'est pas du JSON, traiter comme texte simple response = { "type": "echo", "message": data, "timestamp": time.time() } await websocket.send_text(json.dumps(response)) except WebSocketDisconnect: print("Client WebSocket dĂ©connectĂ©") except Exception as e: print(f"Erreur WebSocket: {e}") await websocket.close() if __name__ == "__main__": uvicorn.run( "bridge_server:app", host="0.0.0.0", port=8080, reload=False, # DĂ©sactiver reload pour Ă©viter les conflits log_level="info" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jonathan97480/McpHomeAssistant'

If you have feedback or need assistance with the MCP directory API, please join our Discord server