Skip to main content
Glama
test_permissions.py15.6 kB
#!/usr/bin/env python3 """ 🔐 Tests pour le système de permissions granulaires MCP Tests unitaires et d'intégration pour les permissions par outil """ import asyncio import pytest import aiohttp import json from typing import Dict, Any, List import time # URL de base du serveur de test BASE_URL = "http://localhost:8080" class PermissionsTestSuite: """Suite de tests pour le système de permissions""" def __init__(self): self.base_url = BASE_URL self.admin_token = None self.user_token = None self.admin_user_id = None self.test_user_id = None async def setup_test_users(self): """Initialise les utilisateurs de test""" print("🔧 Configuration des utilisateurs de test...") async with aiohttp.ClientSession() as session: # Login admin par défaut admin_login = { "username": "admin", "password": "Admin123!" } try: async with session.post(f"{self.base_url}/auth/login", json=admin_login) as resp: if resp.status == 200: data = await resp.json() self.admin_token = data['access_token'] print("✅ Admin connecté") else: print("❌ Échec connexion admin") return False except Exception as e: print(f"❌ Erreur connexion admin: {e}") return False # Créer un utilisateur de test headers = {"Authorization": f"Bearer {self.admin_token}"} test_user = { "username": "testuser", "email": "test@example.com", "full_name": "Test User", "password": "TestPass123!" } try: async with session.post(f"{self.base_url}/auth/register", json=test_user, headers=headers) as resp: if resp.status == 200: data = await resp.json() self.test_user_id = data['id'] print("✅ Utilisateur de test créé") else: print("❌ Échec création utilisateur de test") return False except Exception as e: print(f"❌ Erreur création utilisateur: {e}") return False # Login utilisateur de test user_login = { "username": "testuser", "password": "TestPass123!" } try: async with session.post(f"{self.base_url}/auth/login", json=user_login) as resp: if resp.status == 200: data = await resp.json() self.user_token = data['access_token'] print("✅ Utilisateur de test connecté") return True else: print("❌ Échec connexion utilisateur de test") return False except Exception as e: print(f"❌ Erreur connexion utilisateur de test: {e}") return False async def test_permission_validation(self): """Test de validation des permissions individuelles""" print("\n🔍 Test: Validation des permissions individuelles") async with aiohttp.ClientSession() as session: headers = {"Authorization": f"Bearer {self.user_token}"} # Test permission refusée (pas de permissions par défaut) permission_request = { "tool_name": "homeassistant.get_state", "permission_type": "READ" } async with session.post(f"{self.base_url}/permissions/validate", json=permission_request, headers=headers) as resp: data = await resp.json() if resp.status == 403: print("✅ Permission correctement refusée") return True else: print(f"❌ Statut inattendu: {resp.status}, data: {data}") return False async def test_default_permissions(self): """Test de gestion des permissions par défaut""" print("\n🔧 Test: Gestion des permissions par défaut") async with aiohttp.ClientSession() as session: admin_headers = {"Authorization": f"Bearer {self.admin_token}"} # Définir une permission par défaut default_permission = { "tool_name": "homeassistant.get_state", "can_read": True, "can_write": False, "can_execute": False } async with session.put(f"{self.base_url}/permissions/defaults", json=default_permission, headers=admin_headers) as resp: if resp.status == 200: print("✅ Permission par défaut définie") else: data = await resp.json() print(f"❌ Échec définition permission par défaut: {resp.status}, {data}") return False # Vérifier que l'utilisateur hérite de la permission time.sleep(1) # Laisser le temps au cache de se rafraîchir user_headers = {"Authorization": f"Bearer {self.user_token}"} permission_request = { "tool_name": "homeassistant.get_state", "permission_type": "READ" } async with session.post(f"{self.base_url}/permissions/validate", json=permission_request, headers=user_headers) as resp: if resp.status == 200: data = await resp.json() if data.get('granted'): print("✅ Utilisateur hérite correctement de la permission par défaut") return True else: print("❌ Permission refusée malgré héritage") return False else: data = await resp.json() print(f"❌ Échec validation héritage: {resp.status}, {data}") return False async def test_user_specific_permissions(self): """Test de permissions spécifiques utilisateur""" print("\n👤 Test: Permissions spécifiques utilisateur") async with aiohttp.ClientSession() as session: admin_headers = {"Authorization": f"Bearer {self.admin_token}"} # Donner une permission spécifique à l'utilisateur user_permission = { "tool_name": "homeassistant.call_service", "can_read": False, "can_write": True, "can_execute": True } async with session.put(f"{self.base_url}/permissions/user/{self.test_user_id}", json=user_permission, headers=admin_headers) as resp: if resp.status == 200: print("✅ Permission utilisateur définie") else: data = await resp.json() print(f"❌ Échec définition permission utilisateur: {resp.status}, {data}") return False # Vérifier la permission WRITE time.sleep(1) user_headers = {"Authorization": f"Bearer {self.user_token}"} permission_request = { "tool_name": "homeassistant.call_service", "permission_type": "WRITE" } async with session.post(f"{self.base_url}/permissions/validate", json=permission_request, headers=user_headers) as resp: if resp.status == 200: data = await resp.json() if data.get('granted'): print("✅ Permission utilisateur WRITE accordée") else: print("❌ Permission WRITE refusée") return False else: data = await resp.json() print(f"❌ Échec validation permission utilisateur: {resp.status}, {data}") return False # Vérifier que READ est refusée permission_request['permission_type'] = 'READ' async with session.post(f"{self.base_url}/permissions/validate", json=permission_request, headers=user_headers) as resp: if resp.status == 403: print("✅ Permission utilisateur READ correctement refusée") return True else: data = await resp.json() print(f"❌ Permission READ accordée à tort: {resp.status}, {data}") return False async def test_bulk_permissions(self): """Test de validation en lot des permissions""" print("\n📦 Test: Validation en lot des permissions") async with aiohttp.ClientSession() as session: user_headers = {"Authorization": f"Bearer {self.user_token}"} # Test plusieurs permissions à la fois bulk_request = { "permissions": [ {"tool_name": "homeassistant.get_state", "permission_type": "READ"}, {"tool_name": "homeassistant.call_service", "permission_type": "WRITE"}, {"tool_name": "homeassistant.unknown_tool", "permission_type": "READ"} ] } async with session.post(f"{self.base_url}/permissions/validate/bulk", json=bulk_request, headers=user_headers) as resp: if resp.status == 200: data = await resp.json() results = data.get('results', []) if len(results) == 2: # unknown_tool devrait être ignoré s'il n'a pas de permissions print("✅ Validation en lot réussie") return True else: print(f"❌ Résultats inattendus: {results}") return False elif resp.status == 403: print("✅ Au moins une permission refusée en lot (comportement attendu)") return True else: data = await resp.json() print(f"❌ Échec validation en lot: {resp.status}, {data}") return False async def test_permissions_summary(self): """Test de résumé des permissions utilisateur""" print("\n📊 Test: Résumé des permissions utilisateur") async with aiohttp.ClientSession() as session: user_headers = {"Authorization": f"Bearer {self.user_token}"} async with session.get(f"{self.base_url}/permissions/me", headers=user_headers) as resp: if resp.status == 200: data = await resp.json() summary = data.get('data', {}) tools = summary.get('tools', {}) # Vérifier les outils configurés expected_tools = ['homeassistant.get_state', 'homeassistant.call_service'] found_tools = list(tools.keys()) if all(tool in found_tools for tool in expected_tools): print("✅ Résumé des permissions correct") print(f" 📋 Outils trouvés: {found_tools}") return True else: print(f"❌ Outils manquants: attendus {expected_tools}, trouvés {found_tools}") return False else: data = await resp.json() print(f"❌ Échec obtention résumé: {resp.status}, {data}") return False async def test_admin_permissions_management(self): """Test de gestion des permissions par admin""" print("\n🔑 Test: Gestion des permissions par admin") async with aiohttp.ClientSession() as session: admin_headers = {"Authorization": f"Bearer {self.admin_token}"} # Consulter les permissions d'un utilisateur async with session.get(f"{self.base_url}/permissions/user/{self.test_user_id}", headers=admin_headers) as resp: if resp.status == 200: data = await resp.json() print("✅ Admin peut consulter les permissions utilisateur") else: data = await resp.json() print(f"❌ Échec consultation permissions: {resp.status}, {data}") return False # Consulter les permissions par défaut async with session.get(f"{self.base_url}/permissions/defaults", headers=admin_headers) as resp: if resp.status == 200: data = await resp.json() print("✅ Admin peut consulter les permissions par défaut") return True else: data = await resp.json() print(f"❌ Échec consultation permissions par défaut: {resp.status}, {data}") return False async def run_all_tests(self): """Lance tous les tests de permissions""" print("🧪 Démarrage de la suite de tests des permissions\n") # Configuration if not await self.setup_test_users(): print("❌ Échec de configuration des utilisateurs de test") return False # Tests tests = [ ("Validation permissions individuelles", self.test_permission_validation), ("Permissions par défaut", self.test_default_permissions), ("Permissions spécifiques utilisateur", self.test_user_specific_permissions), ("Validation en lot", self.test_bulk_permissions), ("Résumé permissions", self.test_permissions_summary), ("Gestion admin", self.test_admin_permissions_management) ] passed = 0 total = len(tests) for test_name, test_func in tests: try: result = await test_func() if result: passed += 1 print(f"✅ {test_name}: PASSÉ") else: print(f"❌ {test_name}: ÉCHOUÉ") except Exception as e: print(f"❌ {test_name}: ERREUR - {e}") print(f"\n📊 Résultats: {passed}/{total} tests passés") return passed == total async def main(): """Fonction principale""" test_suite = PermissionsTestSuite() success = await test_suite.run_all_tests() if success: print("🎉 Tous les tests de permissions ont réussi!") return 0 else: print("💥 Certains tests ont échoué") return 1 if __name__ == "__main__": import sys result = asyncio.run(main()) sys.exit(result)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jonathan97480/McpHomeAssistant'

If you have feedback or need assistance with the MCP directory API, please join our Discord server