Skip to main content
Glama
dependencies.py6.43 kB
"""FastAPI dependencies for multi-tenant organization context. Provides dependency injection for organization-scoped requests, API key validation, and decrypted Hostaway credentials retrieval. """ import hashlib import logging from typing import NamedTuple from fastapi import Header, HTTPException, status from src.services.credential_service import DecryptedCredentials from src.services.supabase_client import get_supabase_client logger = logging.getLogger(__name__) class OrganizationContext(NamedTuple): """Organization context for authenticated MCP requests.""" organization_id: int api_key_id: int hostaway_credentials: DecryptedCredentials class AuthenticationError(HTTPException): """Raised when API key validation fails.""" def __init__(self, detail: str = "Invalid or inactive API key") -> None: """Initialize authentication error with 401 status.""" super().__init__( status_code=status.HTTP_401_UNAUTHORIZED, detail=detail, headers={"WWW-Authenticate": "Bearer"}, ) class CredentialError(HTTPException): """Raised when Hostaway credentials are missing or invalid.""" def __init__(self, detail: str = "Hostaway credentials not configured or invalid") -> None: """Initialize credential error with 403 status.""" super().__init__( status_code=status.HTTP_403_FORBIDDEN, detail=detail, ) def hash_api_key(api_key: str) -> str: """Hash API key using SHA-256 for database lookup. Args: api_key: Raw API key from X-API-Key header Returns: 64-character hexadecimal SHA-256 hash Example: >>> hash_api_key("api_abc123") '9f86d081884c7d659a2feaa0c55ad015...' # 64 chars """ return hashlib.sha256(api_key.encode()).hexdigest() async def get_organization_context( x_api_key: str = Header(..., description="MCP API key for organization"), ) -> OrganizationContext: """Validate X-API-Key header and return organization context with decrypted credentials. This dependency: 1. Hashes the provided API key using SHA-256 2. Looks up the key in api_keys table (must be active) 3. Retrieves organization_id from api_keys record 4. Fetches encrypted Hostaway credentials for the organization 5. Decrypts credentials using Supabase Vault 6. Updates last_used_at timestamp for the API key 7. Returns OrganizationContext with org_id and decrypted credentials Args: x_api_key: API key from X-API-Key header Returns: OrganizationContext with organization ID and decrypted Hostaway credentials Raises: AuthenticationError: If API key is invalid, inactive, or not found CredentialError: If Hostaway credentials missing or decryption fails Example: >>> from fastapi import Depends >>> @app.get("/api/properties") >>> async def get_properties( ... ctx: OrganizationContext = Depends(get_organization_context) ... ): ... # ctx.organization_id: 123 ... # ctx.hostaway_credentials.account_id: "ACC_12345" ... # ctx.hostaway_credentials.secret_key: "sk_live_abc..." ... pass """ supabase = get_supabase_client() key_hash = hash_api_key(x_api_key) # 1. Validate API key and get organization_id try: api_key_response = ( supabase.table("api_keys") .select("id, organization_id, is_active") .eq("key_hash", key_hash) .eq("is_active", True) .single() .execute() ) if not api_key_response.data: raise AuthenticationError("API key not found or inactive") api_key_data = api_key_response.data api_key_id: int = api_key_data["id"] organization_id: int = api_key_data["organization_id"] except Exception as e: # Log error and return generic auth failure raise AuthenticationError(f"API key validation failed: {e!s}") from e # 2. Get Hostaway credentials for organization try: creds_response = ( supabase.table("hostaway_credentials") .select("account_id, encrypted_secret_key, credentials_valid") .eq("organization_id", organization_id) .single() .execute() ) if not creds_response.data: raise CredentialError("Hostaway credentials not configured for organization") creds_data = creds_response.data # Check if credentials marked as invalid (e.g., 401 from Hostaway) if not creds_data.get("credentials_valid", True): raise CredentialError( "Hostaway credentials are invalid. Please re-authenticate in dashboard." ) account_id: str = creds_data["account_id"] # Plain text account ID encrypted_secret_key: str = creds_data["encrypted_secret_key"] except CredentialError: # Re-raise credential errors as-is raise except Exception as e: raise CredentialError(f"Failed to retrieve Hostaway credentials: {e!s}") from e # 3. Decrypt secret key using Vault (account_id is stored in plain text) try: # Decrypt only the secret key secret_response = supabase.rpc( "decrypt_hostaway_credential", {"ciphertext": encrypted_secret_key}, ).execute() if not secret_response.data: raise CredentialError("Failed to decrypt secret key") secret_key: str = secret_response.data # Create decrypted credentials (account_id is already plain text) decrypted = DecryptedCredentials( account_id=account_id, secret_key=secret_key, ) except Exception as e: raise CredentialError(f"Failed to decrypt Hostaway credentials: {e!s}") from e # 4. Update API key last_used_at timestamp (fire and forget) try: supabase.rpc( "update_api_key_last_used", {"key_hash": key_hash}, ).execute() except Exception as e: # Non-critical - log but don't fail request logger.warning(f"Failed to update API key last_used timestamp: {e!s}") return OrganizationContext( organization_id=organization_id, api_key_id=api_key_id, hostaway_credentials=decrypted, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/darrentmorgan/hostaway-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server