Skip to main content
Glama
jwt_auth.py7.5 kB
""" LedgAI QuantConnect MCP Server - JWT Authentication Multi-tenant JWT authentication with QuantConnect credentials """ import os import jwt import logging from typing import Dict, Any, Optional, Set from datetime import datetime, timezone from dataclasses import dataclass from cryptography.hazmat.primitives import serialization logger = logging.getLogger(__name__) @dataclass class QuantConnectCredentials: """QuantConnect API credentials extracted from JWT""" user_id: str api_token: str organization_id: Optional[str] = None @dataclass class AuthenticatedUser: """Authenticated user information from JWT""" user_id: str scopes: Set[str] qc_credentials: QuantConnectCredentials organization_id: Optional[str] = None class JWTAuthError(Exception): """JWT Authentication related errors""" pass class InsufficientScopesError(JWTAuthError): """Raised when user doesn't have required scopes""" pass class JWTAuthenticator: """ JWT token validator for LedgAI QuantConnect MCP Server Validates JWT tokens containing user scopes and QuantConnect credentials """ def __init__( self, secret_key: Optional[str] = None, algorithm: str = "HS256", issuer: str = "ledgai", audience: str = "quantconnect-mcp" ): """ Initialize JWT authenticator Args: secret_key: JWT signing secret (from environment if not provided) algorithm: JWT signing algorithm issuer: Expected token issuer audience: Expected token audience """ self.secret_key = secret_key or os.getenv('JWT_SECRET_KEY') or 'dev-secret-key-change-in-production' if not self.secret_key: raise ValueError("JWT_SECRET_KEY environment variable or secret_key parameter required") self.algorithm = algorithm self.issuer = issuer self.audience = audience def validate_token(self, token: str) -> AuthenticatedUser: """ Validate JWT token and extract user information Args: token: JWT token string Returns: AuthenticatedUser object with user info and credentials Raises: JWTAuthError: If token is invalid or expired """ try: # Decode and validate JWT token payload = jwt.decode( token, self.secret_key, algorithms=[self.algorithm], issuer=self.issuer, audience=self.audience, options={ "require_exp": True, "require_iat": True, "require_sub": True } ) # Extract user information user_id = payload.get('sub') if not user_id: raise JWTAuthError("Token missing 'sub' claim") # Extract scopes scopes = set(payload.get('scopes', [])) if not isinstance(scopes, set) or not scopes: raise JWTAuthError("Token missing or invalid 'scopes' claim") # Extract QuantConnect credentials qc_creds_data = payload.get('qc_credentials', {}) if not qc_creds_data: raise JWTAuthError("Token missing 'qc_credentials' claim") qc_credentials = QuantConnectCredentials( user_id=qc_creds_data.get('user_id'), api_token=qc_creds_data.get('api_token'), organization_id=qc_creds_data.get('organization_id') ) if not qc_credentials.user_id or not qc_credentials.api_token: raise JWTAuthError("Invalid QuantConnect credentials in token") return AuthenticatedUser( user_id=user_id, scopes=scopes, qc_credentials=qc_credentials, organization_id=payload.get('organization_id') ) except jwt.ExpiredSignatureError: raise JWTAuthError("Token has expired") except jwt.InvalidTokenError as e: raise JWTAuthError(f"Invalid token: {str(e)}") except Exception as e: logger.error(f"JWT validation error: {str(e)}") raise JWTAuthError(f"Token validation failed: {str(e)}") def extract_bearer_token(self, authorization_header: Optional[str]) -> str: """ Extract Bearer token from Authorization header Args: authorization_header: HTTP Authorization header value Returns: JWT token string Raises: JWTAuthError: If header is missing or invalid format """ if not authorization_header: raise JWTAuthError("Missing Authorization header") parts = authorization_header.split(' ') if len(parts) != 2 or parts[0].lower() != 'bearer': raise JWTAuthError("Invalid Authorization header format. Expected 'Bearer <token>'") return parts[1] def create_token( self, user_id: str, scopes: Set[str], qc_credentials: QuantConnectCredentials, expires_in_hours: int = 24, organization_id: Optional[str] = None ) -> str: """ Create JWT token for development/testing purposes Args: user_id: User identifier scopes: Set of authorized scopes qc_credentials: QuantConnect API credentials expires_in_hours: Token expiration time in hours organization_id: Optional organization ID Returns: JWT token string """ now = datetime.now(timezone.utc) payload = { 'iss': self.issuer, 'aud': self.audience, 'sub': user_id, 'iat': now.timestamp(), 'exp': (now.timestamp() + (expires_in_hours * 3600)), 'scopes': list(scopes), 'qc_credentials': { 'user_id': qc_credentials.user_id, 'api_token': qc_credentials.api_token, 'organization_id': qc_credentials.organization_id } } if organization_id: payload['organization_id'] = organization_id return jwt.encode(payload, self.secret_key, algorithm=self.algorithm) # Global authenticator instance _authenticator: Optional[JWTAuthenticator] = None def get_authenticator() -> JWTAuthenticator: """Get or create global JWT authenticator instance""" global _authenticator if _authenticator is None: _authenticator = JWTAuthenticator() return _authenticator def validate_authorization(authorization_header: Optional[str]) -> AuthenticatedUser: """ Validate authorization header and return authenticated user Args: authorization_header: HTTP Authorization header value Returns: AuthenticatedUser object Raises: JWTAuthError: If authentication fails """ authenticator = get_authenticator() token = authenticator.extract_bearer_token(authorization_header) return authenticator.validate_token(token)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/i-dream-of-ai/quantconnect-mcp-jwt'

If you have feedback or need assistance with the MCP directory API, please join our Discord server