Skip to main content
Glama
permissions_middleware.py12.2 kB
""" Middleware de validation des permissions pour les outils MCP. Valide les permissions granulaires avant l'exécution d'outils MCP. """ from fastapi import HTTPException, Request, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from typing import Optional, Dict, Any, List import logging import json from datetime import datetime from permissions_manager import PermissionsManager, PermissionType from auth_manager import auth_manager logger = logging.getLogger(__name__) async def get_current_user_from_token(token: str) -> Optional[Dict[str, Any]]: """ Obtient les données utilisateur depuis un token JWT. Args: token: Token JWT Returns: Dict avec user_id, username, role ou None si invalide """ try: token_data = auth_manager.verify_token(token) if not token_data: return None user = await auth_manager.get_user_by_id(token_data.user_id) if not user: return None return { 'user_id': user.id, 'username': user.username, 'role': user.role.value, 'email': user.email, 'full_name': user.full_name } except Exception as e: logger.error(f"Erreur décodage token: {e}") return None class PermissionsMiddleware: """Middleware pour la validation des permissions MCP.""" def __init__(self): self.permissions_manager = PermissionsManager() self.security = HTTPBearer(auto_error=False) async def validate_mcp_permission( self, request: Request, tool_name: str, permission_type: PermissionType, credentials: Optional[HTTPAuthorizationCredentials] = None ) -> Dict[str, Any]: """ Valide les permissions pour un outil MCP spécifique. Args: request: Requête FastAPI tool_name: Nom de l'outil MCP permission_type: Type de permission (READ/WRITE/EXECUTE) credentials: Credentials d'autorisation Returns: Dict contenant user_id et les informations de validation Raises: HTTPException: Si permission refusée """ try: # Extraire le token if not credentials: credentials = await self.security(request) if not credentials: logger.warning(f"Accès refusé - pas de token pour outil {tool_name}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token d'autorisation requis" ) # Obtenir l'utilisateur depuis le token user_data = await get_current_user_from_token(credentials.credentials) if not user_data: logger.warning(f"Token invalide pour outil {tool_name}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide" ) user_id = user_data.get('user_id') if not user_id: logger.error(f"User ID manquant dans token pour outil {tool_name}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User ID invalide" ) # Vérifier les permissions has_permission = await self.permissions_manager.check_permission( user_id=user_id, tool_name=tool_name, permission_type=permission_type ) if not has_permission: logger.warning( f"Permission refusée - User {user_id}, outil {tool_name}, " f"type {permission_type.value}" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Permission {permission_type.value} refusée pour l'outil {tool_name}" ) logger.info( f"Permission accordée - User {user_id}, outil {tool_name}, " f"type {permission_type.value}" ) return { 'user_id': user_id, 'username': user_data.get('username'), 'tool_name': tool_name, 'permission_type': permission_type.value, 'timestamp': datetime.utcnow().isoformat() } except HTTPException: raise except Exception as e: logger.error(f"Erreur validation permission: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Erreur interne de validation des permissions" ) async def validate_bulk_permissions( self, request: Request, tool_permissions: List[Dict[str, str]], credentials: Optional[HTTPAuthorizationCredentials] = None ) -> Dict[str, Any]: """ Valide plusieurs permissions en une seule fois. Args: request: Requête FastAPI tool_permissions: Liste de dict avec tool_name et permission_type credentials: Credentials d'autorisation Returns: Dict avec user_id et résultats de validation Raises: HTTPException: Si au moins une permission est refusée """ try: # Extraire le token if not credentials: credentials = await self.security(request) if not credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token d'autorisation requis" ) # Obtenir l'utilisateur user_data = await get_current_user_from_token(credentials.credentials) if not user_data: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide" ) user_id = user_data.get('user_id') if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User ID invalide" ) # Préparer les vérifications check_list = [] for tool_perm in tool_permissions: tool_name = tool_perm.get('tool_name') perm_type_str = tool_perm.get('permission_type', 'READ') if not tool_name: continue try: permission_type = PermissionType(perm_type_str.upper()) check_list.append((tool_name, permission_type)) except ValueError: logger.warning(f"Type de permission invalide: {perm_type_str}") continue # Vérifier toutes les permissions results = [] denied_permissions = [] for tool_name, permission_type in check_list: has_permission = await self.permissions_manager.check_permission( user_id=user_id, tool_name=tool_name, permission_type=permission_type ) result = { 'tool_name': tool_name, 'permission_type': permission_type.value, 'granted': has_permission } results.append(result) if not has_permission: denied_permissions.append(f"{tool_name}:{permission_type.value}") # Si des permissions sont refusées, lever une exception if denied_permissions: logger.warning( f"Permissions refusées pour User {user_id}: {', '.join(denied_permissions)}" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Permissions refusées: {', '.join(denied_permissions)}" ) logger.info(f"Toutes les permissions accordées pour User {user_id}") return { 'user_id': user_id, 'username': user_data.get('username'), 'results': results, 'timestamp': datetime.utcnow().isoformat() } except HTTPException: raise except Exception as e: logger.error(f"Erreur validation permissions bulk: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Erreur interne de validation des permissions" ) async def log_permission_usage( self, user_id: int, tool_name: str, permission_type: PermissionType, success: bool = True, details: Optional[str] = None ): """ Log l'utilisation des permissions pour audit. Args: user_id: ID de l'utilisateur tool_name: Nom de l'outil MCP permission_type: Type de permission success: Si l'opération a réussi details: Détails additionnels """ try: log_entry = { 'timestamp': datetime.utcnow().isoformat(), 'user_id': user_id, 'tool_name': tool_name, 'permission_type': permission_type.value, 'success': success, 'details': details } logger.info(f"Permission usage: {json.dumps(log_entry)}") except Exception as e: logger.error(f"Erreur lors du log des permissions: {e}") async def get_user_permissions_summary(self, user_id: int) -> Dict[str, Any]: """ Obtient un résumé des permissions d'un utilisateur. Args: user_id: ID de l'utilisateur Returns: Dict avec résumé des permissions """ try: # Obtenir toutes les permissions de l'utilisateur user_permissions = await self.permissions_manager.get_user_permissions(user_id) # Organiser par outil tools_summary = {} for perm in user_permissions: tool_name = perm.tool_name if tool_name not in tools_summary: tools_summary[tool_name] = { 'read': False, 'write': False, 'execute': False } if perm.can_read: tools_summary[tool_name]['read'] = True if perm.can_write: tools_summary[tool_name]['write'] = True if perm.can_execute: tools_summary[tool_name]['execute'] = True return { 'user_id': user_id, 'total_tools': len(tools_summary), 'tools': tools_summary, 'timestamp': datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Erreur obtention résumé permissions: {e}") return { 'user_id': user_id, 'total_tools': 0, 'tools': {}, 'error': str(e), 'timestamp': datetime.utcnow().isoformat() } # Instance globale du middleware permissions_middleware = PermissionsMiddleware()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jonathan97480/McpHomeAssistant'

If you have feedback or need assistance with the MCP directory API, please join our Discord server