Skip to main content
Glama

SAP Commerce MCP Server

catalogs.py (4.07 kB)
"""Catalog-related FastMCP tools.""" from __future__ import annotations from typing import Annotated from fastmcp import Context, FastMCP from pydantic import Field from app.models import JsonObject from app.occ_client import occ_client from app.utils import OccApiError from .common import handle_occ_error def register(server: FastMCP) -> None: """Register catalog tools on the server.""" @server.tool("catalogs.list", description="List catalogs for a given base site.") async def catalogs_list( ctx: Context, base_site_id: Annotated[ str, Field(description="The OCC base site identifier.", min_length=1, examples=["electronics"]), ], fields: Annotated[ str | None, Field( description="Override the fields query parameter (defaults to OCC_DEFAULT_FIELDS).", examples=["DEFAULT"], ), ] = None, ) -> JsonObject: await ctx.report_progress(0, total=1, message=f"Fetching catalogs for base site '{base_site_id}'") try: payload = await occ_client.get_catalogs(base_site_id, fields) except OccApiError as error: await handle_occ_error(ctx, "catalogs.list", error) await ctx.report_progress(1, total=1, message="Retrieved catalogs") return payload @server.tool("catalogs.get", description="Fetch a single catalog for a base site.") async def catalogs_get( ctx: Context, base_site_id: Annotated[ str, Field(description="The OCC base site identifier.", min_length=1, examples=["electronics"]), ], catalog_id: Annotated[ str, Field(description="Catalog identifier to retrieve.", min_length=1, examples=["electronicsProductCatalog"]), ], fields: Annotated[ str | None, Field( description="Override the fields query parameter (defaults to OCC_DEFAULT_FIELDS).", examples=["FULL"], ), ] = None, ) -> JsonObject: await ctx.report_progress( 0, total=1, message=f"Fetching catalog '{catalog_id}' for base site '{base_site_id}'", ) try: payload = await occ_client.get_catalog(base_site_id, catalog_id, fields) except OccApiError as error: await handle_occ_error(ctx, "catalogs.get", error) await 
ctx.report_progress(1, total=1, message="Retrieved catalog") return payload @server.tool("catalogVersions.get", description="Fetch a specific catalog version for a catalog.") async def catalog_versions_get( ctx: Context, base_site_id: Annotated[ str, Field(description="The OCC base site identifier.", min_length=1, examples=["electronics"]), ], catalog_id: Annotated[ str, Field(description="Catalog identifier that contains the version.", min_length=1), ], catalog_version_id: Annotated[ str, Field(description="Catalog version to retrieve (e.g. 'Online').", min_length=1), ], fields: Annotated[ str | None, Field( description="Override the fields query parameter (defaults to OCC_DEFAULT_FIELDS).", examples=["DEFAULT"], ), ] = None, ) -> JsonObject: await ctx.report_progress( 0, total=1, message=( f"Fetching catalog version '{catalog_version_id}' for catalog '{catalog_id}'" f" in base site '{base_site_id}'" ), ) try: payload = await occ_client.get_catalog_version(base_site_id, catalog_id, catalog_version_id, fields) except OccApiError as error: await handle_occ_error(ctx, "catalogVersions.get", error) await ctx.report_progress(1, total=1, message="Retrieved catalog version") return payload

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/commerce-cloud-integrations/sap-commerce-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.