Skip to main content
Glama

AnyDocs MCP Server

by funky1688
base.py • 12.9 kB
#!/usr/bin/env python3
"""
Authentication Base Classes

Base classes and interfaces for authentication and authorization.
"""

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum


def _utcnow(like: Optional[datetime] = None) -> datetime:
    """Return the current UTC time, matching the awareness of ``like``.

    ``datetime.utcnow()`` is deprecated since Python 3.12, so the current
    time is taken from ``datetime.now(timezone.utc)`` instead.

    Args:
        like: Optional reference datetime. When it is timezone-aware the
            result is timezone-aware too, so the two values can be compared
            or subtracted without raising ``TypeError``.

    Returns:
        A naive UTC datetime when ``like`` is ``None`` or naive, otherwise a
        timezone-aware UTC datetime.
    """
    now = datetime.now(timezone.utc)
    if like is not None and like.tzinfo is not None and like.utcoffset() is not None:
        return now
    # Drop tzinfo to keep the naive-UTC values this module has always stored.
    return now.replace(tzinfo=None)


class UserRole(str, Enum):
    """User roles for authorization."""

    ADMIN = "admin"
    DOC_ADMIN = "doc_admin"
    API_USER = "api_user"
    READONLY = "readonly"


class Permission(str, Enum):
    """System permissions."""

    # System administration
    SYSTEM_ADMIN = "system:admin"
    SYSTEM_CONFIG = "system:config"
    SYSTEM_MONITOR = "system:monitor"

    # Documentation management
    DOC_READ = "doc:read"
    DOC_WRITE = "doc:write"
    DOC_DELETE = "doc:delete"
    DOC_CONFIG = "doc:config"

    # User management
    USER_READ = "user:read"
    USER_WRITE = "user:write"
    USER_DELETE = "user:delete"

    # API access
    API_READ = "api:read"
    API_WRITE = "api:write"


# Default permissions granted to each role. Stored as immutable tuples so the
# table can be shared safely; every user receives its own list copy.
_ROLE_DEFAULT_PERMISSIONS: Dict[UserRole, Tuple[Permission, ...]] = {
    UserRole.ADMIN: (
        Permission.SYSTEM_ADMIN,
        Permission.SYSTEM_CONFIG,
        Permission.SYSTEM_MONITOR,
        Permission.DOC_READ,
        Permission.DOC_WRITE,
        Permission.DOC_DELETE,
        Permission.DOC_CONFIG,
        Permission.USER_READ,
        Permission.USER_WRITE,
        Permission.USER_DELETE,
        Permission.API_READ,
        Permission.API_WRITE,
    ),
    UserRole.DOC_ADMIN: (
        Permission.DOC_READ,
        Permission.DOC_WRITE,
        Permission.DOC_DELETE,
        Permission.DOC_CONFIG,
        Permission.API_READ,
        Permission.API_WRITE,
    ),
    UserRole.API_USER: (
        Permission.DOC_READ,
        Permission.API_READ,
    ),
    UserRole.READONLY: (
        Permission.DOC_READ,
        Permission.API_READ,
    ),
}


