Skip to main content
Glama

AnyDocs MCP Server

by funky1688
api_key.py • 13.4 kB
#!/usr/bin/env python3
"""
API Key Authentication

Implementation of API key-based authentication.

Keys are presented by clients as ``<key_id>.<secret>``. Only an HMAC-SHA256
digest of the secret is kept; the raw secret is returned once, at creation.
All state lives in in-memory dictionaries on the authenticator instance.
"""

import secrets
import hashlib
import hmac
from typing import Any, Dict, List, Optional
from datetime import datetime, timedelta, timezone

import structlog

from .base import (
    BaseAuthenticator,
    BaseAuthorizer,
    User,
    AuthToken,
    Permission,
    UserRole,
    AuthenticationError,
    InvalidTokenError,
    TokenExpiredError,
)

logger = structlog.get_logger(__name__)


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Equivalent to ``datetime.utcnow()`` (deprecated since Python 3.12). The
    result is deliberately naive so it stays comparable with the naive
    timestamps this module stores and accepts (e.g. ``expires_at``).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _split_api_key(token: Any) -> Optional[tuple[str, str]]:
    """Split a full API key of the form ``key_id.secret``.

    Args:
        token: Value presented by the caller.

    Returns:
        ``(key_id, secret)`` split at the first ``.``, or ``None`` when the
        value is not a string or contains no ``.`` separator.
    """
    # Non-string input (None, bytes, int, ...) is a malformed key, not a crash.
    if not isinstance(token, str):
        return None
    key_id, separator, secret = token.partition(".")
    if not separator:
        return None
    return key_id, secret


