Skip to main content
Glama
test_auth.py12.7 kB
#!/usr/bin/env python3 """ Script de test pour le système d'authentification """ import asyncio import aiohttp import json import logging import sys from datetime import datetime # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Configuration du serveur BASE_URL = "http://localhost:8080" class AuthTester: """Testeur pour l'API d'authentification""" def __init__(self): self.session = None self.access_token = None self.refresh_token = None self.user_id = None async def __aenter__(self): self.session = aiohttp.ClientSession() return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.close() async def test_health(self): """Test du health check""" logger.info("🏥 Testing health endpoint...") try: async with self.session.get(f"{BASE_URL}/health") as response: if response.status == 200: data = await response.json() logger.info(f"✅ Health check OK: {data}") return True else: logger.error(f"❌ Health check failed: {response.status}") return False except Exception as e: logger.error(f"❌ Health check error: {e}") return False async def test_register(self, username="testuser", password="TestPass123", email="test@example.com"): """Test d'inscription""" logger.info(f"📝 Testing user registration: {username}") user_data = { "username": username, "password": password, "email": email, "full_name": f"Test User {username}" } try: async with self.session.post( f"{BASE_URL}/auth/register", json=user_data ) as response: data = await response.json() if response.status == 200: self.user_id = data.get("id") logger.info(f"✅ Registration successful: {data}") return True else: logger.error(f"❌ Registration failed ({response.status}): {data}") return False except Exception as e: logger.error(f"❌ Registration error: {e}") return False async def test_login(self, username="testuser", password="TestPass123"): """Test de connexion""" logger.info(f"🔑 Testing user login: {username}") login_data = { "username": username, "password": password } try: async with self.session.post( f"{BASE_URL}/auth/login", json=login_data ) as response: data = await response.json() if response.status == 200: self.access_token = data.get("access_token") self.refresh_token = data.get("refresh_token") logger.info("✅ Login successful") logger.info(f" Token type: {data.get('token_type')}") logger.info(f" Expires in: {data.get('expires_in')} seconds") return True else: logger.error(f"❌ Login failed ({response.status}): {data}") return False except Exception as e: logger.error(f"❌ Login error: {e}") return False async def test_me(self): """Test de récupération des infos utilisateur""" logger.info("👤 Testing user info endpoint...") if not self.access_token: logger.error("❌ No access token available") return False headers = { "Authorization": f"Bearer {self.access_token}" } try: async with self.session.get( f"{BASE_URL}/auth/me", headers=headers ) as response: data = await response.json() if response.status == 200: logger.info(f"✅ User info retrieved: {data}") return True else: logger.error(f"❌ User info failed ({response.status}): {data}") return False except Exception as e: logger.error(f"❌ User info error: {e}") return False async def test_sessions(self): """Test de récupération des sessions""" logger.info("📋 Testing user sessions endpoint...") if not self.access_token: logger.error("❌ No access token available") return False headers = { "Authorization": f"Bearer {self.access_token}" } try: async with self.session.get( f"{BASE_URL}/auth/sessions", headers=headers ) as response: data = await response.json() if response.status == 200: sessions = data.get("sessions", []) logger.info(f"✅ Sessions retrieved: {len(sessions)} active sessions") for session in sessions: logger.info(f" Session: {session.get('id')} - {session.get('ip_address')}") return True else: logger.error(f"❌ Sessions failed ({response.status}): {data}") return False except Exception as e: logger.error(f"❌ Sessions error: {e}") return False async def test_refresh_token(self): """Test de rafraîchissement du token""" logger.info("🔄 Testing token refresh...") if not self.refresh_token: logger.error("❌ No refresh token available") return False try: async with self.session.post( f"{BASE_URL}/auth/refresh", params={"refresh_token": self.refresh_token} ) as response: data = await response.json() if response.status == 200: self.access_token = data.get("access_token") self.refresh_token = data.get("refresh_token") logger.info("✅ Token refresh successful") return True else: logger.error(f"❌ Token refresh failed ({response.status}): {data}") return False except Exception as e: logger.error(f"❌ Token refresh error: {e}") return False async def test_logout(self): """Test de déconnexion""" logger.info("🚪 Testing user logout...") if not self.access_token: logger.error("❌ No access token available") return False headers = { "Authorization": f"Bearer {self.access_token}" } try: async with self.session.post( f"{BASE_URL}/auth/logout", headers=headers ) as response: data = await response.json() if response.status == 200: logger.info("✅ Logout successful") self.access_token = None self.refresh_token = None return True else: logger.error(f"❌ Logout failed ({response.status}): {data}") return False except Exception as e: logger.error(f"❌ Logout error: {e}") return False async def test_invalid_credentials(self): """Test avec des identifiants invalides""" logger.info("🚫 Testing invalid credentials...") login_data = { "username": "invaliduser", "password": "wrongpassword" } try: async with self.session.post( f"{BASE_URL}/auth/login", json=login_data ) as response: if response.status == 401: logger.info("✅ Invalid credentials correctly rejected") return True else: logger.error(f"❌ Expected 401, got {response.status}") return False except Exception as e: logger.error(f"❌ Invalid credentials test error: {e}") return False async def test_unauthorized_access(self): """Test d'accès non autorisé""" logger.info("🛡️ Testing unauthorized access...") try: async with self.session.get(f"{BASE_URL}/auth/me") as response: if response.status == 401: logger.info("✅ Unauthorized access correctly blocked") return True else: logger.error(f"❌ Expected 401, got {response.status}") return False except Exception as e: logger.error(f"❌ Unauthorized access test error: {e}") return False async def run_auth_tests(): """Lance tous les tests d'authentification""" logger.info("🧪 Starting authentication tests...") async with AuthTester() as tester: results = [] # Test du health check results.append(("Health Check", await tester.test_health())) # Test d'accès non autorisé results.append(("Unauthorized Access", await tester.test_unauthorized_access())) # Test avec identifiants invalides results.append(("Invalid Credentials", await tester.test_invalid_credentials())) # Test d'inscription username = f"testuser_{datetime.now().strftime('%Y%m%d_%H%M%S')}" results.append(("Registration", await tester.test_register(username=username))) # Test de connexion results.append(("Login", await tester.test_login(username=username))) # Test des infos utilisateur results.append(("User Info", await tester.test_me())) # Test des sessions results.append(("User Sessions", await tester.test_sessions())) # Test du refresh token results.append(("Token Refresh", await tester.test_refresh_token())) # Test des infos utilisateur avec nouveau token results.append(("User Info (after refresh)", await tester.test_me())) # Test de déconnexion results.append(("Logout", await tester.test_logout())) # Résumé des résultats logger.info("\n" + "="*50) logger.info("📊 TEST RESULTS SUMMARY") logger.info("="*50) passed = 0 total = len(results) for test_name, success in results: status = "✅ PASS" if success else "❌ FAIL" logger.info(f"{status:10} {test_name}") if success: passed += 1 logger.info("="*50) logger.info(f"📈 Results: {passed}/{total} tests passed ({passed/total*100:.1f}%)") if passed == total: logger.info("🎉 All authentication tests passed!") return True else: logger.error(f"⚠️ {total-passed} tests failed") return False if __name__ == "__main__": print("🧪 HTTP-MCP Bridge Authentication Test Suite") print("=" * 50) print("Make sure the server is running on http://localhost:8000") print("=" * 50) try: success = asyncio.run(run_auth_tests()) sys.exit(0 if success else 1) except KeyboardInterrupt: print("\n❌ Tests interrupted by user") sys.exit(1) except Exception as e: print(f"\n❌ Test suite error: {e}") sys.exit(1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jonathan97480/McpHomeAssistant'

If you have feedback or need assistance with the MCP directory API, please join our Discord server