@dataclass
class User:
    """User information.

    ``permissions``, ``metadata`` and ``created_at`` default to ``None`` and
    are filled in by ``__post_init__``: permissions from the role's defaults,
    metadata with an empty dict, and ``created_at`` with the current naive
    UTC time.
    """

    id: str
    username: str
    email: Optional[str] = None
    role: UserRole = UserRole.API_USER
    permissions: Optional[List[Permission]] = None
    is_active: bool = True
    created_at: Optional[datetime] = None
    last_login: Optional[datetime] = None
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Initialize default values."""
        # Only None triggers the defaults; an explicitly passed empty list or
        # dict is kept as given.
        if self.permissions is None:
            self.permissions = self._get_default_permissions()
        if self.metadata is None:
            self.metadata = {}
        if self.created_at is None:
            self.created_at = _utcnow()

    def _get_default_permissions(self) -> List[Permission]:
        """Get default permissions based on user role.

        Returns:
            A new list of the role's default permissions, or an empty list
            when the role has no entry in the defaults table.
        """
        return list(_ROLE_DEFAULT_PERMISSIONS.get(self.role, ()))

    def has_permission(self, permission: Permission) -> bool:
        """Check if user has a specific permission."""
        return permission in self.permissions

    def has_any_permission(self, permissions: List[Permission]) -> bool:
        """Check if user has any of the specified permissions.

        Returns ``False`` for an empty ``permissions`` list.
        """
        return any(perm in self.permissions for perm in permissions)

    def has_all_permissions(self, permissions: List[Permission]) -> bool:
        """Check if user has all of the specified permissions.

        Returns ``True`` for an empty ``permissions`` list.
        """
        return all(perm in self.permissions for perm in permissions)

    def add_permission(self, permission: Permission) -> None:
        """Add a permission to the user (no-op if already present)."""
        if permission not in self.permissions:
            self.permissions.append(permission)

    def remove_permission(self, permission: Permission) -> None:
        """Remove a permission from the user (no-op if absent)."""
        if permission in self.permissions:
            self.permissions.remove(permission)

    def update_last_login(self) -> None:
        """Update the last login timestamp to the current naive UTC time."""
        self.last_login = _utcnow()


@dataclass
class AuthToken:
    """Authentication token information.

    ``scopes`` and ``metadata`` default to ``None`` and are replaced by an
    empty list / dict in ``__post_init__``. An ``expires_at`` of ``None``
    means the token never expires.
    """

    token: str
    token_type: str = "bearer"
    expires_at: Optional[datetime] = None
    user_id: Optional[str] = None
    scopes: Optional[List[str]] = None
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Initialize default values."""
        if self.scopes is None:
            self.scopes = []
        if self.metadata is None:
            self.metadata = {}

    def is_expired(self) -> bool:
        """Check if the token is expired.

        Works for both naive (interpreted as UTC) and timezone-aware
        ``expires_at`` values.

        Returns:
            ``False`` when ``expires_at`` is ``None``; otherwise whether the
            current UTC time is past ``expires_at``.
        """
        if self.expires_at is None:
            return False
        # Compare against a "now" of matching awareness to avoid TypeError.
        return _utcnow(self.expires_at) > self.expires_at

    def is_valid(self) -> bool:
        """Check if the token is valid (not expired and has token value)."""
        return bool(self.token) and not self.is_expired()

    def time_until_expiry(self) -> Optional[timedelta]:
        """Get time until token expires.

        Returns:
            ``None`` when ``expires_at`` is ``None``; otherwise
            ``expires_at - now`` (negative once the token has expired).
        """
        if self.expires_at is None:
            return None
        return self.expires_at - _utcnow(self.expires_at)


class AuthenticationError(Exception):
    """Authentication related errors."""


class AuthorizationError(Exception):
    """Authorization related errors."""


class TokenExpiredError(AuthenticationError):
    """Token has expired."""


class InvalidTokenError(AuthenticationError):
    """Invalid token format or signature."""


class InsufficientPermissionsError(AuthorizationError):
    """User lacks required permissions."""


class BaseAuthenticator(ABC):
    """Base class for authentication providers."""

    @abstractmethod
    async def authenticate(self, credentials: Dict[str, Any]) -> Optional[User]:
        """Authenticate user with provided credentials.

        Args:
            credentials: Authentication credentials (varies by provider)

        Returns:
            User object if authentication successful, None otherwise

        Raises:
            AuthenticationError: If authentication fails
        """
        pass

    @abstractmethod
    async def validate_token(self, token: str) -> Optional[AuthToken]:
        """Validate an authentication token.

        Args:
            token: Token to validate

        Returns:
            AuthToken object if valid, None otherwise

        Raises:
            InvalidTokenError: If token format is invalid
            TokenExpiredError: If token has expired
        """
        pass

    @abstractmethod
    async def create_token(self, user: User, **kwargs) -> AuthToken:
        """Create an authentication token for a user.

        Args:
            user: User to create token for
            **kwargs: Additional token parameters

        Returns:
            AuthToken object

        Raises:
            AuthenticationError: If token creation fails
        """
        pass

    @abstractmethod
    async def revoke_token(self, token: str) -> bool:
        """Revoke an authentication token.

        Args:
            token: Token to revoke

        Returns:
            True if token was revoked, False if not found
        """
        pass

    async def refresh_token(self, token: str) -> Optional[AuthToken]:
        """Refresh an authentication token.

        The default implementation validates ``token``, looks up its user and
        issues a new token via ``create_token``. It does not call
        ``revoke_token`` on the old token; subclasses can override.

        Args:
            token: Token to refresh

        Returns:
            New AuthToken if refresh successful, None otherwise

        Raises:
            AuthenticationError: If refresh fails
        """
        user = await self.get_user_by_token(token)
        if not user:
            return None
        return await self.create_token(user)

    @abstractmethod
    async def get_user(self, user_id: str) -> Optional[User]:
        """Get user by ID.

        Args:
            user_id: User identifier

        Returns:
            User object if found, None otherwise
        """
        pass

    async def get_user_by_token(self, token: str) -> Optional[User]:
        """Get user associated with a token.

        Args:
            token: Authentication token

        Returns:
            User object if token is valid and user exists, None otherwise
        """
        auth_token = await self.validate_token(token)
        if not auth_token or not auth_token.user_id:
            return None
        return await self.get_user(auth_token.user_id)


