login_to_adeu_cloud
Logs the user into the Adeu Cloud backend via secure browser authentication.
Instructions
Logs the user into the Adeu Cloud backend. Securely opens a browser window for authentication.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | | |
Implementation Reference
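- The `login_to_adeu_cloud` tool handler. Decorated with `@tool`, it calls `DesktopAuthManager.ensure_authenticated()` to obtain an API key, verifies the key against the backend at `/api/v1/auth/me`, and returns a success message containing the authenticated user's email. On a 401 response it clears the stored key and asks the caller to run the tool again.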
```python
import json
import urllib.error
import urllib.request

# `tool`, `Context`, `ToolError`, `BACKEND_URL`, and `DesktopAuthManager`
# are imported or defined elsewhere in the module.


@tool(
    description="Logs the user into the Adeu Cloud backend. Securely opens a browser window for authentication.",
    annotations={"openWorldHint": True},
)
async def login_to_adeu_cloud(ctx: Context) -> str:
    await ctx.info("Initiating cloud authentication workflow")

    try:
        await ctx.debug("Checking DesktopAuthManager for API key")
        # Returns the stored key, or runs the browser login if none is stored.
        api_key = DesktopAuthManager.ensure_authenticated()

        if not api_key:
            await ctx.error("Failed to obtain API key from login flow")
            raise ToolError("Error: Could not obtain API key from the login flow.")

        url = f"{BACKEND_URL}/api/v1/auth/me"
        req = urllib.request.Request(
            url,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Accept": "application/json",
            },
        )

        try:
            await ctx.debug("Verifying token with backend", extra={"url": url})
            with urllib.request.urlopen(req) as response:
                data = json.loads(response.read().decode("utf-8"))
                email = data.get("email", "Unknown Email")
                await ctx.info(
                    "Login successful",
                    extra={"email": email},
                )
                return f"Login successful! Connected to Adeu Cloud as: {email}."
        except urllib.error.HTTPError as e:
            if e.code == 401:
                # The stored key is stale: remove it so the next call logs in fresh.
                await ctx.warning("Session expired or invalid token. Clearing API key.")
                DesktopAuthManager.clear_api_key()
                raise ToolError(
                    "Your previous session expired. The stale key has been cleared. "
                    "Please call the `login_to_adeu_cloud` tool ONE MORE TIME to log in fresh."
                ) from e
            await ctx.error(
                "HTTP Error verifying login",
                extra={"status_code": e.code, "reason": e.reason},
            )
            raise ToolError(f"HTTP Error verifying login: {e.code} - {e.reason}") from e

    except Exception as e:
        await ctx.error("Exception during login process", extra={"error": str(e)})
        raise ToolError(f"Error during login process: {str(e)}") from e
```

- The `@tool` decorator supplies the tool's description ('Logs the user into the Adeu Cloud backend...') and its `openWorldHint` annotation. The function takes only the `Context` parameter and returns a string, so no explicit input or output schema is declared.
```python
@tool(
    description="Logs the user into the Adeu Cloud backend. Securely opens a browser window for authentication.",
    annotations={"openWorldHint": True},
)
```

- src/adeu/mcp_components/tools/auth.py:14-17 (registration) The tool is registered via the `@tool()` decorator from fastmcp, which registers the function as an MCP tool named `login_to_adeu_cloud` (derived from the function name).
```python
@tool(
    description="Logs the user into the Adeu Cloud backend. Securely opens a browser window for authentication.",
    annotations={"openWorldHint": True},
)
```

- The `DesktopAuthManager` class handles keychain storage (`get_api_key`, `set_api_key`, `clear_api_key`), interactive browser-based authentication (`authenticate_interactive`), and `ensure_authenticated`, which returns the existing key or triggers the interactive flow.
```python
class DesktopAuthManager:
    """Manages the authentication lifecycle for the local MCP server."""

    @staticmethod
    def get_api_key() -> str | None:
        """Retrieve the API key from the OS Keychain."""
        try:
            return keyring.get_password(KEYRING_SERVICE_NAME, KEYRING_ACCOUNT_NAME)
        except Exception as e:
            logger.error(f"Failed to access keychain: {e}")
            return None

    @staticmethod
    def set_api_key(api_key: str) -> None:
        """Store the API key securely in the OS Keychain."""
        try:
            keyring.set_password(KEYRING_SERVICE_NAME, KEYRING_ACCOUNT_NAME, api_key)
        except Exception as e:
            logger.error(f"Failed to save to keychain: {e}")

    @staticmethod
    def clear_api_key() -> None:
        """Remove the API key from the OS Keychain."""
        try:
            keyring.delete_password(KEYRING_SERVICE_NAME, KEYRING_ACCOUNT_NAME)
        except Exception:
            pass  # Ignore if it doesn't exist

    @classmethod
    def authenticate_interactive(cls) -> str:
        """
        Spins up a local server, opens the browser to log in, and waits for the API key.
        Returns the raw API key upon success.
        """
        # Create an ephemeral TCP server (port 0 lets the OS pick a free port)
        with AuthServer(("localhost", 0), AuthCallbackHandler) as httpd:
            # We access index 1 because server_address for IPv4 is a (host, port) tuple
            port = httpd.server_address[1]

            # Direct the user to the React UI login page instead of the backend directly
            auth_url = f"{FRONTEND_URL}/login?desktop_port={port}"
            logger.info(f"Opening browser for authentication: {auth_url}")

            # Open the user's default web browser
            webbrowser.open(auth_url)

            # Block and wait for the callback to hit the local server
            httpd.serve_forever()
            api_key = httpd.api_key

        if not api_key:
            raise RuntimeError("Authentication failed: No API key received.")

        # Save the key securely
        cls.set_api_key(api_key)
        logger.info("Successfully stored Adeu API Key in OS Keychain.")
        return api_key

    @classmethod
    def ensure_authenticated(cls) -> str:
        """
        Returns the existing API key, or triggers interactive auth if missing.
        """
        api_key = cls.get_api_key()
        if api_key:
            return api_key
        logger.info("No API key found in Keychain. Starting interactive authentication...")
        return cls.authenticate_interactive()
```

- `get_cloud_auth_token()` is a dependency helper that checks for an existing API key and, if none is stored, raises a `ToolError` directing the caller to the `login_to_adeu_cloud` tool.
```python
def get_cloud_auth_token() -> str:
    """Dependency to enforce cloud authentication before tool execution."""
    api_key = DesktopAuthManager.get_api_key()
    if not api_key:
        raise ToolError(
            "Authentication Required: You are not logged in. "
            "Please call the `login_to_adeu_cloud` tool first to authenticate, "
            "then try this task again."
        )
    return api_key
```
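The key lifecycle described above (reuse a stored key, log in interactively when none exists, clear the key on a 401, and refuse other tools until a key is present) can be sketched without the keychain, browser, or network. This is a minimal illustration under stated assumptions, not the project's implementation: `InMemoryKeyStore`, `login`, and `require_token` are hypothetical names, the verification step is reduced to a callable that returns an HTTP status code, and `PermissionError` stands in for `ToolError`.

```python
from typing import Callable, Optional


class InMemoryKeyStore:
    """Hypothetical stand-in for the OS keychain used by DesktopAuthManager."""

    def __init__(self) -> None:
        self._key: Optional[str] = None

    def get(self) -> Optional[str]:
        return self._key

    def set(self, key: str) -> None:
        self._key = key

    def clear(self) -> None:
        self._key = None


def ensure_authenticated(
    store: InMemoryKeyStore, interactive_login: Callable[[], str]
) -> str:
    """Return the stored key, or run the interactive login once and store its result."""
    key = store.get()
    if key:
        return key
    key = interactive_login()
    store.set(key)
    return key


def login(
    store: InMemoryKeyStore,
    interactive_login: Callable[[], str],
    verify_status: Callable[[str], int],
) -> str:
    """Follow the handler's three branches: success, stale key (401), other failure."""
    key = ensure_authenticated(store, interactive_login)
    status = verify_status(key)
    if status == 200:
        return "ok"
    if status == 401:
        # Drop the stale key so the next call starts a fresh interactive login.
        store.clear()
        return "expired"
    return f"http-error-{status}"


def require_token(store: InMemoryKeyStore) -> str:
    """Guard in the spirit of get_cloud_auth_token(): fail fast when no key is stored."""
    key = store.get()
    if not key:
        raise PermissionError("Authentication required: call login_to_adeu_cloud first.")
    return key


if __name__ == "__main__":
    store = InMemoryKeyStore()
    store.set("stale")  # simulate a key left over from an expired session

    def fake_verify(key: str) -> int:
        # Assumed backend behavior for the demo: only "fresh" is accepted.
        return 200 if key == "fresh" else 401

    print(login(store, lambda: "fresh", fake_verify))  # first call hits the 401 branch
    print(login(store, lambda: "fresh", fake_verify))  # second call logs in again
```

The two calls in the demo show why the handler's 401 message asks for the tool to be called one more time: the first call only discovers and clears the stale key, and the second call finds no key and runs the interactive login.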