Skip to main content
Glama
si_lambda.py (9.26 kB)
# Helpers for SI lambdas
import json
import logging
import os
from abc import abstractmethod
from datetime import datetime, timezone
from typing import Iterable, NotRequired, Optional, TypedDict, cast, overload

import boto3

from si_auth_api import SiAuthApi
from si_lago_api import LagoApi
from si_posthog_api import PosthogApi
from si_redshift import Redshift
from si_types import SqlTimestamp, WorkspaceId, timestamp_to_sql_timestamp


class SiLambdaEnv(TypedDict):
    """Settings an SI lambda accepts in its event payload.

    Every key is optional.  The ``SI_DRY_RUN`` and ``SI_BILLING_*`` keys are
    read from the event only.  ``SI_LOG_LEVEL`` and the remaining keys are
    looked up with ``SiLambda.getenv``, which falls back to the process
    environment variable of the same name when the event does not supply a
    value.
    """

    # Log level for the logger. Defaults to INFO
    SI_LOG_LEVEL: NotRequired[str]
    # Enable to make this a dry-run that doesn't actually write anything anywhere
    SI_DRY_RUN: NotRequired[bool]

    # Secret ID or ARN of the AWS secret holding Redshift authentication information
    LAMBDA_REDSHIFT_ACCESS: NotRequired[str]
    # Workgroup to connect to. Defaults to platform-app-datastore
    LAMBDA_REDSHIFT_WORKGROUP_NAME: NotRequired[str]
    # Database to connect to. Defaults to data
    LAMBDA_REDSHIFT_DATABASE: NotRequired[str]

    # Lago API URL. Defaults to https://api.getlago.com/api
    LAGO_API_URL: NotRequired[str]
    # The Lago API token
    LAGO_API_TOKEN: NotRequired[str]
    # ARN to an AWS secret containing the Lago API token
    LAGO_API_TOKEN_ARN: NotRequired[str]

    # Posthog API URL. Defaults to https://us.i.posthog.com
    POSTHOG_API_URL: NotRequired[str]
    # Posthog project API key.
    POSTHOG_PROJECT_API_KEY: NotRequired[str]

    # Auth API URL. Defaults to https://auth-api.systeminit.com
    AUTH_API_URL: NotRequired[str]
    # Credentials used to log in to the Auth API. The password is read from
    # the AWS secret named by BILLING_USER_PASSWORD_ARN.
    BILLING_USER_EMAIL: NotRequired[str]
    BILLING_USER_PASSWORD_ARN: NotRequired[str]
    BILLING_USER_WORKSPACE_ID: NotRequired[str]

    # Billing metric code. Defaults to resource-hours
    SI_BILLING_METRIC_CODE: NotRequired[str]
    # Cost per resource hour. Defaults to 0.007
    SI_BILLING_COST_PER_RESOURCE_HOUR: NotRequired[float]


