Skip to main content
Glama
dependencies.py (20.7 kB)
""" Shared dependencies for the server. This module contains authentication, database connections, and other shared dependencies to avoid circular imports. """ import hashlib import logging from typing import NamedTuple from fastapi import Header, HTTPException from .auth import validate_session, validate_token from .config import config from .database import mongo_db from .utils.crypto import verify_api_key from .utils.datetime_helpers import is_expired, utc_now from .utils.permission_helpers import get_user_object_id_from_kratos_id, is_owner from .utils.validators import validate_object_id logger = logging.getLogger(__name__) # ============================================================================ # Data Models # ============================================================================ class ProjectPermissions(NamedTuple): """User's permissions in a project.""" canRead: bool canWrite: bool canDelete: bool canInvite: bool # Default denied permissions - fail-closed approach DENIED_PERMISSIONS = ProjectPermissions( canRead=False, canWrite=False, canDelete=False, canInvite=False ) # Full access permissions (for owners and admins) FULL_PERMISSIONS = ProjectPermissions( canRead=True, canWrite=True, canDelete=True, canInvite=True ) class AuthContext(NamedTuple): """Authentication context for multi-tenant support.""" user_id: str project_id: str | None = None organization_id: str | None = None permissions: ProjectPermissions = ( DENIED_PERMISSIONS # User's permissions in the project (defaults to denied) ) # ============================================================================ # Permission Checking Helper Functions # ============================================================================ def get_user_object_id(user_id: str): """ Get user identifier for database queries. With Kratos authentication, user_id IS the Kratos identity_id (UUID string) which is stored directly as _id in the users collection. 
Args: user_id: Kratos identity_id (UUID string) Returns: str: User ID (Kratos identity_id as string) Raises: ValueError: If user not found """ # user_id is Kratos identity_id (string), stored directly as _id user = mongo_db.users.find_one({"_id": user_id}) if not user: raise ValueError(f"User not found with ID: {user_id}") return user_id def check_project_access(user_id: str, project_id: str) -> bool: """ Check if a user has access to a project. A user has access if: - They are the organization owner (implicit full access), OR - They are the project owner, OR - They are a member in project_members collection Args: user_id: User ID string (Kratos identity_id) project_id: Project ID string (MongoDB ObjectId) Returns: bool: True if user has access, False otherwise """ try: # user_id is Kratos identity_id (string) project_obj_id = validate_object_id(project_id, "project_id") # Get project to check organization ownership project = mongo_db.projects.find_one({"_id": project_obj_id}) if not project: return False # Get user's MongoDB ObjectId by looking up via kratosId try: user_obj_id = get_user_object_id_from_kratos_id(mongo_db, user_id) except ValueError: logger.warning(f"User not found: {user_id}") return False # Check if user is the organization owner (implicit full access) organization = mongo_db.organizations.find_one( {"_id": project["organizationId"]} ) if organization and is_owner(organization, user_id, user_obj_id): return True # Check if user owns the project if is_owner(project, user_id, user_obj_id): return True # Check if user is a member member = mongo_db.project_members.find_one( {"projectId": project_obj_id, "userId": user_obj_id} ) return member is not None except Exception as e: logger.error(f"Error checking project access: {e}") return False def get_user_permissions(user_id: str, project_id: str) -> ProjectPermissions: """ Get a user's permissions for a specific project. FAIL-CLOSED APPROACH: Always returns ProjectPermissions. 
Returns DENIED_PERMISSIONS if user has no access. Never returns None to prevent NoneType errors. Returns full permissions for organization owners (implicit admin access), or permissions from project_members if user is a member, or full permissions if user is the project owner. Args: user_id: User ID string (Kratos identity_id) project_id: Project ID string (MongoDB ObjectId) Returns: ProjectPermissions: User's permissions (DENIED_PERMISSIONS if no access) """ try: # user_id is Kratos identity_id (string) project_obj_id = validate_object_id(project_id, "project_id") # Get project to check organization ownership project = mongo_db.projects.find_one({"_id": project_obj_id}) if not project: logger.warning(f"Project not found: {project_id}") return DENIED_PERMISSIONS # Get user's MongoDB ObjectId by looking up via kratosId try: user_obj_id = get_user_object_id_from_kratos_id(mongo_db, user_id) except ValueError: logger.warning(f"User not found: {user_id}") return DENIED_PERMISSIONS # Check if user is the organization owner (implicit full access) organization = mongo_db.organizations.find_one( {"_id": project["organizationId"]} ) if organization and is_owner(organization, user_id, user_obj_id): return FULL_PERMISSIONS # Check if user owns the project if is_owner(project, user_id, user_obj_id): return FULL_PERMISSIONS # Get permissions from project_members member = mongo_db.project_members.find_one( {"projectId": project_obj_id, "userId": user_obj_id} ) if not member: logger.info(f"User {user_id} is not a member of project {project_id}") return DENIED_PERMISSIONS perms = member.get("permissions", {}) return ProjectPermissions( canRead=perms.get("canRead", False), canWrite=perms.get("canWrite", False), canDelete=perms.get("canDelete", False), canInvite=perms.get("canInvite", False), ) except Exception as e: logger.error(f"Error getting user permissions: {e}") return DENIED_PERMISSIONS def has_permission(user_id: str, project_id: str, permission: str) -> bool: """ Check if 
a user has a specific permission in a project. FAIL-CLOSED APPROACH: Returns False if permission is not found or user has no access. Args: user_id: User ID string project_id: Project ID string permission: Permission to check (e.g., "canRead", "canWrite", "canDelete", "canInvite") Returns: bool: True if user has the permission, False otherwise """ permissions = get_user_permissions(user_id, project_id) # Map permission string to attribute permission_map = { "canRead": permissions.canRead, "canWrite": permissions.canWrite, "canDelete": permissions.canDelete, "canInvite": permissions.canInvite, "read": permissions.canRead, "write": permissions.canWrite, "delete": permissions.canDelete, "invite": permissions.canInvite, } return permission_map.get(permission, False) def is_project_admin(user_id: str, project_id: str) -> bool: """ Check if a user is an admin of a project. A user is an admin if: - They are the project owner, OR - They have the "admin" role in project_members Args: user_id: User ID string (Kratos identity_id) project_id: Project ID string (MongoDB ObjectId) Returns: bool: True if user is admin, False otherwise """ try: # user_id is Kratos identity_id (string) project_obj_id = validate_object_id(project_id, "project_id") # Get user's MongoDB ObjectId by looking up via kratosId try: user_obj_id = get_user_object_id_from_kratos_id(mongo_db, user_id) except ValueError: logger.warning(f"User not found: {user_id}") return False # Check if user owns the project project = mongo_db.projects.find_one({"_id": project_obj_id}) if project and is_owner(project, user_id, user_obj_id): return True # Check if user has admin role in project_members member = mongo_db.project_members.find_one( {"projectId": project_obj_id, "userId": user_obj_id, "role": "admin"} ) return member is not None except Exception as e: logger.error(f"Error checking project admin status: {e}") return False # ============================================================================ # Authentication 
# ============================================================================
# Authentication Middleware
# ============================================================================


def _authenticate_kratos_session(session_cookie: str) -> AuthContext:
    """Validate a Kratos session and build the auth context for dashboard users."""
    try:
        # Validate session with Kratos
        kratos_session = validate_session(session_cookie)

        # user_id is now Kratos identity_id
        user_id = kratos_session.user_id
        organization_id = kratos_session.organization_id

        logger.info(
            f"✅ Kratos session validated: user={kratos_session.email}, "
            f"identity_id={user_id}, org={organization_id}"
        )

        # Project context comes from request parameters for dashboard users
        return AuthContext(
            user_id=user_id,
            project_id=None,  # Set by endpoint from request params
            organization_id=organization_id,
            permissions=DENIED_PERMISSIONS,  # Set by endpoint after project validation
        )

    except ValueError as e:
        logger.warning(f"❌ Kratos session validation failed: {e}")
        raise HTTPException(status_code=401, detail=str(e)) from e
    except Exception as e:
        logger.exception(f"❌ Kratos session validation error: {e}")
        raise HTTPException(
            status_code=500, detail="Session validation error"
        ) from e


def _authenticate_hydra_token(token: str) -> AuthContext:
    """Validate a Hydra OAuth token and build the auth context from its claims."""
    try:
        # Validate OAuth token with Hydra
        hydra_token = validate_token(token)

        # user_id is Kratos identity_id from token subject
        user_id = hydra_token.subject
        project_id = hydra_token.project_id
        organization_id = hydra_token.organization_id

        logger.info(
            f"✅ Hydra token validated: subject={user_id}, "
            f"project={project_id}, org={organization_id}"
        )

        # For OAuth tokens, project context comes from token claims;
        # permissions are resolved from MongoDB based on project access.
        permissions = DENIED_PERMISSIONS
        if project_id:
            permissions = get_user_permissions(user_id, project_id)

        return AuthContext(
            user_id=user_id,
            project_id=project_id,
            organization_id=organization_id,
            permissions=permissions,
        )

    except ValueError as e:
        logger.warning(f"❌ Hydra token validation failed: {e}")
        raise HTTPException(status_code=401, detail=str(e)) from e
    except Exception as e:
        logger.exception(f"❌ Hydra token validation error: {e}")
        raise HTTPException(status_code=500, detail="Token validation error") from e


def _find_matching_api_key(api_key: str) -> dict:
    """Return the active api_keys document whose hash matches, or raise 401."""
    # Find all active API keys with matching prefix for efficiency
    key_prefix = api_key[:10] + "..."
    logger.info("🔍 Looking up API key with prefix")

    potential_keys = list(
        mongo_db.api_keys.find({"keyPrefix": key_prefix, "isActive": True}).limit(100)
    )
    logger.info(
        f"🔍 Found {len(potential_keys)} potential key(s) with matching prefix"
    )

    # Security: Hash prefix before logging to avoid leaking sensitive info
    # Note: SHA256 is appropriate here as it's only for logging collision warnings,
    # not for password hashing. API key verification uses Argon2.
    if len(potential_keys) > config.auth.COLLISION_WARNING_THRESHOLD:
        hashed_prefix = hashlib.sha256(key_prefix.encode()).hexdigest()[:8]
        logger.warning(
            f"⚠️ High API key prefix collision: {len(potential_keys)} keys (prefix hash: {hashed_prefix})"
        )

    # Performance: Limit expensive Argon2 hash verifications
    checked_count = 0
    for checked_count, candidate in enumerate(
        potential_keys[: config.auth.MAX_HASH_VERIFICATIONS], start=1
    ):
        logger.info(
            f"🔍 Verifying candidate {checked_count}: userId={candidate.get('userId')}, projectId={candidate.get('projectId')}"
        )
        if verify_api_key(api_key, candidate["keyHash"]):
            logger.info(
                f"✅ API key hash verification successful for candidate {checked_count}"
            )
            return candidate
        logger.info(
            f"❌ API key hash verification failed for candidate {checked_count}"
        )

    if len(potential_keys) > config.auth.MAX_HASH_VERIFICATIONS:
        logger.error(
            f"❌ Auth failed after {checked_count} hash verifications. "
            f"Total candidates: {len(potential_keys)}. Possible attack or system issue."
        )
    else:
        logger.warning(
            f"❌ No matching API key found after checking {checked_count} candidate(s)"
        )
    raise HTTPException(status_code=401, detail="Invalid API key")


def _resolve_api_key_project(
    stored_key: dict, user: dict
) -> tuple[str | None, str | None, ProjectPermissions]:
    """Return (project_id, organization_id, permissions) for a verified API key.

    Raises HTTPException 404 if the key's project no longer exists and 403 if
    the key's user has lost access to, or holds no permissions in, the project.
    """
    # userId is now Kratos ID (already a string)
    user_id = stored_key["userId"]

    if not stored_key.get("projectId"):
        # Backward compatibility: API key without project context
        logger.info(
            f"✅ Legacy context: user={user.get('email', user_id)} (no project context)"
        )
        # Fail-closed: AuthContext.permissions is typed ProjectPermissions, so
        # hand back the denied set (as the Kratos session path does when no
        # project is bound) rather than None.
        return None, None, DENIED_PERMISSIONS

    logger.info(f"🔍 API key has projectId: {stored_key['projectId']}")
    project = mongo_db.projects.find_one({"_id": stored_key["projectId"]})
    logger.info(f"🔍 Project lookup result: found={project is not None}")

    if not project:
        logger.warning(
            f"❌ API key references non-existent project: {stored_key['projectId']}"
        )
        raise HTTPException(status_code=404, detail="Project not found")

    project_id = str(project["_id"])
    organization_id = str(project["organizationId"])
    logger.info(
        f"🔍 Project context: project_id={project_id}, org_id={organization_id}, owner_id={project.get('ownerId')}"
    )

    # PHASE 6 ENHANCEMENT: Verify user still has project access
    # Check project membership (not just ownership)
    logger.info(
        f"🔍 Checking project access for user {user_id} to project {project_id}"
    )
    has_access = check_project_access(user_id, project_id)
    logger.info(f"🔍 Project access check result: {has_access}")

    if not has_access:
        logger.warning(
            f"❌ User {user_id} no longer has access to project {project_id}"
        )
        raise HTTPException(
            status_code=403,
            detail="Access denied - project membership revoked",
        )

    # PHASE 6 ENHANCEMENT: Get user's permissions from project_members
    permissions = get_user_permissions(user_id, project_id)

    # Check if user has any permissions (not all denied)
    if permissions == DENIED_PERMISSIONS:
        logger.warning(
            f"❌ User {user_id} has no permissions for project {project_id}"
        )
        raise HTTPException(
            status_code=403, detail="Access denied - no project permissions"
        )

    logger.info(
        f"✅ Multi-tenant auth: user={user.get('email', user_id)}, "
        f"project={project_id}, org={organization_id}, "
        f"permissions={permissions}"
    )
    return project_id, organization_id, permissions


def _authenticate_legacy_api_key(api_key: str) -> AuthContext:
    """Authenticate a legacy ``sk_im_`` API key against the api_keys collection."""
    try:
        stored_key = _find_matching_api_key(api_key)

        # Check if key is expired
        if stored_key.get("expiresAt") and is_expired(stored_key["expiresAt"]):
            logger.warning(
                f"❌ Expired API key attempted: {stored_key.get('keyPrefix', 'unknown')}"
            )
            raise HTTPException(status_code=401, detail="API key expired")

        # Get user to verify it's active
        # Note: API key now stores Kratos ID directly (not MongoDB ObjectId)
        user = mongo_db.users.find_one({"kratosId": stored_key["userId"]})
        logger.info(
            f"🔍 Looking up user: {stored_key['userId']}, found={user is not None}"
        )
        if not user or not user.get("isActive", True):
            logger.warning(
                f"❌ API key belongs to inactive user: {stored_key['userId']}"
            )
            raise HTTPException(status_code=401, detail="User account inactive")

        # Extract multi-tenant context from API key
        project_id, organization_id, permissions = _resolve_api_key_project(
            stored_key, user
        )

        # Update last used timestamp
        mongo_db.api_keys.update_one(
            {"_id": stored_key["_id"]}, {"$set": {"lastUsed": utc_now()}}
        )

        return AuthContext(
            user_id=stored_key["userId"],
            project_id=project_id,
            organization_id=organization_id,
            permissions=permissions,
        )

    except HTTPException:
        # Deliberate 401/403/404 responses pass through unchanged.
        raise
    except Exception as e:
        logger.exception(f"API key authentication error: {e}")
        raise HTTPException(status_code=500, detail="Authentication error") from e


def authenticate_api_key(authorization: str | None = Header(None)) -> AuthContext:
    """
    Ory-based authentication with legacy API key support.

    Authentication methods (selected by the Authorization header's form):
    1. Kratos session cookie ("Session " prefix) - Dashboard users
    2. Hydra OAuth token ("Bearer " prefix) - MCP OAuth users
    3. Legacy API keys ("Bearer sk_im_..." prefix) - Temporary backward
       compatibility

    Args:
        authorization: Raw value of the Authorization header.

    Returns:
        AuthContext with user_id (Kratos identity_id for Ory auth). Permissions
        are DENIED_PERMISSIONS whenever no project context is bound.

    Raises:
        HTTPException: 401 for a missing/malformed header or invalid/expired
            credentials, 403 when project access or permissions are missing,
            404 when an API key references a missing project, 500 on
            unexpected validation errors.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header required")

    # 1. Kratos Session Authentication (Dashboard users)
    if authorization.startswith("Session "):
        # removeprefix strips only the leading scheme; str.replace would also
        # delete any later occurrence inside the credential itself.
        return _authenticate_kratos_session(authorization.removeprefix("Session "))

    # 2./3. Bearer Token Authentication (OAuth or Legacy API Key)
    if not authorization.startswith("Bearer "):
        raise HTTPException(
            status_code=401,
            detail="Authorization header must start with 'Bearer ' or 'Session '",
        )

    token = authorization.removeprefix("Bearer ")

    # Legacy keys are recognised by their prefix; everything else is treated
    # as a Hydra OAuth token.
    if token.startswith("sk_im_"):
        return _authenticate_legacy_api_key(token)
    return _authenticate_hydra_token(token)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shrijayan/SelfMemory'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.