Skip to main content
Glama

DIY Helper MCP Servers

by jrszilard
webhooks.py7.82 kB
# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. from __future__ import annotations import hmac import json import time import base64 import hashlib from typing import cast from .._types import HeadersLike from .._utils import get_required_header from .._models import construct_type from .._resource import SyncAPIResource, AsyncAPIResource from .._exceptions import InvalidWebhookSignatureError from ..types.webhooks.unwrap_webhook_event import UnwrapWebhookEvent __all__ = ["Webhooks", "AsyncWebhooks"] class Webhooks(SyncAPIResource): def unwrap( self, payload: str | bytes, headers: HeadersLike, *, secret: str | None = None, ) -> UnwrapWebhookEvent: """Validates that the given payload was sent by OpenAI and parses the payload.""" if secret is None: secret = self._client.webhook_secret self.verify_signature(payload=payload, headers=headers, secret=secret) return cast( UnwrapWebhookEvent, construct_type( type_=UnwrapWebhookEvent, value=json.loads(payload), ), ) def verify_signature( self, payload: str | bytes, headers: HeadersLike, *, secret: str | None = None, tolerance: int = 300, ) -> None: """Validates whether or not the webhook payload was sent by OpenAI. Args: payload: The webhook payload headers: The webhook headers secret: The webhook secret (optional, will use client secret if not provided) tolerance: Maximum age of the webhook in seconds (default: 300 = 5 minutes) """ if secret is None: secret = self._client.webhook_secret if secret is None: raise ValueError( "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, " "on the client class, OpenAI(webhook_secret='123'), or passed to this function" ) signature_header = get_required_header(headers, "webhook-signature") timestamp = get_required_header(headers, "webhook-timestamp") webhook_id = get_required_header(headers, "webhook-id") # Validate timestamp to prevent replay attacks try: timestamp_seconds = int(timestamp) except ValueError: raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None now = int(time.time()) if now - timestamp_seconds > tolerance: raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None if timestamp_seconds > now + tolerance: raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None # Extract signatures from v1,<base64> format # The signature header can have multiple values, separated by spaces. # Each value is in the format v1,<base64>. We should accept if any match. signatures: list[str] = [] for part in signature_header.split(): if part.startswith("v1,"): signatures.append(part[3:]) else: signatures.append(part) # Decode the secret if it starts with whsec_ if secret.startswith("whsec_"): decoded_secret = base64.b64decode(secret[6:]) else: decoded_secret = secret.encode() body = payload.decode("utf-8") if isinstance(payload, bytes) else payload # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format) signed_payload = f"{webhook_id}.{timestamp}.{body}" expected_signature = base64.b64encode( hmac.new(decoded_secret, signed_payload.encode(), hashlib.sha256).digest() ).decode() # Accept if any signature matches if not any(hmac.compare_digest(expected_signature, sig) for sig in signatures): raise InvalidWebhookSignatureError( "The given webhook signature does not match the expected signature" ) from None class AsyncWebhooks(AsyncAPIResource): def unwrap( self, payload: str | bytes, headers: HeadersLike, *, secret: str | None = None, ) -> UnwrapWebhookEvent: """Validates that the given payload was sent by OpenAI and parses the payload.""" if secret is None: secret = self._client.webhook_secret self.verify_signature(payload=payload, headers=headers, secret=secret) body = payload.decode("utf-8") if isinstance(payload, bytes) else payload return cast( UnwrapWebhookEvent, construct_type( type_=UnwrapWebhookEvent, value=json.loads(body), ), ) def verify_signature( self, payload: str | bytes, headers: HeadersLike, *, secret: str | None = None, tolerance: int = 300, ) -> None: """Validates whether or not the webhook payload was sent by OpenAI. Args: payload: The webhook payload headers: The webhook headers secret: The webhook secret (optional, will use client secret if not provided) tolerance: Maximum age of the webhook in seconds (default: 300 = 5 minutes) """ if secret is None: secret = self._client.webhook_secret if secret is None: raise ValueError( "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, " "on the client class, OpenAI(webhook_secret='123'), or passed to this function" ) from None signature_header = get_required_header(headers, "webhook-signature") timestamp = get_required_header(headers, "webhook-timestamp") webhook_id = get_required_header(headers, "webhook-id") # Validate timestamp to prevent replay attacks try: timestamp_seconds = int(timestamp) except ValueError: raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None now = int(time.time()) if now - timestamp_seconds > tolerance: raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None if timestamp_seconds > now + tolerance: raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None # Extract signatures from v1,<base64> format # The signature header can have multiple values, separated by spaces. # Each value is in the format v1,<base64>. We should accept if any match. signatures: list[str] = [] for part in signature_header.split(): if part.startswith("v1,"): signatures.append(part[3:]) else: signatures.append(part) # Decode the secret if it starts with whsec_ if secret.startswith("whsec_"): decoded_secret = base64.b64decode(secret[6:]) else: decoded_secret = secret.encode() body = payload.decode("utf-8") if isinstance(payload, bytes) else payload # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format) signed_payload = f"{webhook_id}.{timestamp}.{body}" expected_signature = base64.b64encode( hmac.new(decoded_secret, signed_payload.encode(), hashlib.sha256).digest() ).decode() # Accept if any signature matches if not any(hmac.compare_digest(expected_signature, sig) for sig in signatures): raise InvalidWebhookSignatureError("The given webhook signature does not match the expected signature")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jrszilard/diy-helper-mcp-servers'

If you have feedback or need assistance with the MCP directory API, please join our Discord server