Skip to main content
Glama

SAP Commerce MCP Server

carts.py (11.2 kB)
"""MCP tool definitions covering carts and the entries inside them."""

from __future__ import annotations

from typing import Annotated, Any

from fastmcp import Context, FastMCP
from pydantic import Field

from app.models import JsonObject
from app.occ_client import occ_client
from app.utils import OccApiError

from .common import handle_occ_error


def register(server: FastMCP) -> None:
    """Attach every cart and cart-entry tool to ``server``.

    Each tool follows the same sequence: report progress 0/1, await the
    matching ``occ_client`` coroutine, pass any ``OccApiError`` to
    ``handle_occ_error`` together with the tool name, report progress 1/1
    and return whatever the client produced.

    NOTE(review): the code after each ``except`` clause reads the result
    variable, so it relies on ``handle_occ_error`` not returning normally
    (presumably it re-raises) -- confirm against ``.common``.
    """

    # ------------------------------------------------------------------
    # Cart-level tools
    # ------------------------------------------------------------------

    # Lists the carts of ``user_id``; paging, sorting and the saved-only
    # filter are forwarded to the client unchanged.
    @server.tool(
        "carts.list",
        description="List carts for a user (defaults to anonymous).",
    )
    async def carts_list(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        current_page: Annotated[int | None, Field(ge=0)] = None,
        page_size: Annotated[int | None, Field(ge=1)] = None,
        sort: Annotated[str | None, Field(description="Optional sort key.")] = None,
        saved_carts_only: Annotated[
            bool | None, Field(description="Return only saved carts.")
        ] = None,
        fields: Annotated[str | None, Field(description="Fields selector.")] = None,
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> JsonObject:
        await ctx.report_progress(0, total=1, message="Fetching carts")
        try:
            response = await occ_client.get_carts(
                base_site_id,
                user_id,
                current_page=current_page,
                page_size=page_size,
                sort=sort,
                saved_carts_only=saved_carts_only,
                fields=fields,
            )
        except OccApiError as exc:
            await handle_occ_error(ctx, "carts.list", exc)
        await ctx.report_progress(1, total=1, message="Retrieved carts")
        return response

    # Creates a cart; the restore/merge identifiers and the optional cart
    # body are handed to the client as keyword arguments.
    @server.tool("carts.create", description="Create or restore a cart.")
    async def carts_create(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        fields: Annotated[str | None, Field(description="Fields selector.")] = None,
        old_cart_id: Annotated[
            str | None, Field(description="Optional cart to restore.")
        ] = None,
        to_merge_cart_guid: Annotated[
            str | None, Field(description="Cart GUID to merge.")
        ] = None,
        to_merge_cart_id: Annotated[
            str | None, Field(description="Cart ID to merge.")
        ] = None,
        cart: Annotated[
            dict[str, Any] | None, Field(description="Cart payload (CartWsDTO).")
        ] = None,
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> JsonObject:
        await ctx.report_progress(0, total=1, message="Creating cart")
        try:
            response = await occ_client.create_cart(
                base_site_id,
                user_id,
                fields=fields,
                old_cart_id=old_cart_id,
                to_merge_cart_guid=to_merge_cart_guid,
                to_merge_cart_id=to_merge_cart_id,
                payload=cart,
            )
        except OccApiError as exc:
            await handle_occ_error(ctx, "carts.create", exc)
        await ctx.report_progress(1, total=1, message="Cart ready")
        return response

    # Reads a single cart identified by ``cart_id``.
    @server.tool("carts.get", description="Fetch a cart by ID.")
    async def carts_get(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        cart_id: Annotated[str, Field(description="Cart identifier.")],
        fields: Annotated[str | None, Field(description="Fields selector.")] = None,
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> JsonObject:
        await ctx.report_progress(0, total=1, message="Fetching cart")
        try:
            response = await occ_client.get_cart(
                base_site_id,
                user_id,
                cart_id,
                fields=fields,
            )
        except OccApiError as exc:
            await handle_occ_error(ctx, "carts.get", exc)
        await ctx.report_progress(1, total=1, message="Cart retrieved")
        return response

    # Deletes a cart and returns whatever ``delete_cart`` yields.
    @server.tool("carts.delete", description="Delete a cart.")
    async def carts_delete(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        cart_id: Annotated[str, Field(description="Cart identifier.")],
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> dict[str, Any]:
        await ctx.report_progress(0, total=1, message="Deleting cart")
        try:
            outcome = await occ_client.delete_cart(base_site_id, user_id, cart_id)
        except OccApiError as exc:
            await handle_occ_error(ctx, "carts.delete", exc)
        await ctx.report_progress(1, total=1, message="Cart deleted")
        return outcome

    # ------------------------------------------------------------------
    # Cart-entry tools
    # ------------------------------------------------------------------

    # Lists the entries of one cart.
    @server.tool("cartEntries.list", description="List entries in a cart.")
    async def cart_entries_list(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        cart_id: Annotated[str, Field(description="Cart identifier.")],
        fields: Annotated[str | None, Field(description="Fields selector.")] = None,
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> JsonObject:
        await ctx.report_progress(0, total=1, message="Fetching cart entries")
        try:
            response = await occ_client.get_cart_entries(
                base_site_id,
                user_id,
                cart_id,
                fields=fields,
            )
        except OccApiError as exc:
            await handle_occ_error(ctx, "cartEntries.list", exc)
        await ctx.report_progress(1, total=1, message="Entries retrieved")
        return response

    # Adds an entry; ``entry`` is forwarded as the request payload.
    @server.tool("cartEntries.add", description="Add a product to a cart.")
    async def cart_entries_add(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        cart_id: Annotated[str, Field(description="Cart identifier.")],
        entry: Annotated[dict[str, Any], Field(description="OrderEntry payload.")],
        fields: Annotated[str | None, Field(description="Fields selector.")] = None,
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> JsonObject:
        await ctx.report_progress(0, total=1, message="Adding cart entry")
        try:
            response = await occ_client.add_cart_entry(
                base_site_id,
                user_id,
                cart_id,
                fields=fields,
                payload=entry,
            )
        except OccApiError as exc:
            await handle_occ_error(ctx, "cartEntries.add", exc)
        await ctx.report_progress(1, total=1, message="Entry added")
        return response

    # Reads one entry, addressed by ``entry_number`` within the cart.
    @server.tool("cartEntries.get", description="Get details of a cart entry.")
    async def cart_entries_get(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        cart_id: Annotated[str, Field(description="Cart identifier.")],
        entry_number: Annotated[str, Field(description="Entry number.")],
        fields: Annotated[str | None, Field(description="Fields selector.")] = None,
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> JsonObject:
        await ctx.report_progress(0, total=1, message="Fetching cart entry")
        try:
            response = await occ_client.get_cart_entry(
                base_site_id,
                user_id,
                cart_id,
                entry_number,
                fields=fields,
            )
        except OccApiError as exc:
            await handle_occ_error(ctx, "cartEntries.get", exc)
        await ctx.report_progress(1, total=1, message="Entry retrieved")
        return response

    # Updates an entry through the client's ``update_cart_entry`` call.
    @server.tool("cartEntries.update", description="Update a cart entry via PUT.")
    async def cart_entries_update(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        cart_id: Annotated[str, Field(description="Cart identifier.")],
        entry_number: Annotated[str, Field(description="Entry number.")],
        entry: Annotated[dict[str, Any], Field(description="OrderEntry payload.")],
        fields: Annotated[str | None, Field(description="Fields selector.")] = None,
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> JsonObject:
        await ctx.report_progress(0, total=1, message="Updating cart entry")
        try:
            response = await occ_client.update_cart_entry(
                base_site_id,
                user_id,
                cart_id,
                entry_number,
                fields=fields,
                payload=entry,
            )
        except OccApiError as exc:
            await handle_occ_error(ctx, "cartEntries.update", exc)
        await ctx.report_progress(1, total=1, message="Entry updated")
        return response

    # Patches an entry through the client's ``patch_cart_entry`` call.
    @server.tool("cartEntries.patch", description="Patch a cart entry via PATCH.")
    async def cart_entries_patch(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        cart_id: Annotated[str, Field(description="Cart identifier.")],
        entry_number: Annotated[str, Field(description="Entry number.")],
        entry: Annotated[
            dict[str, Any], Field(description="Partial OrderEntry payload.")
        ],
        fields: Annotated[str | None, Field(description="Fields selector.")] = None,
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> JsonObject:
        await ctx.report_progress(0, total=1, message="Patching cart entry")
        try:
            response = await occ_client.patch_cart_entry(
                base_site_id,
                user_id,
                cart_id,
                entry_number,
                fields=fields,
                payload=entry,
            )
        except OccApiError as exc:
            await handle_occ_error(ctx, "cartEntries.patch", exc)
        await ctx.report_progress(1, total=1, message="Entry patched")
        return response

    # Deletes one entry and returns whatever ``delete_cart_entry`` yields.
    @server.tool("cartEntries.delete", description="Delete a cart entry.")
    async def cart_entries_delete(
        ctx: Context,
        base_site_id: Annotated[str, Field(description="Base site identifier.")],
        cart_id: Annotated[str, Field(description="Cart identifier.")],
        entry_number: Annotated[str, Field(description="Entry number.")],
        user_id: Annotated[str, Field(description="User identifier.")] = "anonymous",
    ) -> dict[str, Any]:
        await ctx.report_progress(0, total=1, message="Deleting cart entry")
        try:
            outcome = await occ_client.delete_cart_entry(
                base_site_id,
                user_id,
                cart_id,
                entry_number,
            )
        except OccApiError as exc:
            await handle_occ_error(ctx, "cartEntries.delete", exc)
        await ctx.report_progress(1, total=1, message="Entry deleted")
        return outcome

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/commerce-cloud-integrations/sap-commerce-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.