class SiLambda:
    """Base class for SI lambdas.

    Holds the invocation event and a boto3 session, and lazily builds the
    Lago, Redshift, Auth API and Posthog clients from the event / process
    environment the first time each one is accessed.  Subclasses implement
    ``run``.
    """

    @classmethod
    def lambda_handler(cls, event: Optional[SiLambdaEnv] = None, _context=None):
        """Entry point: build an instance from ``event`` and call ``run()``.

        Args:
            event: Settings for this invocation; an empty event is used when
                omitted or ``None``.
            _context: Ignored.
        """
        # A fresh dict per call avoids sharing one mutable default between
        # invocations.
        lambda_instance = cls(event if event is not None else {})
        lambda_instance.run()

    def __init__(self, event: SiLambdaEnv, session: Optional[boto3.Session] = None):
        """Record the start time, read event settings and configure logging.

        Args:
            event: Settings for this invocation.
            session: boto3 session to use for AWS clients.  A new
                ``boto3.Session()`` is created when omitted, at construction
                time rather than once when the class is defined.
        """
        self.start_timestamp = datetime.now(timezone.utc)
        self.start_timestamp_sql = timestamp_to_sql_timestamp(self.start_timestamp)
        self.session = session if session is not None else boto3.Session()
        self.event = event

        # Whether this is a dry-run that doesn't actually write anything anywhere
        self.dry_run = event.get("SI_DRY_RUN", False)
        self.billing_metric_code = event.get("SI_BILLING_METRIC_CODE", "resource-hours")
        self.billing_cost_per_resource_hour = event.get("SI_BILLING_COST_PER_RESOURCE_HOUR", 0.007)

        # API clients are created on first access by the properties below.
        self._lago = None
        self._redshift = None
        self._auth_api = None
        self._posthog_api = None

        # SI_LOG_LEVEL wins over LOG_LEVEL; INFO when neither is set.
        logging.getLogger().setLevel(self.getenv("SI_LOG_LEVEL", self.getenv("LOG_LEVEL", "INFO")))

        if self.dry_run:
            logging.info("Running in dry-run mode")
        logging.info(self.start_timestamp_sql)

    @property
    def lago(self) -> LagoApi:
        """Get the Lago API for this lambda, configured from the lambda environment.

        The token comes from LAGO_API_TOKEN, or else from the
        ``LAGO_API_TOKEN`` field of the JSON secret named by
        LAGO_API_TOKEN_ARN.

        Raises:
            AssertionError: If neither setting yields a token.
        """
        if self._lago is None:
            lago_api_url = self.getenv("LAGO_API_URL", "https://api.getlago.com/api")
            lago_api_token = self.getenv("LAGO_API_TOKEN")
            if lago_api_token is None:
                secret = self.getenv_secret_value("LAGO_API_TOKEN_ARN")
                assert secret is not None, "LAGO_API_TOKEN or LAGO_API_TOKEN_ARN must be set"
                lago_api_token = json.loads(secret["SecretString"])["LAGO_API_TOKEN"]
            self._lago = LagoApi(lago_api_url, lago_api_token)
        return self._lago

    @property
    def redshift(self) -> Redshift:
        """Get the Redshift API client, configured from the lambda environment.

        Raises:
            AssertionError: If LAMBDA_REDSHIFT_ACCESS is not set.
        """
        if self._redshift is None:
            secret = self.getenv_secret_value("LAMBDA_REDSHIFT_ACCESS")
            assert secret is not None, "LAMBDA_REDSHIFT_ACCESS must be set"
            workgroup_name = self.getenv("LAMBDA_REDSHIFT_WORKGROUP_NAME", "platform-app-datastore")
            database = self.getenv("LAMBDA_REDSHIFT_DATABASE", "data")
            self._redshift = Redshift(
                self.session,
                WorkgroupName=workgroup_name,
                Database=database,
                SecretArn=secret["ARN"],
            )
        return self._redshift

    @property
    def auth_api(self):
        """Get the Auth API client, configured from the lambda environment.

        Logs in with BILLING_USER_EMAIL, the password stored in the secret
        named by BILLING_USER_PASSWORD_ARN, and BILLING_USER_WORKSPACE_ID.

        Raises:
            AssertionError: If any of those settings is missing.
        """
        if self._auth_api is None:
            auth_api_url = self.getenv("AUTH_API_URL", "https://auth-api.systeminit.com")
            assert auth_api_url is not None, "AUTH_API_URL must be set"
            billing_user_email = self.getenv("BILLING_USER_EMAIL")
            assert billing_user_email is not None, "BILLING_USER_EMAIL must be set"
            billing_user_password = self.getenv_secret_json("BILLING_USER_PASSWORD_ARN")
            assert billing_user_password is not None, "BILLING_USER_PASSWORD_ARN must be set"
            billing_user_workspace_id = cast(Optional[WorkspaceId], self.getenv("BILLING_USER_WORKSPACE_ID"))
            assert billing_user_workspace_id is not None, "BILLING_USER_WORKSPACE_ID must be set"
            # NOTE(review): "BILLING_USER_PASWORD" looks misspelled, but it has
            # to match the key stored in the secret's JSON - confirm against
            # the secret before changing it.
            self._auth_api = SiAuthApi.login(
                auth_api_url,
                billing_user_email,
                billing_user_password["BILLING_USER_PASWORD"],
                billing_user_workspace_id,
            )
        return self._auth_api

    @property
    def posthog(self) -> PosthogApi:
        """Get the Posthog API client, configured from the lambda environment.

        Raises:
            AssertionError: If POSTHOG_PROJECT_API_KEY is not set.
        """
        if self._posthog_api is None:
            posthog_api_url = self.getenv("POSTHOG_API_URL", "https://us.i.posthog.com")
            posthog_project_api_key = self.getenv("POSTHOG_PROJECT_API_KEY")
            assert posthog_project_api_key is not None, "POSTHOG_PROJECT_API_KEY must be set"
            self._posthog_api = PosthogApi(posthog_api_url, posthog_project_api_key)
        return self._posthog_api

    @overload
    def getenv(self, key: str, default: str) -> str: ...

    @overload
    def getenv(self, key: str, default: Optional[str] = None) -> Optional[str]: ...

    def getenv(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Get an environment variable, overrideable by the event values.

        Args:
            key: Name of the setting.
            default: Value returned when neither the event nor the process
                environment provides ``key``.

        Returns:
            The event's value when it is not ``None``, otherwise
            ``os.getenv(key, default)``.
        """
        value = self.event.get(key)
        if value is not None:
            return value
        return os.getenv(key, default)

    def getenv_secret_json(self, key: str):
        """Fetch the secret named by setting ``key`` and parse its SecretString as JSON.

        Returns:
            The parsed JSON, or ``None`` when the setting is not set.
        """
        secret = self.getenv_secret_value(key)
        if secret is None:
            return None
        return json.loads(secret["SecretString"])

    def getenv_secret_value(self, key: str):
        """Fetch the secret whose ID or ARN is stored in setting ``key``.

        Returns:
            The ``get_secret_value`` response, or ``None`` when the setting
            is not set.
        """
        secret_id = self.getenv(key)
        if secret_id is None:
            return None
        return self.get_secret_value(secret_id)

    def get_secret_value(self, secret_id: str):
        """Get a secret from an arn."""
        secretsmanager = self.session.client(service_name="secretsmanager")
        return secretsmanager.get_secret_value(SecretId=secret_id)

    def get_upload_range(
        self, upload_type: str, batch_hours: int
    ) -> tuple[SqlTimestamp, SqlTimestamp, SqlTimestamp]:
        """Get the range of data that hasn't been uploaded yet for an upload type.

        The range starts from either the last upload or the beginning of
        time, and ends with the last complete data we have, capped at
        ``batch_hours`` hours past the start.

        Args:
            upload_type: Upload type to look up in ``upload_progress``.
            batch_hours: Maximum number of hours the batch may cover.

        Returns:
            ``(batch_start, batch_end, last_complete_hour_end)`` from the
            first row of the query.

        Raises:
            IndexError: If the query returns no rows.
        """
        # batch_hours is interpolated into the SQL text rather than bound as a
        # parameter, so force it to an integer first.
        hours = int(batch_hours)

        # Start the upload where we last left off, or at the beginning of time
        rows = list(cast(
            Iterable[tuple[SqlTimestamp, SqlTimestamp, SqlTimestamp]],
            self.redshift.query_raw(
                f"""
                SELECT
                    COALESCE(uploaded_to, DATE_TRUNC('hour', first_event)) AS batch_start,
                    LEAST(DATEADD(HOUR, {hours}, batch_start), last_complete_hour_end) AS batch_end,
                    last_complete_hour_end
                FROM workspace_operations.workspace_update_events_summary
                LEFT OUTER JOIN workspace_operations.upload_progress ON upload_type = :upload_type
                """,
                upload_type=upload_type,
            ),
        ))
        if not rows:
            raise IndexError(f"No upload range returned for upload type {upload_type!r}")
        return rows[0]

    def update_upload_progress(self, upload_type: str, uploaded_to: SqlTimestamp):
        """Update progress of a given upload type.

        Does nothing except log when this is a dry run.

        Args:
            upload_type: Upload type whose progress row is inserted or updated.
            uploaded_to: Timestamp the upload has now reached.
        """
        if self.dry_run:
            logging.info(f"Dry run: not updating upload progress to {uploaded_to}")
            return

        # The leading "--" line is a SQL comment and must stay on its own line
        # so it does not comment out the statement.
        self.redshift.execute(
            """
            -- There doesn't seem to be a nicer way to INSERT OR UPDATE in Redshift
            MERGE INTO workspace_operations.upload_progress
            USING (SELECT :upload_type::text AS upload_type, :uploaded_to::timestamp AS uploaded_to ) AS my_source
            ON upload_progress.upload_type = my_source.upload_type
            WHEN MATCHED THEN UPDATE SET uploaded_to = my_source.uploaded_to
            WHEN NOT MATCHED THEN INSERT (upload_type, uploaded_to) VALUES (my_source.upload_type, my_source.uploaded_to)
            """,
            upload_type=upload_type,
            uploaded_to=uploaded_to,
        )

    # NOTE(review): SiLambda does not derive from abc.ABC, so @abstractmethod
    # is not enforced here: the class can still be instantiated and this
    # default run() simply returns None.
    @abstractmethod
    def run(self):
        """Do the lambda's work; implemented by subclasses."""
        ...

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.