archive_product
Archive a product to prevent further sales while retaining its historical record for reference.
Instructions
Archive a product so it can no longer be sold.
This is a soft-delete; the product record is retained for history.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| product_id | Yes |
Implementation Reference
- src/stream_mcp/tools/products.py:129-142 (handler)The archive_product tool handler decorated with @mcp.tool. It takes a product_id and Context, then makes a DELETE request to /api/v2/products/{product_id} via the StreamClient. Handles StreamAPIError exceptions using the _err helper.
@mcp.tool async def archive_product( product_id: str, ctx: Context = None, # type: ignore[assignment] ) -> dict[str, Any]: """Archive a product so it can no longer be sold. This is a soft-delete; the product record is retained for history. """ client = await get_client(ctx) try: return await client.delete(f"{_BASE}/{product_id}") except StreamAPIError as exc: return _err(exc) - Error handler helper function _err that converts StreamAPIError into a standardized error dict with error flag, status code, message, and details.
def _err(exc: StreamAPIError) -> dict[str, Any]: return {"error": True, "code": exc.status_code, "message": str(exc), "details": exc.body} - src/stream_mcp/tools/__init__.py:25-25 (registration)Registration call: products.register(mcp) is invoked in register_all_tools to register all product tools including archive_product.
products.register(mcp) - src/stream_mcp/helpers.py:31-73 (helper)The get_client helper function that retrieves a StreamClient for the current request, supporting both local (shared from lifespan) and remote (per-user from Bearer token) modes.
async def get_client(ctx: "Context") -> StreamClient: """Return a :class:`StreamClient` for the current request. Resolution order: 1. **Lifespan client** — used in local / stdio mode where a single ``STREAM_API_KEY`` is set as an environment variable. 2. **Per-user client** — used in remote mode where each user passes their own API key as a Bearer token and (optionally) a custom base URL via the ``X-Stream-Base-URL`` header. """ # ── 1. Local mode: shared client from server lifespan ───────────── shared_client = ctx.lifespan_context.get("client") if shared_client is not None: return shared_client # ── 2. Remote mode: per-user client from Bearer token ───────────── api_key = current_api_key.get() if not api_key: raise StreamError( "No Stream API key found. " "In local mode, set the STREAM_API_KEY env var. " "In remote mode, pass your key as a Bearer token in the Authorization header." ) base_url = current_base_url.get() or settings.stream_base_url cache_key = f"{api_key}::{base_url}" if cache_key not in _client_cache: client = StreamClient( api_key=api_key, base_url=base_url, timeout=settings.stream_timeout, max_retries=settings.stream_max_retries, ) await client.__aenter__() _client_cache[cache_key] = client logger.info( "Created cached StreamClient for remote user (key=…%s, base=%s)", api_key[-4:], base_url, ) return _client_cache[cache_key] - src/stream_mcp/tools/products.py:25-142 (registration)The register function that contains all product tool definitions including archive_product. This function is called by register_all_tools to register all product tools on the FastMCP instance.
def register(mcp: FastMCP) -> None: """Register all product tools on *mcp*.""" @mcp.tool async def create_product( name: str, type: Literal["ONE_OFF", "RECURRING", "METERED"] = "ONE_OFF", price: float = 1.0, currency: str = "SAR", description: str | None = None, is_price_inclusive_of_vat: bool = True, is_price_exempt_from_vat: bool = False, recurring_interval: str | None = None, recurring_interval_count: int = 1, ctx: Context = None, # type: ignore[assignment] ) -> dict[str, Any]: """Create a new product or service in Stream. *type* is ``ONE_OFF``, ``RECURRING``, or ``METERED``. For recurring products, specify *recurring_interval* (WEEK, MONTH, SEMESTER, YEAR). """ prices = [ProductPriceInlineCreate( currency=currency, amount=price, is_price_inclusive_of_vat=is_price_inclusive_of_vat, is_price_exempt_from_vat=is_price_exempt_from_vat, )] body = CreateProductRequest( name=name, type=type, description=description, prices=prices, recurring_interval=recurring_interval, recurring_interval_count=recurring_interval_count, ) client = await get_client(ctx) try: return await client.post(_BASE, body.model_dump(exclude_none=True)) except StreamAPIError as exc: return _err(exc) @mcp.tool async def list_products( page: int = 1, limit: int = 20, type: str | None = None, active: bool | None = None, currency: str | None = None, ctx: Context = None, # type: ignore[assignment] ) -> dict[str, Any]: """List products with optional filters. *type* can be ``ONE_OFF``, ``RECURRING``, or ``METERED``. *active* filters by active/inactive status. """ params: dict[str, Any] = {"page": page, "limit": limit} if type: params["type"] = type if active is not None: params["active"] = active if currency: params["currency"] = currency client = await get_client(ctx) try: return await client.get(_BASE, params=params) except StreamAPIError as exc: return _err(exc) @mcp.tool async def get_product( product_id: str, ctx: Context = None, # type: ignore[assignment] ) -> dict[str, Any]: """Get a single product by ID.""" client = await get_client(ctx) try: return await client.get(f"{_BASE}/{product_id}") except StreamAPIError as exc: return _err(exc) @mcp.tool async def update_product( product_id: str, name: str | None = None, description: str | None = None, is_active: bool | None = None, ctx: Context = None, # type: ignore[assignment] ) -> dict[str, Any]: """Update an existing product's name, description, or active status. Only the fields you provide will be changed. """ body = UpdateProductRequest( name=name, description=description, is_active=is_active, ) client = await get_client(ctx) try: return await client.put( f"{_BASE}/{product_id}", body.model_dump(exclude_none=True), ) except StreamAPIError as exc: return _err(exc) @mcp.tool async def archive_product( product_id: str, ctx: Context = None, # type: ignore[assignment] ) -> dict[str, Any]: """Archive a product so it can no longer be sold. This is a soft-delete; the product record is retained for history. """ client = await get_client(ctx) try: return await client.delete(f"{_BASE}/{product_id}") except StreamAPIError as exc: return _err(exc)