Skip to main content
Glama
kratos_validator.py9.15 kB
""" Kratos session validator. Validates Kratos sessions and extracts user identity and traits. No fallbacks - fail fast with clear errors. """ import logging from ory_kratos_client.exceptions import ApiException from .ory_config import get_kratos_frontend_api, get_kratos_identity_api from .retry_utils import retry_with_exponential_backoff # Session cookie validation constants MAX_COOKIE_LENGTH = 4096 # Standard HTTP header limit logger = logging.getLogger(__name__) # ============================================================================ # Data Models # ============================================================================ class KratosSession: """Kratos session data with user identity and traits.""" def __init__( self, user_id: str, email: str, organization_id: str, project_ids: list[str] | None = None, name: str | None = None, is_active: bool = True, ): """ Initialize Kratos session data. Args: user_id: Kratos identity ID email: User email organization_id: Organization ID from traits project_ids: List of project IDs from traits name: User name (optional) is_active: Whether the identity is active """ self.user_id = user_id self.email = email self.organization_id = organization_id self.project_ids = project_ids or [] self.name = name self.is_active = is_active def __repr__(self) -> str: """String representation for logging.""" return ( f"KratosSession(user_id={self.user_id}, email={self.email}, " f"org={self.organization_id}, projects={len(self.project_ids)})" ) # ============================================================================ # Session Validation # ============================================================================ def validate_session(session_cookie: str) -> KratosSession: """ Validate Kratos session and extract user identity. This function validates a session cookie with Kratos and extracts the user's identity and multi-tenant traits (organization_id, project_ids). Args: session_cookie: Kratos session cookie value (ory_kratos_session) Returns: KratosSession: Validated session with user identity and traits Raises: ValueError: If session is invalid, expired, or user is inactive ApiException: If Kratos API call fails """ if not session_cookie: error_msg = "Session cookie is required" logger.warning(error_msg) raise ValueError(error_msg) # Validate cookie length to prevent header overflow attacks if len(session_cookie) > MAX_COOKIE_LENGTH: error_msg = f"Session cookie exceeds maximum length: {len(session_cookie)}" logger.warning(error_msg) raise ValueError(error_msg) try: # Validate session with Kratos using Cookie header (not x-session-token) # This matches how the dashboard validates sessions successfully from ory_kratos_client.api_client import ApiClient logger.info(f"Validating Kratos session (cookie length: {len(session_cookie)})") # Create a custom API client with the Cookie header api_client = ApiClient( configuration=get_kratos_frontend_api().api_client.configuration ) # Set the Cookie header with the session token api_client.set_default_header("Cookie", f"ory_kratos_session={session_cookie}") # Create frontend API with our custom client from ory_kratos_client import FrontendApi frontend_api = FrontendApi(api_client=api_client) # Call Kratos to validate session with retry logic for transient failures session = retry_with_exponential_backoff( lambda: frontend_api.to_session(), max_retries=3, base_delay=1.0, ) # Check if session is active if not session.active: error_msg = "Session is not active" logger.warning(f"{error_msg}: {session.id}") raise ValueError(error_msg) # Extract identity identity = session.identity if not identity: error_msg = "Session has no identity" logger.error(error_msg) raise ValueError(error_msg) # Check if identity is active if identity.state != "active": error_msg = f"Identity is not active: state={identity.state}" logger.warning(f"{error_msg}, user={identity.id}") raise ValueError(error_msg) # Extract traits traits = identity.traits or {} # Get required traits email = traits.get("email") if not email: error_msg = "Identity missing required trait: email" logger.error(f"{error_msg}, user={identity.id}") raise ValueError(error_msg) organization_id = traits.get("organization_id") if not organization_id: error_msg = "Identity missing required trait: organization_id" logger.error(f"{error_msg}, user={identity.id}, email={email}") raise ValueError(error_msg) # Get optional traits project_ids = traits.get("project_ids", []) name = traits.get("name") # Create session object kratos_session = KratosSession( user_id=identity.id, email=email, organization_id=organization_id, project_ids=project_ids, name=name, is_active=True, ) logger.info(f"✅ Kratos session validated: {kratos_session}") return kratos_session except ApiException as e: # Handle Kratos API errors # Note: Don't chain exception to prevent internal details leakage if e.status == 401: error_msg = "Invalid or expired session" logger.warning(f"{error_msg}: {e.reason}") else: error_msg = f"Kratos API error: {e.reason}" logger.error(f"{error_msg}, status={e.status}") raise ValueError(error_msg) from None except Exception as e: error_msg = f"Session validation error: {str(e)}" logger.error(error_msg, exc_info=True) raise ValueError(error_msg) from e def get_identity_by_id(identity_id: str) -> KratosSession: """ Get identity from Kratos by ID. This uses the Admin API to fetch identity details. Useful for syncing user data or verifying identity state. Args: identity_id: Kratos identity ID Returns: KratosSession: Identity data with traits Raises: ValueError: If identity not found or invalid ApiException: If Kratos API call fails """ if not identity_id: error_msg = "Identity ID is required" logger.warning(error_msg) raise ValueError(error_msg) try: identity_api = get_kratos_identity_api() logger.info(f"Fetching identity from Kratos: {identity_id}") # Get identity from Admin API with retry logic for transient failures identity = retry_with_exponential_backoff( lambda: identity_api.get_identity(id=identity_id), max_retries=3, base_delay=1.0, ) # Check if identity is active if identity.state != "active": error_msg = f"Identity is not active: state={identity.state}" logger.warning(f"{error_msg}, user={identity_id}") raise ValueError(error_msg) # Extract traits traits = identity.traits or {} email = traits.get("email") if not email: error_msg = "Identity missing required trait: email" logger.error(f"{error_msg}, user={identity_id}") raise ValueError(error_msg) organization_id = traits.get("organization_id") if not organization_id: error_msg = "Identity missing required trait: organization_id" logger.error(f"{error_msg}, user={identity_id}, email={email}") raise ValueError(error_msg) project_ids = traits.get("project_ids", []) name = traits.get("name") kratos_session = KratosSession( user_id=identity.id, email=email, organization_id=organization_id, project_ids=project_ids, name=name, is_active=True, ) logger.info(f"✅ Identity fetched: {kratos_session}") return kratos_session except ApiException as e: if e.status == 404: error_msg = f"Identity not found: {identity_id}" logger.warning(error_msg) else: error_msg = f"Kratos API error: {e.reason}" logger.error(f"{error_msg}, status={e.status}") raise ValueError(error_msg) from None except Exception as e: error_msg = f"Error fetching identity: {str(e)}" logger.error(error_msg, exc_info=True) raise ValueError(error_msg) from e

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shrijayan/SelfMemory'

If you have feedback or need assistance with the MCP directory API, please join our Discord server