class BaseAuthorizer(ABC):
    """Base class for authorization providers."""

    @abstractmethod
    async def check_permission(
        self, user: User, permission: Permission, resource: Optional[str] = None
    ) -> bool:
        """Check if user has permission for a resource.

        Args:
            user: User to check
            permission: Required permission
            resource: Optional resource identifier

        Returns:
            True if user has permission, False otherwise
        """
        pass

    async def check_permissions(
        self, user: User, permissions: List[Permission], resource: Optional[str] = None
    ) -> bool:
        """Check if user has all specified permissions.

        Stops at the first permission that is denied.

        Args:
            user: User to check
            permissions: List of required permissions
            resource: Optional resource identifier

        Returns:
            True if user has all permissions, False otherwise
        """
        for permission in permissions:
            if not await self.check_permission(user, permission, resource):
                return False
        return True

    async def check_any_permission(
        self, user: User, permissions: List[Permission], resource: Optional[str] = None
    ) -> bool:
        """Check if user has any of the specified permissions.

        Stops at the first permission that is granted.

        Args:
            user: User to check
            permissions: List of permissions (user needs at least one)
            resource: Optional resource identifier

        Returns:
            True if user has any permission, False otherwise
        """
        for permission in permissions:
            if await self.check_permission(user, permission, resource):
                return True
        return False

    async def require_permission(
        self, user: User, permission: Permission, resource: Optional[str] = None
    ) -> None:
        """Require user to have a specific permission.

        Args:
            user: User to check
            permission: Required permission
            resource: Optional resource identifier

        Raises:
            InsufficientPermissionsError: If user lacks permission
        """
        if not await self.check_permission(user, permission, resource):
            raise InsufficientPermissionsError(
                f"User {user.username} lacks permission {permission.value}"
            )

    async def require_permissions(
        self, user: User, permissions: List[Permission], resource: Optional[str] = None
    ) -> None:
        """Require user to have all specified permissions.

        Every permission is checked (no short-circuit) so the error message
        can list all missing permissions at once.

        Args:
            user: User to check
            permissions: List of required permissions
            resource: Optional resource identifier

        Raises:
            InsufficientPermissionsError: If user lacks any permission
        """
        missing_permissions = []
        for permission in permissions:
            if not await self.check_permission(user, permission, resource):
                missing_permissions.append(permission.value)

        if missing_permissions:
            raise InsufficientPermissionsError(
                f"User {user.username} lacks permissions: {', '.join(missing_permissions)}"
            )

    async def require_any_permission(
        self, user: User, permissions: List[Permission], resource: Optional[str] = None
    ) -> None:
        """Require user to have at least one of the specified permissions.

        Args:
            user: User to check
            permissions: List of permissions (user needs at least one)
            resource: Optional resource identifier

        Raises:
            InsufficientPermissionsError: If user lacks all permissions
        """
        if not await self.check_any_permission(user, permissions, resource):
            permission_names = [p.value for p in permissions]
            raise InsufficientPermissionsError(
                f"User {user.username} lacks any of the required permissions: {', '.join(permission_names)}"
            )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server