deactivate_payment_link
Archive a payment link to prevent further transactions. Specify a payment link ID to disable it from accepting payments.
Instructions
Deactivate (archive) a payment link so it can no longer be used.
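As a rough illustration of what a deactivation amounts to, based on the handler reproduced under Implementation Reference: the tool sends a PATCH to the link's `/status` path with `status` set to `INACTIVE`, and leaves `deactivate_message` out of the body when none is given. This is a standard-library sketch, not the project's code. The helper name `build_deactivate_request`, the `BASE` value, and the link ID `pl_123` are assumptions for illustration; the real base path lives in `_BASE`, whose value is not shown in this document.

```python
from __future__ import annotations

from typing import Any

# Hypothetical placeholder: the real value of _BASE is not shown in this document.
BASE = "/payment_links"


def build_deactivate_request(
    payment_link_id: str,
    deactivate_message: str | None = None,
) -> tuple[str, str, dict[str, Any]]:
    """Return (method, path, body) describing a deactivation call."""
    body: dict[str, Any] = {
        "status": "INACTIVE",
        "deactivate_message": deactivate_message,
    }
    # Mimic model_dump(exclude_none=True): drop keys whose value is None.
    body = {key: value for key, value in body.items() if value is not None}
    return "PATCH", f"{BASE}/{payment_link_id}/status", body


# "pl_123" is a made-up ID used only for this example.
method, path, body = build_deactivate_request("pl_123", "This offer has ended.")
print(method, path, body)
```

Dropping `None` values keeps the request body minimal, so a call without a message sends only the `status` field.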
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| payment_link_id | Yes | | |
| deactivate_message | No | | |
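For illustration, an MCP client invoking this tool sends a JSON-RPC `tools/call` request whose `arguments` object follows the schema above. The sketch below builds such a message with the standard library only. The helper name `make_tool_call`, the request `id`, and the example link ID and message are made up for the example; a real client library would normally construct this message for you.

```python
from __future__ import annotations

import json
from typing import Any


def make_tool_call(
    payment_link_id: str,
    deactivate_message: str | None = None,
    request_id: int = 1,
) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 tools/call message for deactivate_payment_link."""
    if not payment_link_id:
        # payment_link_id is the only required input.
        raise ValueError("payment_link_id is required")
    arguments: dict[str, Any] = {"payment_link_id": payment_link_id}
    if deactivate_message is not None:
        # Optional input: include it only when the caller supplies one.
        arguments["deactivate_message"] = deactivate_message
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "deactivate_payment_link", "arguments": arguments},
    }


# "pl_123" and the message text are invented values for this example.
message = make_tool_call("pl_123", "This offer has ended.")
print(json.dumps(message, indent=2))
```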
Implementation Reference
- The main handler for the deactivate_payment_link tool. It takes payment_link_id and an optional deactivate_message, builds an UpdatePaymentLinkStatusRequest with status 'INACTIVE', and sends a PATCH request to the Stream API to deactivate (archive) the payment link.

    async def deactivate_payment_link(
        payment_link_id: str,
        deactivate_message: str | None = None,
        ctx: Context = None,  # type: ignore[assignment]
    ) -> dict[str, Any]:
        """Deactivate (archive) a payment link so it can no longer be used."""
        body = UpdatePaymentLinkStatusRequest(
            status="INACTIVE",
            deactivate_message=deactivate_message,
        )
        client = await get_client(ctx)
        try:
            return await client.patch(
                f"{_BASE}/{payment_link_id}/status",
                body.model_dump(exclude_none=True),
            )
        except StreamAPIError as exc:
            return _err(exc)

- Pydantic schema definition for UpdatePaymentLinkStatusRequest, the request body sent when deactivating a payment link. It requires a status field, documented as 'INACTIVE', 'ACTIVE', or 'COMPLETED' (the field is typed as a plain str, so the model itself does not enforce these values), and accepts an optional deactivate_message.

    class UpdatePaymentLinkStatusRequest(BaseModel):
        """Request body for updating a payment link status."""

        status: str = Field(..., description="New status: INACTIVE, ACTIVE, or COMPLETED.")
        deactivate_message: str | None = Field(default=None, description="Message shown when link is deactivated.")

- src/stream_mcp/tools/payment_links.py:24-118 (registration) The register function that registers all payment link tools, including deactivate_payment_link. Each tool is defined inside register and decorated with @mcp.tool, which registers it with the MCP server.

    def register(mcp: FastMCP) -> None:
        """Register all payment-link tools on *mcp*."""

        @mcp.tool
        async def create_payment_link(
            name: str,
            items: list[dict],
            description: str | None = None,
            currency: str = "SAR",
            valid_until: str | None = None,
            max_number_of_payments: int | None = None,
            organization_consumer_id: str | None = None,
            ctx: Context = None,  # type: ignore[assignment]
        ) -> dict[str, Any]:
            """Create a new payment / checkout link on Stream.

            *items* is a list of objects, each containing:
              - product_id (str, required)
              - quantity (int ≥ 1, optional, default 1)
              - coupons (list[str], optional)

            You **cannot** mix one-time and recurring products in the same link.
            """
            body = CreatePaymentLinkRequest(
                name=name,
                items=items,  # type: ignore[arg-type]
                description=description,
                currency=currency,
                valid_until=valid_until,
                max_number_of_payments=max_number_of_payments,
                organization_consumer_id=organization_consumer_id,
            )
            client = await get_client(ctx)
            try:
                return await client.post(_BASE, body.model_dump(exclude_none=True))
            except StreamAPIError as exc:
                return _err(exc)

        @mcp.tool
        async def list_payment_links(
            page: int = 1,
            limit: int = 20,
            statuses: list[str] | None = None,
            from_date: str | None = None,
            to_date: str | None = None,
            ctx: Context = None,  # type: ignore[assignment]
        ) -> dict[str, Any]:
            """List all payment links with optional filters.

            *statuses* can include: INACTIVE, ACTIVE, COMPLETED.
            """
            params: dict[str, Any] = {"page": page, "limit": limit}
            if statuses:
                params["statuses"] = statuses
            if from_date:
                params["from_date"] = from_date
            if to_date:
                params["to_date"] = to_date
            client = await get_client(ctx)
            try:
                return await client.get(_BASE, params=params)
            except StreamAPIError as exc:
                return _err(exc)

        @mcp.tool
        async def get_payment_link(
            payment_link_id: str,
            ctx: Context = None,  # type: ignore[assignment]
        ) -> dict[str, Any]:
            """Retrieve a single payment link by its ID."""
            client = await get_client(ctx)
            try:
                return await client.get(f"{_BASE}/{payment_link_id}")
            except StreamAPIError as exc:
                return _err(exc)

        @mcp.tool
        async def deactivate_payment_link(
            payment_link_id: str,
            deactivate_message: str | None = None,
            ctx: Context = None,  # type: ignore[assignment]
        ) -> dict[str, Any]:
            """Deactivate (archive) a payment link so it can no longer be used."""
            body = UpdatePaymentLinkStatusRequest(
                status="INACTIVE",
                deactivate_message=deactivate_message,
            )
            client = await get_client(ctx)
            try:
                return await client.patch(
                    f"{_BASE}/{payment_link_id}/status",
                    body.model_dump(exclude_none=True),
                )
            except StreamAPIError as exc:
                return _err(exc)

- Helper function _err that formats StreamAPIError exceptions into a standardized error dictionary structure, used by deactivate_payment_link for error handling.

    def _err(exc: StreamAPIError) -> dict[str, Any]:
        return {"error": True, "code": exc.status_code, "message": str(exc), "details": exc.body}

- src/stream_mcp/helpers.py:31-60 (helper) Helper function get_client that retrieves the appropriate StreamClient for the current request context, supporting both local (lifespan) and remote (per-user Bearer token) modes. Used by deactivate_payment_link to make API calls.

    async def get_client(ctx: "Context") -> StreamClient:
        """Return a :class:`StreamClient` for the current request.

        Resolution order:

        1. **Lifespan client** — used in local / stdio mode where a single
           ``STREAM_API_KEY`` is set as an environment variable.
        2. **Per-user client** — used in remote mode where each user passes
           their own API key as a Bearer token and (optionally) a custom
           base URL via the ``X-Stream-Base-URL`` header.
        """
        # ── 1. Local mode: shared client from server lifespan ─────────────
        shared_client = ctx.lifespan_context.get("client")
        if shared_client is not None:
            return shared_client

        # ── 2. Remote mode: per-user client from Bearer token ─────────────
        api_key = current_api_key.get()
        if not api_key:
            raise StreamError(
                "No Stream API key found. "
                "In local mode, set the STREAM_API_KEY env var. "
                "In remote mode, pass your key as a Bearer token in the Authorization header."
            )

        base_url = current_base_url.get() or settings.stream_base_url
        cache_key = f"{api_key}::{base_url}"
        if cache_key not in _client_cache:
            client = StreamClient(