Skip to main content
Glama
si_lago_api.py11 kB
from collections.abc import Iterable from itertools import islice from typing import ( Any, Literal, NewType, NotRequired, Optional, TypeVar, TypedDict, cast, ) from pip._vendor import requests from pip._vendor.requests.exceptions import HTTPError from pip._vendor.requests.models import Response from si_types import IsoTimestamp import logging import urllib.parse import sys ExternalSubscriptionId = NewType("ExternalSubscriptionId", str) class LagoEvent(TypedDict): transaction_id: str external_subscription_id: ExternalSubscriptionId timestamp: float code: str properties: dict[str, int] T = TypeVar("T") class LagoApi: def __init__(self, lago_api_url: str, lago_api_token: str): self.lago_api_url = lago_api_url self.lago_api_token = lago_api_token def request(self, method: str, path: str, **kwargs): logging.debug(f"Lago API request: {method} {path}") response = requests.api.request( method, urllib.parse.urljoin(self.lago_api_url, path), headers={"Authorization": f"Bearer {self.lago_api_token}"}, **kwargs, ) try: response.raise_for_status() except HTTPError as e: message = f"{e.response.status_code} for {method} {path}: {e.response.text}" try: json = cast(LagoHTTPError.ResponseJson, e.response.json()) except: logging.warning("Failed to parse JSON from Lago API response.", exc_info=sys.exc_info()) json = None raise LagoHTTPError(message, response=e.response, json=json) from e return response def get(self, path: str): return self.request("GET", path) def delete(self, path: str): return self.request("DELETE", path) def put(self, path: str, json): return self.request("PUT", path, json=json) def post(self, path: str, json): return self.request("POST", path, json=json) def upload_events(self, events: Iterable[LagoEvent], *, dry_run = False): """ Uploads events to Lago, batching them in groups of 100 (Lago's maximum). If a batch fails due to an event already having been uploaded, we retry the ones that try to upload each event individually. Exceptions: - HTTPError: Any HTTP error response except 422 (which indicates the event already exists) will be thrown. """ total_events = 0 new_events = 0 for event_batch in batch(events, 100): total_events += len(event_batch) try: path = "/api/v1/events/batch" payload = {"events": event_batch} if dry_run: logging.info(f"Would POST {path} with payload: {payload}") else: self.post(path, payload) new_events += len(event_batch) logging.debug(f"Uploaded {len(event_batch)} events.") # If the batch failed because some events were already uploaded, retry the rest except LagoHTTPError as e: if not (e.json and e.json['status'] == 422 and e.json['code'] == "validation_errors"): raise for error in e.json['error_details'].values(): if error.get("transaction_id") != ["value_already_exist"]: raise # Retry the events that did not fail logging.debug( f"Events already existed: {[event_batch[int(i)]['transaction_id'] for i in e.json['error_details'].keys()]}" ) retry_event_batch = [ event for (i, event) in enumerate(event_batch) if str(i) not in e.json['error_details'].keys() ] if len(retry_event_batch) > 0: logging.warning( f"{len(e.json['error_details'])} / {len(event_batch)} events already existed. Reuploading remaining {len(retry_event_batch)} events." ) # Retry the events that didn't fail. Any failure here is a real error assert not dry_run self.post("/api/v1/events/batch", {"events": retry_event_batch}) new_events += len(retry_event_batch) return new_events, total_events class LagoHTTPError(HTTPError): class ResponseJson(TypedDict): status: int code: str error_details: dict[str, Any] def __init__(self, *args, json: Optional[ResponseJson], **kwargs): super().__init__(*args, **kwargs) self.json = json class LagoResponseMetadata(TypedDict): current_page: int next_page: NotRequired[Optional[int]] prev_page: NotRequired[Optional[int]] total_pages: int total_count: int LagoCurrency = Literal[ "AED", "AFN", "ALL", "AMD", "ANG", "AOA", "ARS", "AUD", "AWG", "AZN", "BAM", "BBD", "BDT", "BGN", "BIF", "BMD", "BND", "BOB", "BRL", "BSD", "BWP", "BYN", "BZD", "CAD", "CDF", "CHF", "CLF", "CLP", "CNY", "COP", "CRC", "CVE", "CZK", "DJF", "DKK", "DOP", "DZD", "EGP", "ETB", "EUR", "FJD", "FKP", "GBP", "GEL", "GIP", "GMD", "GNF", "GTQ", "GYD", "HKD", "HNL", "HRK", "HTG", "HUF", "IDR", "ILS", "INR", "ISK", "JMD", "JPY", "KES", "KGS", "KHR", "KMF", "KRW", "KYD", "KZT", "LAK", "LBP", "LKR", "LRD", "LSL", "MAD", "MDL", "MGA", "MKD", "MMK", "MNT", "MOP", "MRO", "MUR", "MVR", "MWK", "MXN", "MYR", "MZN", "NAD", "NGN", "NIO", "NOK", "NPR", "NZD", "PAB", "PEN", "PGK", "PHP", "PKR", "PLN", "PYG", "QAR", "RON", "RSD", "RUB", "RWF", "SAR", "SBD", "SCR", "SEK", "SGD", "SHP", "SLL", "SOS", "SRD", "STD", "SZL", "THB", "TJS", "TOP", "TRY", "TTD", "TWD", "TZS", "UAH", "UGX", "USD", "UYU", "UZS", "VND", "VUV", ] class LagoInvoice(TypedDict): lago_id: str sequential_id: NotRequired[int] number: str issuing_date: IsoTimestamp payment_dispute_lost_at: NotRequired[IsoTimestamp] payment_due_date: NotRequired[IsoTimestamp] payment_overdue: NotRequired[bool] net_payment_term: NotRequired[int] invoice_type: Literal[ "subscription", "add_on", "credit", "one_off", "progressive_billing" ] status: Literal["draft", "finalized", "voided", "failed"] payment_status: Literal["pending", "succeeded", "failed"] currency: LagoCurrency fees_amount_cents: int coupons_amount_cents: int credit_notes_amount_cents: int sub_total_excluding_taxes_amount_cents: int taxes_amount_cents: int sub_total_including_taxes_amount_cents: int prepaid_credit_amount_cents: int progressive_billing_credit_amount_cents: int total_amount_cents: int customer: dict[str, object] metadata: list[dict[str, object]] applied_taxes: list[dict[str, object]] applied_usage_thresholds: NotRequired[Optional[list[dict[str, object]]]] class LagoChargeProperties(TypedDict): graduated_ranges: list[dict[str, object]] graduated_percentage_ranges: list[dict[str, object]] amount: NotRequired[Optional[str]] free_units: NotRequired[Optional[int]] package_size: NotRequired[Optional[int]] rate: NotRequired[Optional[str]] fixed_amount: NotRequired[Optional[str]] free_units_per_events: NotRequired[Optional[int]] free_units_per_total_aggregation: NotRequired[Optional[str]] per_transaction_max_amount: NotRequired[Optional[str]] per_transaction_min_amount: NotRequired[Optional[str]] grouped_by: list[str] volume_ranges: list[dict[str, object]] class LagoCharge(TypedDict): lago_id: str lago_billable_metric_id: str billable_metric_code: str invoice_display_name: NotRequired[Optional[str]] created_at: IsoTimestamp charge_model: Literal[ "standard", "graduated", "graduated_percentage", "package", "percentage", "volume", "dynamic", ] pay_in_advance: NotRequired[Optional[bool]] invoiceable: NotRequired[Optional[bool]] regroup_paid_fees: NotRequired[Optional[Literal["invoice"]]] prorated: NotRequired[Optional[bool]] min_amount_cents: NotRequired[Optional[int]] properties: LagoChargeProperties filters: list[dict[str, object]] taxes: list[dict[str, object]] class LagoPlan(TypedDict): lago_id: str name: str invoice_display_name: NotRequired[Optional[str]] created_at: IsoTimestamp code: str interval: Literal["weekly", "monthly", "quarterly", "yearly"] description: NotRequired[Optional[str]] amount_cents: int amount_currency: LagoCurrency trial_period: NotRequired[Optional[float]] pay_in_advance: bool bill_charges_monthly: NotRequired[Optional[bool]] active_subscriptions_count: int draft_invoices_count: int minimum_commitment: NotRequired[Optional[dict[str, object]]] charges: list[LagoCharge] taxes: list[dict[str, object]] usage_thresholds: list[dict[str, object]] class LagoSubscription(TypedDict): lago_id: str external_id: ExternalSubscriptionId lago_customer_id: str external_customer_id: str billing_time: Literal["calendar", "anniversary"] name: NotRequired[Optional[str]] plan_code: str status: Literal["active", "pending", "terminated", "canceled"] created_at: IsoTimestamp canceled_at: NotRequired[Optional[IsoTimestamp]] started_at: NotRequired[Optional[IsoTimestamp]] ending_at: NotRequired[Optional[IsoTimestamp]] subscription_at: IsoTimestamp terminated_at: NotRequired[Optional[IsoTimestamp]] previous_plan_code: NotRequired[Optional[str]] next_plan_code: NotRequired[Optional[str]] downgrade_plan_date: NotRequired[Optional[IsoTimestamp]] trial_ended_at: NotRequired[Optional[IsoTimestamp]] plan: LagoPlan class LagoInvoicesResponse(TypedDict): invoices: list[LagoInvoice] meta: LagoResponseMetadata class LagoSubscriptionsResponse(TypedDict): subscriptions: list[LagoSubscription] meta: LagoResponseMetadata def batch(iterable: Iterable[T], n): "Batch data into lists of length n. The last batch may be shorter." # batched('ABCDEFG', 3) --> ABC DEF G it = iter(iterable) while True: batch = list(islice(it, n)) if not batch: return yield batch

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server