Skip to main content
Glama
test_auth.py12.2 kB
"""Unit tests for auth module.""" from datetime import UTC, datetime, timedelta from unittest.mock import patch import pytest from fastapi import HTTPException from fastapi.security import OAuth2PasswordRequestForm from yaraflux_mcp_server.auth import ( UserRole, authenticate_user, create_access_token, create_refresh_token, create_user, decode_token, delete_user, get_current_active_user, get_current_user, get_password_hash, get_user, list_users, refresh_access_token, update_user, validate_admin, verify_password, ) from yaraflux_mcp_server.models import TokenData, User def test_get_password_hash(): """Test password hashing.""" password = "testpassword" hashed = get_password_hash(password) # Verify it's not the original password assert hashed != password # Verify it's a bcrypt hash assert hashed.startswith("$2b$") def test_verify_password(): """Test password verification.""" password = "testpassword" hashed = get_password_hash(password) # Verify correct password works assert verify_password(password, hashed) # Verify incorrect password fails assert not verify_password("wrongpassword", hashed) def test_get_user_exists(): """Test getting a user that exists.""" # Create a user first username = "testuser" password = "testpass" role = UserRole.USER create_user(username=username, password=password, role=role) # Now get the user user = get_user(username) assert user is not None assert user.username == username assert user.role == role def test_get_user_not_exists(): """Test getting a user that doesn't exist.""" user = get_user("nonexistentuser") assert user is None def test_authenticate_user_success(): """Test successful user authentication.""" # Create a user first username = "authuser" password = "authpass" role = UserRole.USER create_user(username=username, password=password, role=role) # Now authenticate user = authenticate_user(username, password) assert user is not None assert user.username == username assert user.role == role def test_authenticate_user_wrong_password(): """Test user authentication with wrong password.""" # Create a user first username = "wrongpassuser" password = "correctpass" role = UserRole.USER create_user(username=username, password=password, role=role) # Now authenticate with wrong password user = authenticate_user(username, "wrongpass") assert user is None def test_authenticate_user_not_exists(): """Test authenticating a user that doesn't exist.""" user = authenticate_user("nonexistentuser", "anypassword") assert user is None def test_create_access_token(): """Test creating an access token.""" data = {"sub": "testuser", "role": UserRole.USER} token = create_access_token(data) # Token should be a non-empty string assert isinstance(token, str) assert len(token) > 0 def test_create_refresh_token(): """Test creating a refresh token.""" data = {"sub": "testuser", "role": UserRole.USER} token = create_refresh_token(data) # Token should be a non-empty string assert isinstance(token, str) assert len(token) > 0 # Decode the token and verify it contains refresh flag from jose import jwt from yaraflux_mcp_server.auth import ALGORITHM, SECRET_KEY payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) assert payload.get("refresh") is True def test_decode_token_valid(): """Test decoding a valid token.""" # Create a token data = {"sub": "testuser", "role": UserRole.USER} token = create_access_token(data) # Decode it token_data = decode_token(token) assert isinstance(token_data, TokenData) assert token_data.username == data["sub"] assert token_data.role == data["role"] @pytest.mark.asyncio @patch("yaraflux_mcp_server.auth.get_user") async def test_get_current_active_user_success(mock_get_user): """Test getting current active user with valid token.""" # Set up the mocks mock_get_user.return_value = User(username="testuser", role=UserRole.USER, disabled=False) # Create a token data = {"sub": "testuser", "role": UserRole.USER} token = create_access_token(data) # Get current user user = await get_current_user(token) assert user is not None assert user.username == "testuser" assert user.role == UserRole.USER assert not user.disabled # Test active user active_user = await get_current_active_user(user) assert active_user is not None @pytest.mark.asyncio @patch("yaraflux_mcp_server.auth.get_user") async def test_get_current_active_user_disabled(mock_get_user): """Test getting disabled user.""" # Set up the mock from yaraflux_mcp_server.models import UserInDB mock_user = UserInDB(username="disableduser", role=UserRole.USER, disabled=True, hashed_password="fakehash") mock_get_user.return_value = mock_user # Create a token data = {"sub": "disableduser", "role": UserRole.USER} token = create_access_token(data) # Get current user - this should raise an exception with pytest.raises(HTTPException) as exc_info: user = await get_current_user(token) # Check that the correct error was raised assert exc_info.value.status_code == 403 assert "disabled" in str(exc_info.value.detail).lower() @pytest.mark.asyncio @patch("yaraflux_mcp_server.auth.get_user") async def test_validate_admin_success(mock_get_user): """Test validating admin with valid token and admin role.""" # Set up the mock mock_get_user.return_value = User(username="adminuser", role=UserRole.ADMIN, disabled=False) # Create a token data = {"sub": "adminuser", "role": UserRole.ADMIN} token = create_access_token(data) # Get current user user = await get_current_user(token) # Validate admin admin_user = await validate_admin(user) assert admin_user is not None assert admin_user.username == "adminuser" assert admin_user.role == UserRole.ADMIN @pytest.mark.asyncio @patch("yaraflux_mcp_server.auth.get_user") async def test_validate_admin_not_admin(mock_get_user): """Test validating admin with non-admin role.""" # Set up the mock mock_get_user.return_value = User(username="regularuser", role=UserRole.USER, disabled=False) # Create a token data = {"sub": "regularuser", "role": UserRole.USER} token = create_access_token(data) # Get current user user = await get_current_user(token) # Validate admin should raise exception with pytest.raises(HTTPException) as exc_info: await validate_admin(user) assert exc_info.value.status_code == 403 assert "admin" in str(exc_info.value.detail).lower() def test_refresh_access_token(): """Test refreshing an access token.""" # Create a refresh token data = {"sub": "testuser", "role": UserRole.USER} refresh_token = create_refresh_token(data) # Refresh it to get an access token access_token = refresh_access_token(refresh_token) # Decode the new token token_data = decode_token(access_token) assert isinstance(token_data, TokenData) assert token_data.username == data["sub"] assert token_data.role == data["role"] # Verify it's not a refresh token by checking the raw payload from jose import jwt from yaraflux_mcp_server.auth import ALGORITHM, SECRET_KEY payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM]) assert payload.get("refresh") is None def test_refresh_access_token_not_refresh_token(): """Test refreshing with a non-refresh token.""" # Create an access token data = {"sub": "testuser", "role": UserRole.USER} access_token = create_access_token(data) # Try to refresh it with pytest.raises(HTTPException) as exc_info: refresh_access_token(access_token) assert exc_info.value.status_code == 401 assert "refresh token" in str(exc_info.value.detail).lower() def test_refresh_access_token_expired(): """Test refreshing with an expired refresh token.""" # Create a token that's already expired data = { "sub": "testuser", "role": UserRole.USER, "refresh": True, "exp": int((datetime.now(UTC) - timedelta(minutes=5)).timestamp()), } # We need to manually create this token since the create_refresh_token function would create a valid one from jose import jwt from yaraflux_mcp_server.auth import ALGORITHM, SECRET_KEY expired_token = jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM) # Try to refresh it with pytest.raises(HTTPException) as exc_info: refresh_access_token(expired_token) assert exc_info.value.status_code == 401 assert "expired" in str(exc_info.value.detail).lower() def test_update_user(): """Test updating a user.""" # Create a user first username = "updateuser" password = "updatepass" role = UserRole.USER create_user(username=username, password=password, role=role) # Update the user updated = update_user(username=username, role=UserRole.ADMIN, email="test@example.com", disabled=True) assert updated is not None assert updated.username == username assert updated.role == UserRole.ADMIN assert updated.email == "test@example.com" assert updated.disabled is True def test_update_user_not_found(): """Test updating a user that doesn't exist.""" updated = update_user(username="nonexistentuser", role=UserRole.ADMIN) assert updated is None def test_list_users(): """Test listing users.""" # Create a couple of test users create_user(username="listuser1", password="pass1", role=UserRole.USER) create_user(username="listuser2", password="pass2", role=UserRole.ADMIN) # List users users = list_users() assert isinstance(users, list) assert len(users) >= 2 # At least our two test users # Check if our test users are in the list usernames = [u.username for u in users] assert "listuser1" in usernames assert "listuser2" in usernames def test_delete_user(): """Test deleting a user.""" # Create a user first username = "deleteuser" password = "deletepass" role = UserRole.USER create_user(username=username, password=password, role=role) # Delete the user (as someone else) result = delete_user(username=username, current_username="someoneelse") assert result is True # User should no longer exist assert get_user(username) is None def test_delete_user_not_found(): """Test deleting a user that doesn't exist.""" result = delete_user(username="nonexistentuser", current_username="someoneelse") assert result is False def test_delete_user_self(): """Test deleting own account.""" # Create a user first username = "selfdeleteuser" password = "selfdeletepass" role = UserRole.USER create_user(username=username, password=password, role=role) # Try to delete self with pytest.raises(ValueError) as exc_info: delete_user(username=username, current_username=username) assert "cannot delete your own account" in str(exc_info.value).lower() # User should still exist assert get_user(username) is not None def test_delete_last_admin(): """Test deleting the last admin user.""" # Create an admin user username = "lastadmin" password = "lastadminpass" role = UserRole.ADMIN create_user(username=username, password=password, role=role) # Make sure all other admin users are deleted users = list_users() for user in users: if user.role == UserRole.ADMIN and user.username != username: delete_user(user.username, current_username="someoneelse") # Try to delete the last admin with pytest.raises(ValueError) as exc_info: delete_user(username=username, current_username="someoneelse") assert "cannot delete the last admin" in str(exc_info.value).lower() # Admin should still exist assert get_user(username) is not None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ThreatFlux/YaraFlux'

If you have feedback or need assistance with the MCP directory API, please join our Discord server