"""
TTG Scratchpad MCP Server - Authentication Module
User ID extraction and Bearer token validation.
"""
import hmac
import logging
import os
from functools import lru_cache
from typing import Optional, Set

from starlette.requests import Request
from starlette.responses import Response
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# ============================================================
# API KEY CONFIGURATION
# ============================================================
@lru_cache(maxsize=1)
def get_api_keys() -> Set[str]:
    """
    Return the set of valid API keys configured in the environment.

    Reads the ``MCP_API_KEYS`` environment variable, which holds a
    comma-separated list of keys, e.g. ``MCP_API_KEYS=key1,key2,key3``.
    Whitespace around each key is stripped and empty entries are dropped.

    The result is memoized with ``lru_cache``, so the environment is only
    read on the first call; use ``get_api_keys.cache_clear()`` to force a
    re-read.

    Returns:
        The configured keys. The set is empty when the variable is unset,
        empty, or contains nothing but separators/whitespace; a warning is
        logged in each of those cases.
    """
    raw_value = os.environ.get("MCP_API_KEYS", "")
    if not raw_value:
        logger.warning("MCP_API_KEYS not set - authentication disabled!")
        return set()

    stripped_parts = (part.strip() for part in raw_value.split(","))
    keys = {part for part in stripped_parts if part}

    if not keys:
        # A value such as " , " is set but yields no keys. Warn explicitly
        # instead of reporting "Loaded 0 API key(s)" at info level.
        logger.warning(
            "MCP_API_KEYS contains no usable keys - authentication disabled!"
        )
        return keys

    logger.info("Loaded %d API key(s) for authentication", len(keys))
    return keys
def clear_api_keys_cache() -> None:
    """
    Discard the memoized result of ``get_api_keys``.

    The next ``get_api_keys()`` call re-reads ``MCP_API_KEYS`` from the
    environment. Useful in tests that change the variable between cases.
    """
    get_api_keys.cache_clear()
# ============================================================
# BEARER TOKEN VALIDATION
# ============================================================
def validate_bearer_token(request: Request) -> bool:
    """
    Validate the Bearer token carried in the ``Authorization`` header.

    Expected header format: ``Authorization: Bearer <token>``. The scheme
    name is matched case-insensitively and whitespace around the token is
    ignored.

    Args:
        request: Incoming HTTP request whose headers are inspected.

    Returns:
        True if no API keys are configured (authentication disabled), or if
        the presented token equals one of the configured keys.
        False if the header is missing or malformed, or if the token does
        not match any configured key.
    """
    api_keys = get_api_keys()

    # If no keys are configured, allow all requests (development mode).
    if not api_keys:
        return True

    auth_header = request.headers.get("Authorization", "")
    if not auth_header:
        logger.warning("Missing Authorization header")
        return False

    scheme, separator, credentials = auth_header.partition(" ")
    # Authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" are accepted as well as "Bearer".
    if not separator or scheme.lower() != "bearer":
        logger.warning("Invalid Authorization header format (expected 'Bearer <token>')")
        return False

    # Compare as bytes: hmac.compare_digest rejects non-ASCII str arguments.
    presented = credentials.strip().encode("utf-8")

    # Check every key with a constant-time comparison and no early exit, so
    # response timing does not reveal how much of a key was guessed or which
    # key matched.
    matched = False
    for key in api_keys:
        matched |= hmac.compare_digest(presented, key.encode("utf-8"))

    if not matched:
        logger.warning("Invalid Bearer token")
        return False
    return True
def get_auth_error_response() -> Response:
    """
    Build the 401 Unauthorized response returned when authentication fails.

    Returns:
        A JSON response with status 401 and a ``WWW-Authenticate: Bearer``
        challenge header.
    """
    return Response(
        content='{"error": "Unauthorized", "message": "Invalid or missing Bearer token"}',
        status_code=401,
        media_type="application/json",
        # A 401 response is required to carry a challenge naming the
        # authentication scheme the client should use.
        headers={"WWW-Authenticate": "Bearer"},
    )
