Skip to main content
Glama
permissions_manager.py‱21.9 kB
#!/usr/bin/env python3 """ 🔐 Permissions Manager Gestionnaire des permissions granulaires par outil MCP avec cache et hĂ©ritage """ import json import time import asyncio import logging from datetime import datetime, timedelta from typing import Dict, List, Optional, Set, Any, Tuple from dataclasses import dataclass, asdict from enum import Enum from collections import defaultdict from pydantic import BaseModel, Field # Import du systĂšme de base de donnĂ©es from database import db_manager logger = logging.getLogger(__name__) class PermissionType(Enum): """Types de permissions""" READ = "read" WRITE = "write" EXECUTE = "execute" # Pour les appels d'outils class ToolCategory(Enum): """CatĂ©gories d'outils MCP""" GENERAL = "general" LIGHTS = "lights" SENSORS = "sensors" CLIMATE = "climate" MEDIA = "media" SCRIPTS = "scripts" AUTOMATION = "automation" SECURITY = "security" ENERGY = "energy" DIAGNOSTICS = "diagnostics" @dataclass class ToolPermission: """Permission pour un outil spĂ©cifique""" tool_name: str can_read: bool = True can_write: bool = False is_enabled: bool = True tool_category: str = "general" description: Optional[str] = None last_used: Optional[datetime] = None @dataclass class UserPermissionCache: """Cache des permissions utilisateur""" user_id: int permissions: Dict[str, ToolPermission] last_updated: datetime cache_ttl: int = 300 # 5 minutes par dĂ©faut class DefaultPermissionCreate(BaseModel): """ModĂšle pour crĂ©er des permissions par dĂ©faut""" tool_name: str = Field(..., min_length=1, max_length=100) can_read: bool = True can_write: bool = False is_enabled: bool = True tool_category: str = "general" description: Optional[str] = Field(None, max_length=500) class UserPermissionUpdate(BaseModel): """ModĂšle pour mettre Ă  jour les permissions utilisateur""" can_read: Optional[bool] = None can_write: Optional[bool] = None is_enabled: Optional[bool] = None custom_settings: Optional[Dict[str, Any]] = None class PermissionSummary(BaseModel): """RĂ©sumĂ© des permissions d'un utilisateur""" tool_name: str can_read: bool can_write: bool is_enabled: bool tool_category: str description: Optional[str] is_custom: bool = False # True si l'utilisateur a personnalisĂ© cette permission last_used: Optional[datetime] = None class BulkPermissionUpdate(BaseModel): """Mise Ă  jour en masse des permissions""" tool_names: List[str] can_read: Optional[bool] = None can_write: Optional[bool] = None is_enabled: Optional[bool] = None class PermissionsManager: """Gestionnaire des permissions granulaires""" def __init__(self): self.permission_cache: Dict[int, UserPermissionCache] = {} self.default_permissions_cache: Optional[Dict[str, ToolPermission]] = None self.default_cache_ttl = 600 # 10 minutes pour les permissions par dĂ©faut self.last_default_update = datetime.min # Permissions par dĂ©faut pour les outils courants HA self.builtin_permissions = { # Éclairage "light.toggle": ToolPermission("light.toggle", True, True, True, "lights", "Allumer/Ă©teindre les lumiĂšres"), "light.set_brightness": ToolPermission("light.set_brightness", True, True, True, "lights", "RĂ©gler la luminositĂ©"), "light.set_color": ToolPermission("light.set_color", True, True, True, "lights", "Changer la couleur"), # Capteurs "sensor.read_temperature": ToolPermission("sensor.read_temperature", True, False, True, "sensors", "Lire les capteurs de tempĂ©rature"), "sensor.read_humidity": ToolPermission("sensor.read_humidity", True, False, True, "sensors", "Lire les capteurs d'humiditĂ©"), "sensor.read_motion": ToolPermission("sensor.read_motion", True, False, True, "sensors", "Lire les dĂ©tecteurs de mouvement"), # Climat "climate.set_temperature": ToolPermission("climate.set_temperature", True, True, True, "climate", "RĂ©gler la tempĂ©rature"), "climate.set_mode": ToolPermission("climate.set_mode", True, True, True, "climate", "Changer le mode climatisation"), # MĂ©dias "media_player.play": ToolPermission("media_player.play", True, True, True, "media", "ContrĂŽler la lecture mĂ©dia"), "media_player.volume": ToolPermission("media_player.volume", True, True, True, "media", "RĂ©gler le volume"), # Scripts et automatisations "script.run": ToolPermission("script.run", True, True, False, "scripts", "ExĂ©cuter des scripts"), "automation.trigger": ToolPermission("automation.trigger", True, True, False, "automation", "DĂ©clencher des automatisations"), # SĂ©curitĂ© "alarm.arm": ToolPermission("alarm.arm", True, True, False, "security", "Armer l'alarme"), "alarm.disarm": ToolPermission("alarm.disarm", True, True, False, "security", "DĂ©sarmer l'alarme"), "lock.control": ToolPermission("lock.control", True, True, False, "security", "ContrĂŽler les serrures"), # Énergie "energy.usage": ToolPermission("energy.usage", True, False, True, "energy", "Consulter la consommation Ă©nergĂ©tique"), # Diagnostics "system.diagnostics": ToolPermission("system.diagnostics", True, False, False, "diagnostics", "Diagnostics systĂšme"), "log.view": ToolPermission("log.view", True, False, False, "diagnostics", "Consulter les logs") } async def initialize(self): """Initialise le gestionnaire de permissions""" try: # CrĂ©er les permissions par dĂ©faut si elles n'existent pas await self._ensure_default_permissions() logger.info("🔐 Gestionnaire de permissions initialisĂ©") except Exception as e: logger.error(f"Erreur initialisation permissions: {e}") raise async def _ensure_default_permissions(self): """Assure que les permissions par dĂ©faut existent en base""" try: # VĂ©rifier les permissions existantes existing_perms = await db_manager.fetch_all( "SELECT tool_name FROM default_permissions" ) existing_tools = {perm['tool_name'] for perm in existing_perms} # Ajouter les permissions manquantes for tool_name, perm in self.builtin_permissions.items(): if tool_name not in existing_tools: await db_manager.execute( """INSERT INTO default_permissions (tool_name, can_read, can_write, is_enabled, tool_category, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (perm.tool_name, perm.can_read, perm.can_write, perm.is_enabled, perm.tool_category, perm.description, datetime.now(), datetime.now()) ) logger.info(f"✅ {len(self.builtin_permissions)} permissions par dĂ©faut assurĂ©es") except Exception as e: logger.error(f"Erreur crĂ©ation permissions par dĂ©faut: {e}") raise async def get_default_permissions(self) -> Dict[str, ToolPermission]: """RĂ©cupĂšre les permissions par dĂ©faut avec cache""" try: # VĂ©rifier le cache if (self.default_permissions_cache is not None and (datetime.now() - self.last_default_update).total_seconds() < self.default_cache_ttl): return self.default_permissions_cache # Charger depuis la base perms_data = await db_manager.fetch_all( "SELECT * FROM default_permissions ORDER BY tool_category, tool_name" ) permissions = {} for perm in perms_data: permissions[perm['tool_name']] = ToolPermission( tool_name=perm['tool_name'], can_read=perm['can_read'], can_write=perm['can_write'], is_enabled=perm['is_enabled'], tool_category=perm['tool_category'], description=perm['description'] ) # Mettre Ă  jour le cache self.default_permissions_cache = permissions self.last_default_update = datetime.now() return permissions except Exception as e: logger.error(f"Erreur rĂ©cupĂ©ration permissions par dĂ©faut: {e}") return {} async def get_user_permissions(self, user_id: int, force_refresh: bool = False) -> Dict[str, ToolPermission]: """RĂ©cupĂšre les permissions d'un utilisateur avec cache""" try: # VĂ©rifier le cache utilisateur if not force_refresh and user_id in self.permission_cache: cache = self.permission_cache[user_id] if (datetime.now() - cache.last_updated).total_seconds() < cache.cache_ttl: return cache.permissions # Charger les permissions par dĂ©faut default_perms = await self.get_default_permissions() # Charger les permissions personnalisĂ©es de l'utilisateur user_perms_data = await db_manager.fetch_all( "SELECT * FROM user_tool_permissions WHERE user_id = ?", (user_id,) ) # Construire le dictionnaire des permissions finales final_permissions = {} # Commencer par les permissions par dĂ©faut for tool_name, default_perm in default_perms.items(): final_permissions[tool_name] = ToolPermission( tool_name=default_perm.tool_name, can_read=default_perm.can_read, can_write=default_perm.can_write, is_enabled=default_perm.is_enabled, tool_category=default_perm.tool_category, description=default_perm.description ) # Appliquer les personnalisations utilisateur for user_perm in user_perms_data: tool_name = user_perm['tool_name'] # Si l'outil existe dans les permissions par dĂ©faut if tool_name in final_permissions: perm = final_permissions[tool_name] perm.can_read = user_perm['can_read'] perm.can_write = user_perm['can_write'] perm.is_enabled = user_perm['is_enabled'] perm.last_used = user_perm['last_used'] else: # Outil personnalisĂ© pas dans les dĂ©fauts final_permissions[tool_name] = ToolPermission( tool_name=tool_name, can_read=user_perm['can_read'], can_write=user_perm['can_write'], is_enabled=user_perm['is_enabled'], tool_category="general", last_used=user_perm['last_used'] ) # Mettre Ă  jour le cache self.permission_cache[user_id] = UserPermissionCache( user_id=user_id, permissions=final_permissions, last_updated=datetime.now() ) return final_permissions except Exception as e: logger.error(f"Erreur rĂ©cupĂ©ration permissions utilisateur {user_id}: {e}") return {} async def check_permission(self, user_id: int, tool_name: str, permission_type: PermissionType) -> bool: """VĂ©rifie si un utilisateur a une permission spĂ©cifique""" try: user_perms = await self.get_user_permissions(user_id) if tool_name not in user_perms: logger.warning(f"Outil {tool_name} non trouvĂ© dans les permissions utilisateur {user_id}") return False perm = user_perms[tool_name] # VĂ©rifier si l'outil est activĂ© if not perm.is_enabled: return False # VĂ©rifier le type de permission if permission_type == PermissionType.READ: return perm.can_read elif permission_type == PermissionType.WRITE or permission_type == PermissionType.EXECUTE: return perm.can_write return False except Exception as e: logger.error(f"Erreur vĂ©rification permission {tool_name} pour utilisateur {user_id}: {e}") return False async def update_user_permission(self, user_id: int, tool_name: str, updates: UserPermissionUpdate) -> bool: """Met Ă  jour une permission utilisateur spĂ©cifique""" try: # VĂ©rifier si la permission existe dĂ©jĂ  existing = await db_manager.fetch_one( "SELECT * FROM user_tool_permissions WHERE user_id = ? AND tool_name = ?", (user_id, tool_name) ) now = datetime.now() if existing: # Mettre Ă  jour la permission existante set_clauses = [] params = [] if updates.can_read is not None: set_clauses.append("can_read = ?") params.append(updates.can_read) if updates.can_write is not None: set_clauses.append("can_write = ?") params.append(updates.can_write) if updates.is_enabled is not None: set_clauses.append("is_enabled = ?") params.append(updates.is_enabled) if updates.custom_settings is not None: set_clauses.append("custom_settings = ?") params.append(json.dumps(updates.custom_settings)) set_clauses.append("updated_at = ?") params.extend([now, user_id, tool_name]) query = f"UPDATE user_tool_permissions SET {', '.join(set_clauses)} WHERE user_id = ? AND tool_name = ?" await db_manager.execute(query, tuple(params)) else: # CrĂ©er une nouvelle permission personnalisĂ©e # RĂ©cupĂ©rer les valeurs par dĂ©faut default_perms = await self.get_default_permissions() default_perm = default_perms.get(tool_name) can_read = updates.can_read if updates.can_read is not None else (default_perm.can_read if default_perm else True) can_write = updates.can_write if updates.can_write is not None else (default_perm.can_write if default_perm else False) is_enabled = updates.is_enabled if updates.is_enabled is not None else (default_perm.is_enabled if default_perm else True) custom_settings = json.dumps(updates.custom_settings) if updates.custom_settings else None await db_manager.execute( """INSERT INTO user_tool_permissions (user_id, tool_name, can_read, can_write, is_enabled, custom_settings, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (user_id, tool_name, can_read, can_write, is_enabled, custom_settings, now, now) ) # Invalider le cache utilisateur if user_id in self.permission_cache: del self.permission_cache[user_id] logger.info(f"Permission {tool_name} mise Ă  jour pour utilisateur {user_id}") return True except Exception as e: logger.error(f"Erreur mise Ă  jour permission {tool_name} pour utilisateur {user_id}: {e}") return False async def update_bulk_permissions(self, user_id: int, bulk_update: BulkPermissionUpdate) -> int: """Met Ă  jour plusieurs permissions en une fois""" try: updated_count = 0 for tool_name in bulk_update.tool_names: update = UserPermissionUpdate( can_read=bulk_update.can_read, can_write=bulk_update.can_write, is_enabled=bulk_update.is_enabled ) if await self.update_user_permission(user_id, tool_name, update): updated_count += 1 logger.info(f"Mise Ă  jour en masse: {updated_count}/{len(bulk_update.tool_names)} permissions pour utilisateur {user_id}") return updated_count except Exception as e: logger.error(f"Erreur mise Ă  jour en masse pour utilisateur {user_id}: {e}") return 0 async def get_user_permission_summary(self, user_id: int) -> List[PermissionSummary]: """RĂ©cupĂšre un rĂ©sumĂ© des permissions d'un utilisateur""" try: user_perms = await self.get_user_permissions(user_id) default_perms = await self.get_default_permissions() # RĂ©cupĂ©rer les permissions personnalisĂ©es custom_perms = await db_manager.fetch_all( "SELECT tool_name FROM user_tool_permissions WHERE user_id = ?", (user_id,) ) custom_tools = {perm['tool_name'] for perm in custom_perms} summary = [] for tool_name, perm in user_perms.items(): is_custom = tool_name in custom_tools summary.append(PermissionSummary( tool_name=perm.tool_name, can_read=perm.can_read, can_write=perm.can_write, is_enabled=perm.is_enabled, tool_category=perm.tool_category, description=perm.description, is_custom=is_custom, last_used=perm.last_used )) # Trier par catĂ©gorie puis par nom summary.sort(key=lambda x: (x.tool_category, x.tool_name)) return summary except Exception as e: logger.error(f"Erreur rĂ©cupĂ©ration rĂ©sumĂ© permissions pour utilisateur {user_id}: {e}") return [] async def record_tool_usage(self, user_id: int, tool_name: str): """Enregistre l'utilisation d'un outil""" try: await db_manager.execute( """UPDATE user_tool_permissions SET last_used = ?, updated_at = ? WHERE user_id = ? AND tool_name = ?""", (datetime.now(), datetime.now(), user_id, tool_name) ) # Invalider le cache pour cet utilisateur if user_id in self.permission_cache: del self.permission_cache[user_id] except Exception as e: logger.warning(f"Erreur enregistrement utilisation outil {tool_name}: {e}") async def get_tools_by_category(self) -> Dict[str, List[str]]: """RĂ©cupĂšre les outils groupĂ©s par catĂ©gorie""" try: default_perms = await self.get_default_permissions() categories = defaultdict(list) for tool_name, perm in default_perms.items(): categories[perm.tool_category].append(tool_name) # Trier les outils dans chaque catĂ©gorie for category in categories: categories[category].sort() return dict(categories) except Exception as e: logger.error(f"Erreur rĂ©cupĂ©ration outils par catĂ©gorie: {e}") return {} async def cleanup_cache(self): """Nettoie le cache des permissions expirĂ©es""" try: now = datetime.now() expired_users = [] for user_id, cache in self.permission_cache.items(): if (now - cache.last_updated).total_seconds() > cache.cache_ttl: expired_users.append(user_id) for user_id in expired_users: del self.permission_cache[user_id] # Nettoyer le cache des permissions par dĂ©faut if (now - self.last_default_update).total_seconds() > self.default_cache_ttl: self.default_permissions_cache = None self.last_default_update = datetime.min if expired_users: logger.info(f"đŸ§č Cache permissions nettoyĂ©: {len(expired_users)} utilisateurs") except Exception as e: logger.error(f"Erreur nettoyage cache permissions: {e}") # Instance globale du gestionnaire permissions_manager = PermissionsManager() # Fonctions de nettoyage async def cleanup_permissions_manager(): """Nettoie les ressources du gestionnaire de permissions""" await permissions_manager.cleanup_cache()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jonathan97480/McpHomeAssistant'

If you have feedback or need assistance with the MCP directory API, please join our Discord server