Skip to main content
Glama
enkryptai

Enkrypt AI Secure MCP Gateway

Official
by enkryptai
enkrypt_provider.py13 kB
"""Enkrypt authentication provider.""" import json import os import time from typing import Any, Dict, List, Optional import requests from secure_mcp_gateway.plugins.auth.base import ( AuthCredentials, AuthMethod, AuthProvider, AuthResult, AuthStatus, ) from secure_mcp_gateway.utils import ( CONFIG_PATH, DOCKER_CONFIG_PATH, is_docker, logger, mask_key, ) class EnkryptAuthProvider(AuthProvider): """ Enkrypt authentication provider. Authenticates users using Enkrypt's API key and project-based system. """ def __init__( self, api_key: str = None, base_url: str = "https://api.enkryptai.com", use_remote_config: bool = False, timeout: int = 30, ): """ Initialize the Enkrypt auth provider. Args: api_key: Enkrypt API key for remote authentication base_url: Base URL for Enkrypt API use_remote_config: Whether to fetch config from remote API timeout: Request timeout in seconds """ self.api_key = api_key self.base_url = base_url self.use_remote_config = use_remote_config self.timeout = timeout self.auth_url = f"{base_url}/mcp-gateway/get-gateway" logger.info(f"Enkrypt auth provider initialized (remote={use_remote_config})") def get_name(self) -> str: """Get provider name.""" return "enkrypt" def get_version(self) -> str: """Get provider version.""" return "1.0.0" def get_supported_methods(self) -> List[AuthMethod]: """Get supported authentication methods.""" return [AuthMethod.API_KEY] def validate_config(self, config: Dict[str, Any]) -> bool: """Validate provider configuration.""" if self.use_remote_config and not self.api_key: return False return True def get_required_config_keys(self) -> List[str]: """Get required configuration keys.""" if self.use_remote_config: return ["api_key", "base_url"] return [] async def authenticate(self, credentials: AuthCredentials) -> AuthResult: """ Authenticate user with Enkrypt credentials. Args: credentials: Authentication credentials containing gateway_key, project_id, user_id Returns: AuthResult: Authentication result """ try: logger.info("[EnkryptAuthProvider] Starting authentication") # Extract credentials gateway_key = credentials.gateway_key or credentials.api_key project_id = credentials.project_id user_id = credentials.user_id # Validate required credentials if not gateway_key: return AuthResult( status=AuthStatus.INVALID_CREDENTIALS, authenticated=False, message="Gateway key is required", error="Missing gateway_key", ) # Try local configuration first local_config = await self._get_local_config( gateway_key, project_id, user_id ) if local_config: logger.info( f"[EnkryptAuthProvider] Local authentication successful for user: {user_id}" ) return AuthResult( status=AuthStatus.SUCCESS, authenticated=True, message="Authentication successful (local)", user_id=local_config.get("user_id"), project_id=local_config.get("project_id"), session_id=local_config.get("id"), gateway_config=local_config, mcp_config=local_config.get("mcp_config", []), metadata={ "source": "local", "config_id": local_config.get("mcp_config_id"), }, ) # Fall back to remote authentication if enabled if self.use_remote_config: return await self._authenticate_remote( gateway_key, project_id, user_id, credentials ) # No local config and remote disabled return AuthResult( status=AuthStatus.INVALID_CREDENTIALS, authenticated=False, message="No configuration found for provided credentials", error="Configuration not found", ) except Exception as e: logger.error(f"[EnkryptAuthProvider] Authentication error: {e}") return AuthResult( status=AuthStatus.ERROR, authenticated=False, message="Authentication failed", error=str(e), ) async def _authenticate_remote( self, gateway_key: str, project_id: str, user_id: str, credentials: AuthCredentials, ) -> AuthResult: """ Authenticate using remote Enkrypt API. Args: gateway_key: Gateway API key project_id: Project ID user_id: User ID credentials: Full credentials object Returns: AuthResult: Authentication result """ logger.info( f"[EnkryptAuthProvider] Attempting remote authentication for gateway_key: {mask_key(gateway_key)}" ) try: # Get mcp_config_id from local config first local_config = await self._get_local_config( gateway_key, project_id, user_id ) mcp_config_id = local_config.get("mcp_config_id") if local_config else None # Get configurable timeout from TimeoutManager import aiohttp from secure_mcp_gateway.services.timeout import get_timeout_manager timeout_manager = get_timeout_manager() timeout_value = timeout_manager.get_timeout("auth") # Use aiohttp for async HTTP request async with aiohttp.ClientSession() as session: async with session.post( self.auth_url, json={ "gateway_key": gateway_key, "project_id": project_id, "user_id": user_id, "mcp_config_id": mcp_config_id, }, headers={ "X-Enkrypt-Gateway-Key": gateway_key, "X-Enkrypt-API-Key": self.api_key, }, timeout=aiohttp.ClientTimeout(total=timeout_value), ) as response: if response.status != 200: return AuthResult( status=AuthStatus.INVALID_CREDENTIALS, authenticated=False, message="Invalid credentials or unauthorized", error=f"HTTP {response.status}", ) gateway_config = await response.json() if not gateway_config: return AuthResult( status=AuthStatus.INVALID_CREDENTIALS, authenticated=False, message="No configuration found", error="Empty response from server", ) logger.info("[EnkryptAuthProvider] Remote authentication successful") return AuthResult( status=AuthStatus.SUCCESS, authenticated=True, message="Authentication successful (remote)", user_id=gateway_config.get("user_id"), project_id=gateway_config.get("project_id"), session_id=gateway_config.get("id"), gateway_config=gateway_config, mcp_config=gateway_config.get("mcp_config", []), metadata={"source": "remote"}, ) except requests.Timeout: return AuthResult( status=AuthStatus.ERROR, authenticated=False, message="Authentication timeout", error="Request timeout", ) except Exception as e: return AuthResult( status=AuthStatus.ERROR, authenticated=False, message="Remote authentication failed", error=str(e), ) async def _get_local_config( self, gateway_key: str, project_id: str = None, user_id: str = None ) -> Optional[Dict[str, Any]]: """ Get configuration from local config file. Args: gateway_key: Gateway API key project_id: Project ID user_id: User ID Returns: Optional[Dict[str, Any]]: Configuration if found, None otherwise """ running_in_docker = is_docker() config_path = DOCKER_CONFIG_PATH if running_in_docker else CONFIG_PATH if not os.path.exists(config_path): logger.debug(f"[EnkryptAuthProvider] Config file not found: {config_path}") return None try: # Use asyncio.to_thread for file I/O in async context import asyncio def _load_config(): with open(config_path, encoding="utf-8") as f: return json.load(f) json_config = await asyncio.to_thread(_load_config) # Check if gateway_key exists in apikeys apikeys = json_config.get("apikeys", {}) if gateway_key not in apikeys: logger.debug("[EnkryptAuthProvider] Gateway key not found in config") return None key_info = apikeys[gateway_key] config_project_id = key_info.get("project_id") config_user_id = key_info.get("user_id") # Use config IDs if not provided if not project_id: project_id = config_project_id if not user_id: user_id = config_user_id # Validate IDs match if project_id != config_project_id or user_id != config_user_id: logger.debug("[EnkryptAuthProvider] ID mismatch in config") return None # Get project and user configurations projects = json_config.get("projects", {}) users = json_config.get("users", {}) if project_id not in projects or user_id not in users: logger.debug( "[EnkryptAuthProvider] Project or user not found in config" ) return None project_config = projects[project_id] user_config = users[user_id] mcp_config_id = project_config.get("mcp_config_id") if not mcp_config_id: logger.debug("[EnkryptAuthProvider] No MCP config ID found") return None # Get MCP configuration mcp_configs = json_config.get("mcp_configs", {}) if mcp_config_id not in mcp_configs: logger.debug("[EnkryptAuthProvider] MCP config not found") return None mcp_config_entry = mcp_configs[mcp_config_id] return { "id": f"{user_id}_{project_id}_{mcp_config_id}", "project_name": project_config.get("project_name", "not_provided"), "project_id": project_id, "user_id": user_id, "email": user_config.get("email", "not_provided"), "mcp_config": mcp_config_entry.get("mcp_config", []), "mcp_config_id": mcp_config_id, } except Exception as e: logger.error(f"[EnkryptAuthProvider] Error reading local config: {e}") return None async def validate_session(self, session_id: str) -> bool: """ Validate if a session is still valid. Args: session_id: Session ID to validate Returns: bool: True if valid, False otherwise """ # For Enkrypt, sessions are validated by checking if they exist # and haven't expired (handled by session manager) return True async def refresh_authentication( self, session_id: str, credentials: AuthCredentials ) -> AuthResult: """ Refresh authentication (not needed for API key auth). Args: session_id: Existing session ID credentials: Refresh credentials Returns: AuthResult: New authentication result """ # Re-authenticate with same credentials return await self.authenticate(credentials)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/enkryptai/secure-mcp-gateway'

If you have feedback or need assistance with the MCP directory API, please join our Discord server