# ============================================================
# USER ID EXTRACTION
# ============================================================
def get_user_id_from_request(request: Request) -> Optional[str]:
    """
    Read the caller's user ID from the ``X-User-ID`` request header.

    LibreChat sends this header with the current user's ID:
        X-User-ID: {{LIBRECHAT_USER_ID}}

    Returns:
        The header value with surrounding whitespace removed, or None when
        the header is absent, empty, or whitespace-only.
    """
    header_value = request.headers.get("X-User-ID")
    if not header_value:
        return None

    # A whitespace-only header collapses to "" and is reported as missing.
    return header_value.strip() or None
def require_user_id(request: Request) -> str:
    """
    Return the user ID from the request, failing if it is missing.

    Use this in tools that require user isolation.

    Raises:
        ValueError: If no usable X-User-ID header is present.
    """
    resolved_id = get_user_id_from_request(request)
    if resolved_id:
        return resolved_id

    raise ValueError(
        "X-User-ID header required. "
        "This tool requires user authentication."
    )
# ============================================================
# FASTMCP INTEGRATION HELPERS
# ============================================================
def get_current_user_id() -> str:
    """
    Get the current user ID from the active request context.

    Obtains the HTTP request via FastMCP's ``get_http_request`` and extracts
    the user ID from it with ``require_user_id``.

    Usage in tools:
        @mcp.tool()
        async def my_tool() -> str:
            user_id = get_current_user_id()
            ...

    Returns:
        The user ID taken from the request's X-User-ID header.

    Raises:
        ValueError: If the user ID cannot be extracted from the request.
        RuntimeError: If FastMCP cannot be imported, or if no HTTP request
            context is available.
    """
    try:
        # Imported here rather than at module top - presumably so this module
        # can still be imported when FastMCP is not installed.
        from fastmcp.server.dependencies import get_http_request

        request = get_http_request()
    except ImportError as exc:
        # Chain the original error so the failing import stays visible.
        raise RuntimeError(
            "FastMCP not available. "
            "This function must be called within a FastMCP request context."
        ) from exc
    except Exception as exc:
        # Translate the "no request" failure into a clearer message; any
        # other error is propagated unchanged.
        if "No HTTP request" in str(exc):
            raise RuntimeError(
                "No HTTP request context available. "
                "This function must be called within a FastMCP tool handler."
            ) from exc
        raise

    # Outside the try block: a ValueError for a missing header propagates
    # directly to the caller.
    return require_user_id(request)
def get_current_user_id_optional() -> Optional[str]:
    """
    Best-effort variant of ``get_current_user_id``.

    Use this for tools that can work with or without user context.

    Returns:
        The current user ID, or None when ``get_current_user_id`` raises
        ValueError or RuntimeError.
    """
    try:
        resolved_id = get_current_user_id()
    except (ValueError, RuntimeError):
        resolved_id = None
    return resolved_id
# ============================================================
# MIDDLEWARE HELPERS
# ============================================================
async def authenticate_request(request: Request) -> Optional[Response]:
    """
    Check a request's Bearer token and produce an error response on failure.

    Returns:
        None when the request passes ``validate_bearer_token``.
        A 401 Response when it does not (return this to the client).

    Usage in custom route:
        @mcp.custom_route("/mcp", methods=["POST"])
        async def mcp_handler(request: Request):
            error = await authenticate_request(request)
            if error:
                return error
            # Continue processing...
    """
    token_is_valid = validate_bearer_token(request)
    return None if token_is_valid else get_auth_error_response()
def log_request_info(request: Request) -> None:
    """
    Log the request's method, path, and user ID at DEBUG level.

    Requests without a usable X-User-ID header are logged with the user
    shown as 'anonymous'.

    Args:
        request: Incoming HTTP request to describe.
    """
    # Skip the header lookup and formatting when DEBUG output is disabled.
    if not logger.isEnabledFor(logging.DEBUG):
        return

    user_id = get_user_id_from_request(request)
    # Lazy %-style arguments: the logging module builds the message itself.
    logger.debug(
        "Request: %s %s User: %s",
        request.method,
        request.url.path,
        user_id or "anonymous",
    )