Skip to main content
Glama
by cycodehq
cycode_token_based_client.py • 3.78 kB
from threading import Lock
from typing import Optional

import arrow
from requests import Response

from cycode.cli.user_settings.credentials_manager import CredentialsManager
from cycode.cli.user_settings.jwt_creator import JwtCreator
from cycode.cyclient.cycode_client import CycodeClient

# Plain-text response bodies that signal a rejected JWT. They come back with HTTP 200,
# so they have to be recognised by content rather than by status code.
_NGINX_PLAIN_ERRORS = [
    b'Invalid JWT Token',
    b'JWT Token Needed',
    b'JWT Token validation failed',
]


class CycodeTokenBasedClient(CycodeClient):
    """Send requests with JWT.

    The access token is obtained from the ``api/v1/auth/api-token`` endpoint using the
    client id and client secret, kept in memory, and persisted through
    ``CredentialsManager`` so that it can be reused later.
    """

    def __init__(self, client_id: str, client_secret: str) -> None:
        """Create the client and load a cached access token when it matches the credentials.

        Args:
            client_id: Client id sent to the auth endpoint.
            client_secret: Client secret sent to the auth endpoint.
        """
        super().__init__()
        self.client_secret = client_secret
        self.client_id = client_id

        self._credentials_manager = CredentialsManager()
        # load cached access token
        access_token, expires_in, creator = self._credentials_manager.get_access_token()

        self._access_token = self._expires_in = None
        if creator == JwtCreator.create(client_id, client_secret):
            # we must be sure that cached access token is created using the same client id and client secret.
            # because client id and client secret could be passed via command, via env vars or via config file.
            # we must not use cached access token if client id or client secret was changed.
            self._access_token = access_token
            self._expires_in = arrow.get(expires_in) if expires_in else None

        self._lock = Lock()

    def get_access_token(self) -> str:
        """Return the current access token, refreshing it first when needed.

        The check-and-refresh runs while holding ``self._lock``.
        """
        with self._lock:
            self.refresh_access_token_if_needed()
            return self._access_token

    def invalidate_access_token(self, in_storage: bool = False) -> None:
        """Forget the in-memory access token and its expiry.

        Args:
            in_storage: When true, also overwrite the token kept by the credentials
                manager with empty (``None``) values.
        """
        self._access_token = None
        self._expires_in = None

        if in_storage:
            self._credentials_manager.update_access_token(None, None, None)

    def refresh_access_token_if_needed(self) -> None:
        """Refresh the access token when it is missing, has no expiry, or has expired."""
        if self._access_token is None or self._expires_in is None or arrow.utcnow() >= self._expires_in:
            self.refresh_access_token()

    def refresh_access_token(self) -> None:
        """Request a new access token and store it in memory and in the credentials manager.

        The local expiry is set to 80% of the ``expires_in`` value (in seconds) reported by
        the auth endpoint, presumably so the token is renewed before it really expires.
        """
        auth_response = self.post(
            url_path='api/v1/auth/api-token',
            body={'clientId': self.client_id, 'secret': self.client_secret},
            without_auth=True,
            hide_response_content_log=True,
        )
        auth_response_data = auth_response.json()

        self._access_token = auth_response_data['token']
        self._expires_in = arrow.utcnow().shift(seconds=auth_response_data['expires_in'] * 0.8)

        jwt_creator = JwtCreator.create(self.client_id, self.client_secret)
        self._credentials_manager.update_access_token(self._access_token, self._expires_in.timestamp(), jwt_creator)

    def get_request_headers(self, additional_headers: Optional[dict] = None, without_auth: bool = False) -> dict:
        """Build the request headers, adding the ``Authorization`` header unless disabled.

        Args:
            additional_headers: Extra headers forwarded to the parent implementation.
            without_auth: When true, the ``Authorization`` header is not added.

        Returns:
            The headers dictionary to send with the request.
        """
        headers = super().get_request_headers(additional_headers=additional_headers)

        if not without_auth:
            headers = self._add_auth_header(headers)

        return headers

    def _add_auth_header(self, headers: dict) -> dict:
        """Set the bearer ``Authorization`` header on ``headers`` in place and return it."""
        headers['Authorization'] = f'Bearer {self.get_access_token()}'
        return headers

    def _execute(
        self,
        *args,
        **kwargs,
    ) -> Response:
        """Execute the request and retry it once with a fresh token on a plain-text JWT error.

        All arguments are forwarded unchanged to the parent ``_execute``.

        Returns:
            The response of the first attempt, or of the retry when a retry was made.
        """
        response = super()._execute(*args, **kwargs)

        # An unauthenticated request carries no token, so refreshing cannot change its outcome.
        # Returning early also keeps refresh_access_token(), which itself posts with
        # without_auth=True, from re-entering this retry path without bound.
        # NOTE(review): assumes `without_auth` reaches this method as a keyword argument -
        # confirm against CycodeClient; if it is passed positionally this guard is a no-op.
        if kwargs.get('without_auth', False):
            return response

        # backend returns 200 and plain text. no way to catch it with .raise_for_status()
        nginx_error_response = response.status_code == 200 and response.content.startswith(
            tuple(_NGINX_PLAIN_ERRORS)
        )
        if nginx_error_response:
            # if cached token is invalid, try to refresh it and retry the request
            self.refresh_access_token()
            response = super()._execute(*args, **kwargs)

        return response

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cycodehq/cycode-cli'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.