class APIKey:
    """API Key model.

    Holds the public identifier, the hashed secret and bookkeeping data for
    one key. The raw secret is never stored on this object.
    """

    def __init__(
        self,
        key_id: str,
        key_hash: str,
        user_id: str,
        name: str,
        scopes: Optional[List[str]] = None,
        expires_at: Optional[datetime] = None,
        is_active: bool = True,
        created_at: Optional[datetime] = None,
        last_used: Optional[datetime] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Initialize an API key record.

        Args:
            key_id: Public identifier of the key
            key_hash: Hash of the key secret
            user_id: Identifier of the owning user
            name: Human-readable name for the key
            scopes: Scopes granted to the key (defaults to an empty list)
            expires_at: Naive UTC expiry time, or None for no expiry
            is_active: Whether the key is currently active
            created_at: Creation time (defaults to the current UTC time)
            last_used: Time the key was last used, if ever
            metadata: Additional metadata (defaults to an empty dict)
        """
        self.key_id = key_id
        self.key_hash = key_hash
        self.user_id = user_id
        self.name = name
        self.scopes = scopes or []
        self.expires_at = expires_at
        self.is_active = is_active
        self.created_at = created_at or _utcnow()
        self.last_used = last_used
        self.metadata = metadata or {}

    def is_expired(self) -> bool:
        """Check if the API key is expired.

        Returns:
            False when no expiry is set, otherwise whether the current UTC
            time is past ``expires_at``.
        """
        if self.expires_at is None:
            return False
        return _utcnow() > self.expires_at

    def is_valid(self) -> bool:
        """Check if the API key is valid (active and not expired)."""
        return self.is_active and not self.is_expired()

    def update_last_used(self) -> None:
        """Update the last used timestamp to the current UTC time."""
        self.last_used = _utcnow()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary representation.

        The key hash is intentionally left out of the result. Datetimes are
        rendered as ISO 8601 strings (or None when unset).
        """
        return {
            "key_id": self.key_id,
            "user_id": self.user_id,
            "name": self.name,
            "scopes": self.scopes,
            "expires_at": self.expires_at.isoformat() if self.expires_at else None,
            "is_active": self.is_active,
            "created_at": self.created_at.isoformat(),
            "last_used": self.last_used.isoformat() if self.last_used else None,
            "metadata": self.metadata,
        }


class APIKeyAuthenticator(BaseAuthenticator):
    """API Key-based authenticator."""

    def __init__(self, secret_key: str, default_expiry_days: int = 365):
        """Initialize API key authenticator.

        Args:
            secret_key: Secret key for HMAC signing
            default_expiry_days: Default expiry period for new keys
        """
        self.secret_key = secret_key.encode()
        self.default_expiry_days = default_expiry_days
        self._api_keys: Dict[str, APIKey] = {}  # key_id -> APIKey
        self._users: Dict[str, User] = {}  # user_id -> User

        logger.info("API Key authenticator initialized", default_expiry_days=default_expiry_days)

    def _generate_key_pair(self) -> tuple[str, str]:
        """Generate a new API key pair (key_id, secret).

        Returns:
            Tuple of (key_id, secret)
        """
        # Generate key ID (public identifier)
        key_id = f"ak_{secrets.token_urlsafe(16)}"

        # Generate secret (private key)
        secret = secrets.token_urlsafe(32)

        return key_id, secret

    def _hash_secret(self, secret: str) -> str:
        """Hash API key secret using HMAC-SHA256 keyed with ``secret_key``.

        Args:
            secret: Raw secret to hash

        Returns:
            Hex digest of the hashed secret
        """
        return hmac.new(
            self.secret_key,
            secret.encode(),
            hashlib.sha256
        ).hexdigest()

    def _verify_secret(self, secret: str, key_hash: str) -> bool:
        """Verify API key secret against hash.

        Args:
            secret: Raw secret to verify
            key_hash: Stored hash to verify against

        Returns:
            True if secret matches hash
        """
        computed_hash = self._hash_secret(secret)
        # compare_digest avoids leaking the match position through timing.
        return hmac.compare_digest(computed_hash, key_hash)

    async def create_api_key(
        self,
        user: User,
        name: str,
        scopes: Optional[List[str]] = None,
        expires_in_days: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> tuple[APIKey, str]:
        """Create a new API key for a user.

        Args:
            user: User to create key for
            name: Human-readable name for the key
            scopes: List of scopes for the key
            expires_in_days: Expiry period in days. When None, the
                authenticator's ``default_expiry_days`` is applied; the key
                gets no expiry only if that default is not positive.
            metadata: Additional metadata

        Returns:
            Tuple of (APIKey object, raw secret)
        """
        key_id, secret = self._generate_key_pair()
        key_hash = self._hash_secret(secret)

        # Calculate expiry: an explicit value wins, then the positive default.
        expires_at = None
        if expires_in_days is not None:
            expires_at = _utcnow() + timedelta(days=expires_in_days)
        elif self.default_expiry_days > 0:
            expires_at = _utcnow() + timedelta(days=self.default_expiry_days)

        # Create API key
        api_key = APIKey(
            key_id=key_id,
            key_hash=key_hash,
            user_id=user.id,
            name=name,
            scopes=scopes or [],
            expires_at=expires_at,
            metadata=metadata or {},
        )

        # Store API key and user
        self._api_keys[key_id] = api_key
        self._users[user.id] = user

        logger.info("API key created", key_id=key_id, user_id=user.id, name=name, expires_at=expires_at)

        return api_key, secret

    async def authenticate(self, credentials: Dict[str, Any]) -> Optional[User]:
        """Authenticate using API key credentials.

        Args:
            credentials: Dict with 'api_key' containing the full key

        Returns:
            User object if authentication successful

        Raises:
            AuthenticationError: If the key is missing, malformed, unknown,
                has a wrong secret, is expired or inactive, or its user is
                not registered with this authenticator.
        """
        api_key = credentials.get("api_key")
        if not api_key:
            raise AuthenticationError("API key is required")

        # Parse API key format: key_id.secret
        parsed = _split_api_key(api_key)
        if parsed is None:
            raise AuthenticationError("Invalid API key format")
        key_id, secret = parsed

        # Find API key
        stored_key = self._api_keys.get(key_id)
        if not stored_key:
            logger.warning("API key not found", key_id=key_id)
            raise AuthenticationError("Invalid API key")

        # Verify secret
        if not self._verify_secret(secret, stored_key.key_hash):
            logger.warning("API key secret verification failed", key_id=key_id)
            raise AuthenticationError("Invalid API key")

        # Check if key is valid
        if not stored_key.is_valid():
            logger.warning("API key is invalid", key_id=key_id, is_active=stored_key.is_active, is_expired=stored_key.is_expired())
            raise AuthenticationError("API key is expired or inactive")

        # Update last used
        stored_key.update_last_used()

        # Get user
        user = self._users.get(stored_key.user_id)
        if not user:
            logger.error("User not found for API key", key_id=key_id, user_id=stored_key.user_id)
            raise AuthenticationError("User not found")

        # Update user last login
        user.update_last_login()

        logger.info("API key authentication successful", key_id=key_id, user_id=user.id, username=user.username)

        return user

    async def validate_token(self, token: str) -> Optional[AuthToken]:
        """Validate API key token.

        Args:
            token: API key token to validate

        Returns:
            AuthToken object if valid

        Raises:
            InvalidTokenError: If the token is malformed, unknown, has a
                wrong secret, or the key is inactive.
            TokenExpiredError: If the key has expired.
        """
        # Parse API key format
        parsed = _split_api_key(token)
        if parsed is None:
            raise InvalidTokenError("Invalid API key format")
        key_id, secret = parsed

        # Find API key
        stored_key = self._api_keys.get(key_id)
        if not stored_key:
            raise InvalidTokenError("API key not found")

        # Verify secret
        if not self._verify_secret(secret, stored_key.key_hash):
            raise InvalidTokenError("Invalid API key secret")

        # Check expiry
        if stored_key.is_expired():
            raise TokenExpiredError("API key has expired")

        # Check if active
        if not stored_key.is_active:
            raise InvalidTokenError("API key is inactive")

        # Update last used
        stored_key.update_last_used()

        return AuthToken(
            token=token,
            token_type="api_key",
            expires_at=stored_key.expires_at,
            user_id=stored_key.user_id,
            scopes=stored_key.scopes,
            metadata={"key_id": key_id, "key_name": stored_key.name},
        )

    async def create_token(self, user: User, **kwargs) -> AuthToken:
        """Create API key token for user.

        Args:
            user: User to create token for
            **kwargs: Additional parameters (name, scopes, expires_in_days)

        Returns:
            AuthToken object whose ``token`` is the full ``key_id.secret`` key
        """
        name = kwargs.get("name", f"Token for {user.username}")
        scopes = kwargs.get("scopes", [])
        expires_in_days = kwargs.get("expires_in_days")

        api_key, secret = await self.create_api_key(
            user=user,
            name=name,
            scopes=scopes,
            expires_in_days=expires_in_days,
        )

        # Combine key_id and secret
        full_key = f"{api_key.key_id}.{secret}"

        return AuthToken(
            token=full_key,
            token_type="api_key",
            expires_at=api_key.expires_at,
            user_id=user.id,
            scopes=api_key.scopes,
            metadata={"key_id": api_key.key_id, "key_name": api_key.name},
        )

    async def revoke_token(self, token: str) -> bool:
        """Revoke API key token.

        The key is marked inactive but stays stored.

        Args:
            token: API key token to revoke

        Returns:
            True if token was revoked, False if it is malformed or unknown
        """
        parsed = _split_api_key(token)
        if parsed is None:
            return False
        key_id, _ = parsed

        api_key = self._api_keys.get(key_id)
        if not api_key:
            return False

        # NOTE(review): the secret part is not verified here, so knowing a
        # key_id is enough to revoke the key. Confirm this is intended.
        api_key.is_active = False

        logger.info("API key revoked", key_id=key_id, user_id=api_key.user_id)

        return True

    async def get_user(self, user_id: str) -> Optional[User]:
        """Get user by ID.

        Args:
            user_id: User identifier

        Returns:
            User object if found
        """
        return self._users.get(user_id)

    async def list_api_keys(self, user_id: str) -> List[APIKey]:
        """List API keys for a user.

        Args:
            user_id: User identifier

        Returns:
            List of API keys for the user
        """
        return [key for key in self._api_keys.values() if key.user_id == user_id]

    async def get_api_key(self, key_id: str) -> Optional[APIKey]:
        """Get API key by ID.

        Args:
            key_id: API key identifier

        Returns:
            APIKey object if found
        """
        return self._api_keys.get(key_id)

    async def delete_api_key(self, key_id: str) -> bool:
        """Delete API key.

        Args:
            key_id: API key identifier

        Returns:
            True if key was deleted
        """
        # pop() performs the lookup and the removal in one step.
        api_key = self._api_keys.pop(key_id, None)
        if api_key is None:
            return False

        logger.info("API key deleted", key_id=key_id, user_id=api_key.user_id)
        return True

    async def add_user(self, user: User) -> None:
        """Add a user to the authenticator.

        Args:
            user: User to add
        """
        self._users[user.id] = user
        logger.info("User added to API key authenticator", user_id=user.id, username=user.username)


class APIKeyAuthorizer(BaseAuthorizer):
    """Simple role-based authorizer for API keys."""

    async def check_permission(self, user: User, permission: Permission, resource: Optional[str] = None) -> bool:
        """Check if user has permission.

        Args:
            user: User to check
            permission: Required permission
            resource: Optional resource identifier (not used in simple implementation)

        Returns:
            True if user has permission
        """
        return user.has_permission(permission)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.