Skip to main content
Glama

AnyDocs MCP Server

by funky1688
manager.pyโ€ข14.2 kB
#!/usr/bin/env python3 """ Authentication Manager Unified authentication and authorization management. """ from typing import Any, Dict, List, Optional, Union from enum import Enum import structlog from .base import ( BaseAuthenticator, BaseAuthorizer, User, AuthToken, Permission, UserRole, AuthenticationError, AuthorizationError, InvalidTokenError, TokenExpiredError, ) from .api_key import APIKeyAuthenticator, APIKeyAuthorizer from .jwt_auth import JWTAuthenticator, JWTAuthorizer from .oauth2 import OAuth2Authenticator, OAuth2Authorizer logger = structlog.get_logger(__name__) class AuthMethod(Enum): """Authentication methods.""" API_KEY = "api_key" JWT = "jwt" OAUTH2 = "oauth2" class AuthManager: """Unified authentication and authorization manager.""" def __init__( self, default_method: AuthMethod = AuthMethod.API_KEY, jwt_secret: Optional[str] = None, oauth2_redirect_uri: Optional[str] = None, secret_key: Optional[str] = None, ): """Initialize authentication manager. Args: default_method: Default authentication method jwt_secret: JWT secret key (required for JWT auth) oauth2_redirect_uri: OAuth2 redirect URI """ self.default_method = default_method # Initialize authenticators self._authenticators: Dict[AuthMethod, BaseAuthenticator] = { AuthMethod.API_KEY: APIKeyAuthenticator(secret_key=secret_key or "default-secret-key"), AuthMethod.JWT: JWTAuthenticator(secret_key=jwt_secret) if jwt_secret else None, AuthMethod.OAUTH2: OAuth2Authenticator(default_redirect_uri=oauth2_redirect_uri) if oauth2_redirect_uri else None, } # Initialize authorizers self._authorizers: Dict[AuthMethod, BaseAuthorizer] = { AuthMethod.API_KEY: APIKeyAuthorizer(), AuthMethod.JWT: JWTAuthorizer() if jwt_secret else None, AuthMethod.OAUTH2: OAuth2Authorizer() if oauth2_redirect_uri else None, } # Remove None values self._authenticators = {k: v for k, v in self._authenticators.items() if v is not None} self._authorizers = {k: v for k, v in self._authorizers.items() if v is not None} logger.info( "Authentication manager initialized", default_method=default_method.value, available_methods=[method.value for method in self._authenticators.keys()], ) def get_authenticator(self, method: AuthMethod) -> BaseAuthenticator: """Get authenticator for method. Args: method: Authentication method Returns: Authenticator instance Raises: ValueError: If method not available """ if method not in self._authenticators: raise ValueError(f"Authentication method '{method.value}' not available") return self._authenticators[method] def get_authorizer(self, method: AuthMethod) -> BaseAuthorizer: """Get authorizer for method. Args: method: Authentication method Returns: Authorizer instance Raises: ValueError: If method not available """ if method not in self._authorizers: raise ValueError(f"Authorization method '{method.value}' not available") return self._authorizers[method] async def authenticate( self, credentials: Dict[str, Any], method: Optional[AuthMethod] = None, ) -> tuple[User, AuthMethod]: """Authenticate user with credentials. Args: credentials: Authentication credentials method: Specific authentication method to use Returns: Tuple of (user, auth_method) Raises: AuthenticationError: If authentication fails """ method = method or self.default_method try: authenticator = self.get_authenticator(method) user = await authenticator.authenticate(credentials) logger.info( "User authentication successful", user_id=user.id, method=method.value, ) return user, method except Exception as e: logger.error( "User authentication failed", method=method.value, error=str(e), ) raise AuthenticationError(f"Authentication failed: {e}") async def validate_token( self, token: str, method: Optional[AuthMethod] = None, ) -> tuple[User, AuthMethod]: """Validate authentication token. Args: token: Authentication token method: Specific authentication method to use Returns: Tuple of (user, auth_method) Raises: InvalidTokenError: If token is invalid TokenExpiredError: If token is expired """ # If method not specified, try all available methods methods_to_try = [method] if method else list(self._authenticators.keys()) last_error = None for auth_method in methods_to_try: try: authenticator = self.get_authenticator(auth_method) user = await authenticator.validate_token(token) logger.info( "Token validation successful", user_id=user.id, method=auth_method.value, ) return user, auth_method except (InvalidTokenError, TokenExpiredError) as e: last_error = e continue except Exception as e: logger.warning( "Token validation error", method=auth_method.value, error=str(e), ) last_error = e continue # If we get here, all methods failed if last_error: raise last_error else: raise InvalidTokenError("Token validation failed for all methods") async def create_token( self, user: User, method: Optional[AuthMethod] = None, **kwargs, ) -> AuthToken: """Create authentication token for user. Args: user: User to create token for method: Authentication method to use **kwargs: Additional token parameters Returns: Authentication token Raises: AuthenticationError: If token creation fails """ method = method or self.default_method try: authenticator = self.get_authenticator(method) token = await authenticator.create_token(user, **kwargs) logger.info( "Token creation successful", user_id=user.id, method=method.value, ) return token except Exception as e: logger.error( "Token creation failed", user_id=user.id, method=method.value, error=str(e), ) raise AuthenticationError(f"Token creation failed: {e}") async def revoke_token( self, token: str, method: Optional[AuthMethod] = None, ) -> bool: """Revoke authentication token. Args: token: Token to revoke method: Authentication method to use Returns: True if revocation successful """ # If method not specified, try all available methods methods_to_try = [method] if method else list(self._authenticators.keys()) for auth_method in methods_to_try: try: authenticator = self.get_authenticator(auth_method) if await authenticator.revoke_token(token): logger.info( "Token revocation successful", method=auth_method.value, ) return True except Exception as e: logger.warning( "Token revocation error", method=auth_method.value, error=str(e), ) continue return False async def check_permission( self, user: User, permission: Permission, resource: Optional[str] = None, method: Optional[AuthMethod] = None, ) -> bool: """Check if user has permission. Args: user: User to check permission: Permission to check resource: Optional resource identifier method: Authorization method to use Returns: True if user has permission """ method = method or self.default_method try: authorizer = self.get_authorizer(method) has_permission = await authorizer.check_permission(user, permission, resource) logger.debug( "Permission check", user_id=user.id, permission=permission.value, resource=resource, has_permission=has_permission, method=method.value, ) return has_permission except Exception as e: logger.error( "Permission check failed", user_id=user.id, permission=permission.value, resource=resource, method=method.value, error=str(e), ) return False async def require_permission( self, user: User, permission: Permission, resource: Optional[str] = None, method: Optional[AuthMethod] = None, ) -> None: """Require user to have permission. Args: user: User to check permission: Required permission resource: Optional resource identifier method: Authorization method to use Raises: AuthorizationError: If user lacks permission """ method = method or self.default_method try: authorizer = self.get_authorizer(method) await authorizer.require_permission(user, permission, resource) logger.debug( "Permission requirement satisfied", user_id=user.id, permission=permission.value, resource=resource, method=method.value, ) except AuthorizationError: logger.warning( "Permission requirement failed", user_id=user.id, permission=permission.value, resource=resource, method=method.value, ) raise except Exception as e: logger.error( "Permission requirement check failed", user_id=user.id, permission=permission.value, resource=resource, method=method.value, error=str(e), ) raise AuthorizationError(f"Permission check failed: {e}") def get_available_methods(self) -> List[AuthMethod]: """Get list of available authentication methods. Returns: List of available authentication methods """ return list(self._authenticators.keys()) def is_method_available(self, method: AuthMethod) -> bool: """Check if authentication method is available. Args: method: Authentication method to check Returns: True if method is available """ return method in self._authenticators # Convenience methods for specific authenticators def get_api_key_authenticator(self) -> Optional[APIKeyAuthenticator]: """Get API key authenticator. Returns: API key authenticator or None if not available """ return self._authenticators.get(AuthMethod.API_KEY) def get_jwt_authenticator(self) -> Optional[JWTAuthenticator]: """Get JWT authenticator. Returns: JWT authenticator or None if not available """ return self._authenticators.get(AuthMethod.JWT) def get_oauth2_authenticator(self) -> Optional[OAuth2Authenticator]: """Get OAuth2 authenticator. Returns: OAuth2 authenticator or None if not available """ return self._authenticators.get(AuthMethod.OAUTH2) def get_api_key_authorizer(self) -> Optional[APIKeyAuthorizer]: """Get API key authorizer. Returns: API key authorizer or None if not available """ return self._authorizers.get(AuthMethod.API_KEY) def get_jwt_authorizer(self) -> Optional[JWTAuthorizer]: """Get JWT authorizer. Returns: JWT authorizer or None if not available """ return self._authorizers.get(AuthMethod.JWT) def get_oauth2_authorizer(self) -> Optional[OAuth2Authorizer]: """Get OAuth2 authorizer. Returns: OAuth2 authorizer or None if not available """ return self._authorizers.get(AuthMethod.OAUTH2)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server