Skip to main content
Glama
middleware.py6.28 kB
""" FastMCP authentication integration and middleware. Provides functions for setting up and configuring FastMCP authentication with proper error handling and logging. """ import os import sys from typing import Optional from fastmcp import FastMCP from fastmcp.server.auth.providers.jwt import JWTVerifier from src.logging import get_logger logger = get_logger('AUTH_MIDDLEWARE') def setup_auth_provider(public_key_pem: Optional[str] = None) -> JWTVerifier: """ Set up and configure the JWTVerifier for FastMCP. Args: public_key_pem: PEM-encoded public key. If None, will read from environment. Returns: Configured JWTVerifier instance """ # Get public key from parameter or environment variable if public_key_pem is None: public_key_pem = os.getenv("PUBLIC_KEY_PEM", "") # Log warning if public key is not set if not public_key_pem: logger.warning("PUBLIC_KEY_PEM not found in environment variables - authentication may fail") logger.info("please set PUBLIC_KEY_PEM in your .env file with the correct public key") # Configure JWTVerifier with the public key return JWTVerifier(public_key=public_key_pem) def create_authenticated_mcp(server_name: str = "observe-community", public_key_pem: Optional[str] = None) -> FastMCP: """ Create a FastMCP instance with authentication configured. Args: server_name: Name for the MCP server public_key_pem: PEM-encoded public key. If None, will read from environment. Returns: Configured FastMCP instance with authentication """ auth_provider = setup_auth_provider(public_key_pem) return FastMCP(name=server_name, auth=auth_provider) def validate_auth_configuration() -> dict: """ Validate authentication configuration and return status. Returns: Dictionary with configuration status and recommendations """ public_key_pem = os.getenv("PUBLIC_KEY_PEM", "") status = { "configured": bool(public_key_pem), "public_key_length": len(public_key_pem) if public_key_pem else 0, "warnings": [], "recommendations": [] } if not public_key_pem: status["warnings"].append("PUBLIC_KEY_PEM not found in environment variables") status["recommendations"].append("Set PUBLIC_KEY_PEM in your .env file with the correct public key") elif len(public_key_pem) < 100: # Basic sanity check status["warnings"].append("PUBLIC_KEY_PEM appears to be too short for a valid PEM key") status["recommendations"].append("Verify that PUBLIC_KEY_PEM contains a complete PEM-encoded public key") elif not public_key_pem.startswith("-----BEGIN"): status["warnings"].append("PUBLIC_KEY_PEM does not appear to be in PEM format") status["recommendations"].append("Ensure PUBLIC_KEY_PEM is a properly formatted PEM-encoded public key") return status class AuthMiddleware: """ Authentication middleware wrapper for additional functionality. """ def __init__(self, auth_provider: JWTVerifier): """ Initialize auth middleware. Args: auth_provider: Configured JWTVerifier instance """ self.auth_provider = auth_provider self._stats = { "successful_authentications": 0, "failed_authentications": 0, "unauthorized_attempts": 0 } def get_stats(self) -> dict: """ Get authentication statistics. Returns: Dictionary with authentication statistics """ return self._stats.copy() def reset_stats(self) -> None: """Reset authentication statistics.""" self._stats = { "successful_authentications": 0, "failed_authentications": 0, "unauthorized_attempts": 0 } def log_successful_auth(self, client_id: str) -> None: """ Log a successful authentication. Args: client_id: Client identifier """ self._stats["successful_authentications"] += 1 logger.info(f"successful authentication | client:{client_id}") def log_failed_auth(self, reason: str) -> None: """ Log a failed authentication attempt. Args: reason: Reason for authentication failure """ self._stats["failed_authentications"] += 1 logger.warning(f"authentication failed | reason:{reason}") def log_unauthorized_attempt(self, required_scopes: list, user_scopes: list) -> None: """ Log an unauthorized access attempt. Args: required_scopes: Scopes required for access user_scopes: User's actual scopes """ self._stats["unauthorized_attempts"] += 1 logger.warning(f"unauthorized attempt | required_scopes:{required_scopes} | user_scopes:{user_scopes}") # Global auth middleware instance _auth_middleware: Optional[AuthMiddleware] = None def get_auth_middleware() -> Optional[AuthMiddleware]: """ Get the global auth middleware instance. Returns: AuthMiddleware instance or None if not initialized """ return _auth_middleware def initialize_auth_middleware(auth_provider: JWTVerifier) -> AuthMiddleware: """ Initialize the global auth middleware instance. Args: auth_provider: Configured JWTVerifier instance Returns: Initialized AuthMiddleware instance """ global _auth_middleware _auth_middleware = AuthMiddleware(auth_provider) return _auth_middleware def check_auth_health() -> dict: """ Check authentication system health. Returns: Dictionary with health status """ config_status = validate_auth_configuration() middleware = get_auth_middleware() health = { "status": "healthy" if config_status["configured"] else "unhealthy", "configuration": config_status, "middleware_initialized": middleware is not None, "statistics": middleware.get_stats() if middleware else None } return health

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rustomax/observe-experimental-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server