Skip to main content
Glama
database.py15.6 kB
""" Database initialization and schema management for selfmemory-core server. This module handles: - MongoDB connection management - Creation of new collections for memory sharing functionality - Index creation for optimal query performance - Database schema validation Following Uncle Bob's clean code principles: - Single Responsibility: Each function has one clear purpose - No mock data or fallback mechanisms - Clear error handling without hiding failures """ import logging from pymongo import ASCENDING, IndexModel, MongoClient from pymongo.errors import CollectionInvalid, OperationFailure from .config import config from .utils.datetime_helpers import utc_now, utc_to_iso logger = logging.getLogger(__name__) # MongoDB client and database connection # Initialized on module import mongo_client = MongoClient( config.database.URI, serverSelectionTimeoutMS=config.database.TIMEOUT * 1000, maxPoolSize=config.database.MAX_POOL_SIZE, retryWrites=config.database.RETRY_WRITES, w=config.database.WRITE_CONCERN, ) # Get database instance mongo_db = mongo_client.get_default_database() logger.info(f"✅ Connected to MongoDB: {config.database.URI}") def create_organization_members_collection(db) -> bool: """ Create organization_members collection with proper indexes. Purpose: Track which users belong to which organizations with their roles. Fields: - organizationId: ObjectId (reference to organizations) - userId: ObjectId (reference to users) - role: String enum ["owner", "admin", "member"] - invitedBy: ObjectId (reference to users) - joinedAt: Date - status: String enum ["active"] Indexes: - Compound unique: {organizationId: 1, userId: 1} - Single: {userId: 1} Args: db: MongoDB database instance Returns: bool: True if collection created successfully Raises: OperationFailure: If index creation fails """ collection_name = "organization_members" try: # Create collection if it doesn't exist if collection_name not in db.list_collection_names(): db.create_collection(collection_name) logger.info(f"✅ Created collection: {collection_name}") else: logger.info(f"ℹ️ Collection already exists: {collection_name}") # Define indexes indexes = [ IndexModel( [("organizationId", ASCENDING), ("userId", ASCENDING)], unique=True, name="org_user_unique", ), IndexModel([("userId", ASCENDING)], name="user_idx"), ] # Create indexes collection = db[collection_name] collection.create_indexes(indexes) logger.info(f"✅ Created indexes for {collection_name}") return True except CollectionInvalid as e: logger.error(f"❌ Failed to create collection {collection_name}: {e}") raise except OperationFailure as e: logger.error(f"❌ Failed to create indexes for {collection_name}: {e}") raise def create_project_members_collection(db) -> bool: """ Create project_members collection with proper indexes. Purpose: Track which users have access to which projects and their roles. Fields: - projectId: ObjectId (reference to projects) - userId: ObjectId (reference to users) - organizationId: ObjectId (for quick org-level queries) - role: String enum ["admin", "editor", "viewer"] - permissions: Object - canRead: Boolean - canWrite: Boolean - canDelete: Boolean - canInvite: Boolean - addedBy: ObjectId (reference to users) - addedAt: Date Role Permissions Mapping: - Admin: { canRead: true, canWrite: true, canDelete: true, canInvite: true } - Editor: { canRead: true, canWrite: true, canDelete: true, canInvite: false } - Viewer: { canRead: true, canWrite: false, canDelete: false, canInvite: false } Indexes: - Compound unique: {projectId: 1, userId: 1} - Compound: {userId: 1, organizationId: 1} - Single: {projectId: 1} Args: db: MongoDB database instance Returns: bool: True if collection created successfully Raises: OperationFailure: If index creation fails """ collection_name = "project_members" try: # Create collection if it doesn't exist if collection_name not in db.list_collection_names(): db.create_collection(collection_name) logger.info(f"✅ Created collection: {collection_name}") else: logger.info(f"ℹ️ Collection already exists: {collection_name}") # Define indexes indexes = [ IndexModel( [("projectId", ASCENDING), ("userId", ASCENDING)], unique=True, name="project_user_unique", ), IndexModel( [("userId", ASCENDING), ("organizationId", ASCENDING)], name="user_org_idx", ), IndexModel([("projectId", ASCENDING)], name="project_idx"), ] # Create indexes collection = db[collection_name] collection.create_indexes(indexes) logger.info(f"✅ Created indexes for {collection_name}") return True except CollectionInvalid as e: logger.error(f"❌ Failed to create collection {collection_name}: {e}") raise except OperationFailure as e: logger.error(f"❌ Failed to create indexes for {collection_name}: {e}") raise def create_invitations_collection(db) -> bool: """ Create invitations collection with proper indexes. Purpose: Manage email invitations to organizations and projects. Fields: - email: String (email address being invited) - organizationId: ObjectId (reference to organizations) - projectId: ObjectId | null (null = org-only invite, value = project invite) - invitedBy: ObjectId (reference to users) - role: String enum ["admin", "editor", "viewer"] - token: String (secure random token for invitation link) - status: String enum ["pending", "accepted", "expired"] - expiresAt: Date (24 hours from creation) - createdAt: Date Indexes: - Single unique: {token: 1} - Compound: {email: 1, status: 1} - Single: {expiresAt: 1} (for cleanup) Args: db: MongoDB database instance Returns: bool: True if collection created successfully Raises: OperationFailure: If index creation fails """ collection_name = "invitations" try: # Create collection if it doesn't exist if collection_name not in db.list_collection_names(): db.create_collection(collection_name) logger.info(f"✅ Created collection: {collection_name}") else: logger.info(f"ℹ️ Collection already exists: {collection_name}") # Define indexes indexes = [ IndexModel([("token", ASCENDING)], unique=True, name="token_unique"), IndexModel( [("email", ASCENDING), ("status", ASCENDING)], name="email_status_idx" ), IndexModel([("expiresAt", ASCENDING)], name="expiration_idx"), ] # Create indexes collection = db[collection_name] collection.create_indexes(indexes) logger.info(f"✅ Created indexes for {collection_name}") return True except CollectionInvalid as e: logger.error(f"❌ Failed to create collection {collection_name}: {e}") raise except OperationFailure as e: logger.error(f"❌ Failed to create indexes for {collection_name}: {e}") raise def create_notifications_collection(db) -> bool: """ Create notifications collection with proper indexes. Purpose: Store user notifications (invitation acceptances, etc.) Fields: - type: String (notification type identifier) - userId: ObjectId (recipient) - relatedUserId: ObjectId (actor) - organizationId: ObjectId - projectId: ObjectId | null - invitationId: ObjectId | null - title: String - message: String - metadata: Object - read: Boolean - readAt: Date | null - createdAt: Date - expiresAt: Date | null Indexes: - Compound: {userId: 1, read: 1, createdAt: -1} - TTL: {expiresAt: 1} - Reference: {invitationId: 1} Args: db: MongoDB database instance Returns: bool: True if collection created successfully Raises: OperationFailure: If index creation fails """ collection_name = "notifications" try: if collection_name not in db.list_collection_names(): db.create_collection(collection_name) logger.info(f"✅ Created collection: {collection_name}") else: logger.info(f"ℹ️ Collection already exists: {collection_name}") indexes = [ IndexModel( [("userId", ASCENDING), ("read", ASCENDING), ("createdAt", ASCENDING)], name="user_read_created_idx", ), IndexModel( [("expiresAt", ASCENDING)], name="expiration_idx", expireAfterSeconds=0, # TTL index ), IndexModel([("invitationId", ASCENDING)], name="invitation_idx"), ] collection = db[collection_name] collection.create_indexes(indexes) logger.info(f"✅ Created indexes for {collection_name}") return True except CollectionInvalid as e: logger.error(f"❌ Failed to create collection {collection_name}: {e}") raise except OperationFailure as e: logger.error(f"❌ Failed to create indexes for {collection_name}: {e}") raise def create_api_keys_indexes(db) -> bool: """ Create indexes for api_keys collection to optimize authentication queries. Purpose: Improve API key verification performance by indexing key prefix lookups. Indexes: - Compound: {keyPrefix: 1, isActive: 1} - Optimizes auth query with both fields - Single: {userId: 1} - For user-specific key queries - Single: {projectId: 1} - For project-specific key queries Args: db: MongoDB database instance Returns: bool: True if indexes created successfully Raises: OperationFailure: If index creation fails """ collection_name = "api_keys" try: if collection_name not in db.list_collection_names(): logger.warning(f"⚠️ Collection {collection_name} does not exist yet") return False # Define indexes indexes = [ IndexModel( [("keyPrefix", ASCENDING), ("isActive", ASCENDING)], name="keyprefix_active_idx", ), IndexModel([("userId", ASCENDING)], name="user_idx"), IndexModel([("projectId", ASCENDING)], name="project_idx"), ] # Create indexes collection = db[collection_name] collection.create_indexes(indexes) logger.info(f"✅ Created indexes for {collection_name}") return True except OperationFailure as e: logger.error(f"❌ Failed to create indexes for {collection_name}: {e}") raise def verify_api_keys_schema(db) -> bool: """ Verify api_keys collection has required fields for project-scoped keys. The api_keys collection should have: - projectId: ObjectId (REQUIRED - makes key project-specific) - organizationId: ObjectId (for quick org-level queries) This function doesn't modify existing data, only verifies schema expectations. Args: db: MongoDB database instance Returns: bool: True if schema is valid """ collection_name = "api_keys" try: if collection_name not in db.list_collection_names(): logger.warning(f"⚠️ Collection {collection_name} does not exist yet") return False collection = db[collection_name] # Check if any documents have the required fields sample_doc = collection.find_one({"projectId": {"$exists": True}}) if sample_doc: logger.info(f"✅ Collection {collection_name} has projectId field") else: logger.info( f"ℹ️ Collection {collection_name} exists but no documents with projectId yet" ) return True except Exception as e: logger.error(f"❌ Error verifying {collection_name} schema: {e}") raise def initialize_database_schema(db) -> dict: """ Initialize all required collections and indexes for memory sharing. This is the main entry point for Phase 1: Database Foundation. Creates all new collections with proper indexes. Args: db: MongoDB database instance Returns: dict: Status report of initialization Raises: Exception: If any critical operation fails """ results = { "timestamp": utc_to_iso(utc_now()), "collections_created": [], "collections_existed": [], "indexes_created": True, "errors": [], } logger.info("🚀 Starting database schema initialization for Phase 1...") try: # Create organization_members collection create_organization_members_collection(db) results["collections_created"].append("organization_members") # Create project_members collection create_project_members_collection(db) results["collections_created"].append("project_members") # Create invitations collection create_invitations_collection(db) results["collections_created"].append("invitations") # Create notifications collection create_notifications_collection(db) results["collections_created"].append("notifications") # Create indexes for api_keys collection create_api_keys_indexes(db) # Verify api_keys schema verify_api_keys_schema(db) logger.info("✅ Database schema initialization completed successfully") except Exception as e: logger.error(f"❌ Database initialization failed: {e}") results["errors"].append(str(e)) raise return results def get_role_permissions(role: str) -> dict: """ Get permission mapping for a given project role. Role Permissions Mapping: - Admin: { canRead: true, canWrite: true, canDelete: true, canInvite: true } - Editor: { canRead: true, canWrite: true, canDelete: true, canInvite: false } - Viewer: { canRead: true, canWrite: false, canDelete: false, canInvite: false } Args: role: Role string (admin, editor, or viewer) Returns: dict: Permission mapping Raises: ValueError: If role is invalid """ permissions_map = { "admin": { "canRead": True, "canWrite": True, "canDelete": True, "canInvite": True, }, "editor": { "canRead": True, "canWrite": True, "canDelete": True, "canInvite": False, }, "viewer": { "canRead": True, "canWrite": False, "canDelete": False, "canInvite": False, }, } role_lower = role.lower() if role_lower not in permissions_map: raise ValueError(f"Invalid role: {role}. Must be one of: admin, editor, viewer") return permissions_map[role_lower]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shrijayan/SelfMemory'

If you have feedback or need assistance with the MCP directory API, please join our Discord server