Skip to main content
Glama

MemOS-MCP

by qinshu1109
Apache 2.0
3
  • Linux
  • Apple
test_mem_user.py21.7 kB
""" Test cases for the MemOS User Management System. This module contains comprehensive test cases for testing user authentication, authorization, and cube management functionality. """ import os import tempfile import uuid from datetime import datetime from pathlib import Path import pytest from memos.mem_user.user_manager import UserManager, UserRole class TestUserManager: """Test cases for UserManager class.""" @pytest.fixture def temp_db(self): """Create a temporary database for testing.""" # Create temporary database file temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_memos.db") yield db_path # Cleanup - note: file cleanup is handled by user_manager fixture try: if os.path.exists(db_path): os.remove(db_path) os.rmdir(temp_dir) except (OSError, PermissionError): # On Windows, files might still be locked, ignore cleanup errors pass @pytest.fixture def user_manager(self, temp_db): """Create UserManager instance with temporary database.""" manager = UserManager(db_path=temp_db) yield manager # Ensure database connections are closed manager.close() def test_initialization(self, temp_db): """Test UserManager initialization.""" manager = UserManager(db_path=temp_db) # Check database file exists assert os.path.exists(temp_db) # Check root user is created root_user = manager.get_user("root") assert root_user is not None assert root_user.user_name == "root" assert root_user.role == UserRole.ROOT assert root_user.is_active is True def test_initialization_default_path(self, monkeypatch): """Test UserManager initialization with default path.""" # Mock settings.MEMOS_DIR temp_dir = tempfile.mkdtemp() mock_memos_dir = Path(temp_dir) class MockSettings: MEMOS_DIR = mock_memos_dir # Replace the settings import monkeypatch.setattr("memos.mem_user.user_manager.settings", MockSettings()) manager = None try: manager = UserManager() expected_path = mock_memos_dir / "memos_users.db" assert manager.db_path == str(expected_path) assert os.path.exists(expected_path) finally: # Close database connections first if manager: manager.close() # Cleanup try: expected_path = mock_memos_dir / "memos_users.db" if os.path.exists(expected_path): os.remove(expected_path) if os.path.exists(temp_dir): os.rmdir(temp_dir) except (OSError, PermissionError): # On Windows, files might still be locked, ignore cleanup errors pass class TestUserOperations: """Test cases for user operations.""" @pytest.fixture def temp_db(self): """Create a temporary database for testing.""" temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_memos.db") yield db_path if os.path.exists(db_path): os.remove(db_path) os.rmdir(temp_dir) @pytest.fixture def user_manager(self, temp_db): """Create UserManager instance with temporary database.""" manager = UserManager(db_path=temp_db) yield manager manager.close() def test_create_user(self, user_manager): """Test user creation.""" user_id = user_manager.create_user("test_user", UserRole.USER) assert user_id is not None assert isinstance(user_id, str) # Verify user exists user = user_manager.get_user(user_id) assert user is not None assert user.user_name == "test_user" assert user.role == UserRole.USER assert user.is_active is True def test_create_user_with_custom_id(self, user_manager): """Test user creation with custom ID.""" custom_id = "custom_user_123" user_id = user_manager.create_user("custom_user", UserRole.ADMIN, custom_id) assert user_id == custom_id user = user_manager.get_user(custom_id) assert user is not None assert user.user_id == custom_id assert user.user_name == "custom_user" assert user.role == UserRole.ADMIN def test_create_duplicate_user(self, user_manager): """Test creating user with duplicate name.""" # Create first user user_id1 = user_manager.create_user("duplicate_user", UserRole.USER) # Try to create user with same name user_id2 = user_manager.create_user("duplicate_user", UserRole.ADMIN) # Should return existing user ID assert user_id1 == user_id2 # Verify only one user exists user = user_manager.get_user(user_id1) assert user.role == UserRole.USER # Original role preserved def test_get_user_by_name(self, user_manager): """Test getting user by name.""" user_id = user_manager.create_user("named_user", UserRole.USER) user = user_manager.get_user_by_name("named_user") assert user is not None assert user.user_id == user_id assert user.user_name == "named_user" # Test non-existent user non_existent = user_manager.get_user_by_name("non_existent") assert non_existent is None def test_validate_user(self, user_manager): """Test user validation.""" user_id = user_manager.create_user("valid_user", UserRole.USER) # Valid user assert user_manager.validate_user(user_id) is True # Non-existent user assert user_manager.validate_user("non_existent") is False # Deactivated user user_manager.delete_user(user_id) assert user_manager.validate_user(user_id) is False def test_list_users(self, user_manager): """Test listing users.""" # Create multiple users user_manager.create_user("user1", UserRole.USER) user_manager.create_user("user2", UserRole.ADMIN) user_id3 = user_manager.create_user("user3", UserRole.GUEST) users = user_manager.list_users() # Should include root user + 3 created users assert len(users) == 4 user_names = [user.user_name for user in users] assert "root" in user_names assert "user1" in user_names assert "user2" in user_names assert "user3" in user_names # Deactivate one user user_manager.delete_user(user_id3) active_users = user_manager.list_users() active_names = [user.user_name for user in active_users] assert len(active_users) == 3 assert "user3" not in active_names def test_delete_user(self, user_manager): """Test user deletion (soft delete).""" user_id = user_manager.create_user("delete_user", UserRole.USER) # Verify user exists and is active assert user_manager.validate_user(user_id) is True # Delete user result = user_manager.delete_user(user_id) assert result is True # Verify user is deactivated assert user_manager.validate_user(user_id) is False # User still exists but is inactive user = user_manager.get_user(user_id) assert user is not None assert user.is_active is False def test_delete_root_user(self, user_manager): """Test that root user cannot be deleted.""" result = user_manager.delete_user("root") assert result is False # Root user should still be active assert user_manager.validate_user("root") is True def test_delete_nonexistent_user(self, user_manager): """Test deleting non-existent user.""" result = user_manager.delete_user("non_existent") assert result is False class TestCubeOperations: """Test cases for cube operations.""" @pytest.fixture def temp_db(self): """Create a temporary database for testing.""" temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_memos.db") yield db_path if os.path.exists(db_path): os.remove(db_path) os.rmdir(temp_dir) @pytest.fixture def user_manager(self, temp_db): """Create UserManager instance with temporary database.""" manager = UserManager(db_path=temp_db) yield manager manager.close() def test_create_cube(self, user_manager): """Test cube creation.""" # Create owner user owner_id = user_manager.create_user("cube_owner", UserRole.USER) # Create cube cube_id = user_manager.create_cube("test_cube", owner_id) assert cube_id is not None assert isinstance(cube_id, str) # Verify cube exists cube = user_manager.get_cube(cube_id) assert cube is not None assert cube.cube_name == "test_cube" assert cube.owner_id == owner_id assert cube.is_active is True def test_create_cube_with_path_and_custom_id(self, user_manager): """Test cube creation with path and custom ID.""" owner_id = user_manager.create_user("cube_owner", UserRole.USER) custom_cube_id = "custom_cube_123" cube_path = str(Path("/path/to/cube")) # Use pathlib for cross-platform path handling cube_id = user_manager.create_cube( "custom_cube", owner_id, cube_path=cube_path, cube_id=custom_cube_id ) assert cube_id == custom_cube_id cube = user_manager.get_cube(custom_cube_id) assert cube is not None assert cube.cube_id == custom_cube_id assert cube.cube_name == "custom_cube" assert cube.cube_path == cube_path assert cube.owner_id == owner_id def test_create_cube_invalid_owner(self, user_manager): """Test cube creation with invalid owner.""" with pytest.raises(ValueError, match="does not exist"): user_manager.create_cube("test_cube", "non_existent_owner") def test_validate_user_cube_access(self, user_manager): """Test user cube access validation.""" # Create users owner_id = user_manager.create_user("owner", UserRole.USER) user_id = user_manager.create_user("user", UserRole.USER) # Create cube cube_id = user_manager.create_cube("test_cube", owner_id) # Owner should have access assert user_manager.validate_user_cube_access(owner_id, cube_id) is True # Other user should not have access initially assert user_manager.validate_user_cube_access(user_id, cube_id) is False # Add user to cube user_manager.add_user_to_cube(user_id, cube_id) assert user_manager.validate_user_cube_access(user_id, cube_id) is True # Non-existent user should not have access assert user_manager.validate_user_cube_access("non_existent", cube_id) is False # Non-existent cube should not be accessible assert user_manager.validate_user_cube_access(owner_id, "non_existent") is False def test_get_user_cubes(self, user_manager): """Test getting user's accessible cubes.""" # Create users owner_id = user_manager.create_user("owner", UserRole.USER) user_id = user_manager.create_user("user", UserRole.USER) # Create cubes cube_id1 = user_manager.create_cube("cube1", owner_id) cube_id2 = user_manager.create_cube("cube2", owner_id) cube_id3 = user_manager.create_cube("cube3", user_id) # Add user to cube1 user_manager.add_user_to_cube(user_id, cube_id1) # Get cubes accessible by user user_cubes = user_manager.get_user_cubes(user_id) cube_ids = [cube.cube_id for cube in user_cubes] assert len(user_cubes) == 2 assert cube_id1 in cube_ids # Added to cube assert cube_id3 in cube_ids # Owned cube assert cube_id2 not in cube_ids # No access # Get cubes accessible by owner owner_cubes = user_manager.get_user_cubes(owner_id) owner_cube_ids = [cube.cube_id for cube in owner_cubes] assert len(owner_cubes) == 2 assert cube_id1 in owner_cube_ids assert cube_id2 in owner_cube_ids assert cube_id3 not in owner_cube_ids def test_add_user_to_cube(self, user_manager): """Test adding user to cube.""" # Create users and cube owner_id = user_manager.create_user("owner", UserRole.USER) user_id = user_manager.create_user("user", UserRole.USER) cube_id = user_manager.create_cube("test_cube", owner_id) # Add user to cube result = user_manager.add_user_to_cube(user_id, cube_id) assert result is True # Verify access assert user_manager.validate_user_cube_access(user_id, cube_id) is True # Adding same user again should still work result = user_manager.add_user_to_cube(user_id, cube_id) assert result is True # Adding non-existent user should fail result = user_manager.add_user_to_cube("non_existent", cube_id) assert result is False # Adding user to non-existent cube should fail result = user_manager.add_user_to_cube(user_id, "non_existent") assert result is False def test_remove_user_from_cube(self, user_manager): """Test removing user from cube.""" # Create users and cube owner_id = user_manager.create_user("owner", UserRole.USER) user_id = user_manager.create_user("user", UserRole.USER) cube_id = user_manager.create_cube("test_cube", owner_id) # Add and then remove user user_manager.add_user_to_cube(user_id, cube_id) assert user_manager.validate_user_cube_access(user_id, cube_id) is True result = user_manager.remove_user_from_cube(user_id, cube_id) assert result is True assert user_manager.validate_user_cube_access(user_id, cube_id) is False # Cannot remove owner result = user_manager.remove_user_from_cube(owner_id, cube_id) assert result is False assert user_manager.validate_user_cube_access(owner_id, cube_id) is True # Removing non-existent user should fail result = user_manager.remove_user_from_cube("non_existent", cube_id) assert result is False def test_delete_cube(self, user_manager): """Test cube deletion (soft delete).""" owner_id = user_manager.create_user("owner", UserRole.USER) cube_id = user_manager.create_cube("test_cube", owner_id) # Verify cube is active cube = user_manager.get_cube(cube_id) assert cube.is_active is True # Delete cube result = user_manager.delete_cube(cube_id) assert result is True # Verify cube is deactivated cube = user_manager.get_cube(cube_id) assert cube.is_active is False # Should not have access to deactivated cube assert user_manager.validate_user_cube_access(owner_id, cube_id) is False def test_delete_nonexistent_cube(self, user_manager): """Test deleting non-existent cube.""" result = user_manager.delete_cube("non_existent") assert result is False class TestUserRoles: """Test cases for user roles and permissions.""" @pytest.fixture def temp_db(self): """Create a temporary database for testing.""" temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_memos.db") yield db_path if os.path.exists(db_path): os.remove(db_path) os.rmdir(temp_dir) @pytest.fixture def user_manager(self, temp_db): """Create UserManager instance with temporary database.""" manager = UserManager(db_path=temp_db) yield manager manager.close() def test_user_roles(self, user_manager): """Test different user roles.""" # Test all user roles admin_id = user_manager.create_user("admin", UserRole.ADMIN) user_id = user_manager.create_user("user", UserRole.USER) guest_id = user_manager.create_user("guest", UserRole.GUEST) admin = user_manager.get_user(admin_id) user = user_manager.get_user(user_id) guest = user_manager.get_user(guest_id) root = user_manager.get_user("root") assert admin.role == UserRole.ADMIN assert user.role == UserRole.USER assert guest.role == UserRole.GUEST assert root.role == UserRole.ROOT def test_root_user_protection(self, user_manager): """Test root user cannot be deleted.""" # Root user should exist root = user_manager.get_user("root") assert root is not None assert root.role == UserRole.ROOT # Cannot delete root user result = user_manager.delete_user("root") assert result is False # Root user should still be active assert user_manager.validate_user("root") is True class TestDatabaseIntegrity: """Test cases for database integrity and edge cases.""" @pytest.fixture def temp_db(self): """Create a temporary database for testing.""" temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_memos.db") yield db_path if os.path.exists(db_path): os.remove(db_path) os.rmdir(temp_dir) @pytest.fixture def user_manager(self, temp_db): """Create UserManager instance with temporary database.""" manager = UserManager(db_path=temp_db) yield manager manager.close() def test_cascade_delete_user_cubes(self, user_manager): """Test that user's owned cubes are handled when user is deleted.""" # Create user and cube owner_id = user_manager.create_user("owner", UserRole.USER) cube_id = user_manager.create_cube("test_cube", owner_id) # Verify relationships assert user_manager.validate_user_cube_access(owner_id, cube_id) is True # Delete user (soft delete) user_manager.delete_user(owner_id) # User should be deactivated assert user_manager.validate_user(owner_id) is False # Cube should still exist but user shouldn't have access cube = user_manager.get_cube(cube_id) assert cube is not None assert user_manager.validate_user_cube_access(owner_id, cube_id) is False def test_timestamps(self, user_manager): """Test that timestamps are properly set.""" # Create user user_id = user_manager.create_user("timestamp_user", UserRole.USER) user = user_manager.get_user(user_id) assert user.created_at is not None assert user.updated_at is not None assert isinstance(user.created_at, datetime) assert isinstance(user.updated_at, datetime) # Create cube cube_id = user_manager.create_cube("timestamp_cube", user_id) cube = user_manager.get_cube(cube_id) assert cube.created_at is not None assert cube.updated_at is not None assert isinstance(cube.created_at, datetime) assert isinstance(cube.updated_at, datetime) def test_uuid_generation(self, user_manager): """Test UUID generation for IDs.""" # Create user without custom ID user_id = user_manager.create_user("uuid_user", UserRole.USER) # Should be valid UUID format try: uuid.UUID(user_id) except ValueError: pytest.fail(f"Generated user_id '{user_id}' is not a valid UUID") # Create cube without custom ID cube_id = user_manager.create_cube("uuid_cube", user_id) try: uuid.UUID(cube_id) except ValueError: pytest.fail(f"Generated cube_id '{cube_id}' is not a valid UUID") def test_session_management(self, user_manager): """Test that database sessions are properly managed.""" # This test ensures that sessions are properly closed # by performing multiple operations users = [] cubes = [] # Create multiple users and cubes for i in range(10): user_id = user_manager.create_user(f"user_{i}", UserRole.USER) users.append(user_id) cube_id = user_manager.create_cube(f"cube_{i}", user_id) cubes.append(cube_id) # Verify all users exist for user_id in users: assert user_manager.validate_user(user_id) is True # Verify all cubes exist for cube_id in cubes: cube = user_manager.get_cube(cube_id) assert cube is not None assert cube.is_active is True # Clean up - delete some users and cubes for i in range(0, 10, 2): # Delete every other user/cube user_manager.delete_user(users[i]) user_manager.delete_cube(cubes[i]) # Verify deletions for i in range(10): user_active = user_manager.validate_user(users[i]) cube = user_manager.get_cube(cubes[i]) if i % 2 == 0: # Deleted users/cubes assert user_active is False assert cube.is_active is False else: # Active users/cubes assert user_active is True assert cube.is_active is True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server