Skip to main content
Glama

SAP Commerce MCP Server

asm.py (4.65 kB)
"""FastMCP tools for Assisted Service Module (ASM) endpoints.""" from __future__ import annotations from typing import Annotated, Any from fastmcp import Context, FastMCP from pydantic import Field from app.models import JsonObject from app.occ_client import occ_client from app.utils import OccApiError from .common import handle_occ_error def register(server: FastMCP) -> None: """Register ASM-related tools on the server.""" @server.tool( "asm.customer360", description="Retrieve Customer 360 data for a specific user.", ) async def asm_customer360( ctx: Context, base_site_id: Annotated[ str, Field(description="OCC base site identifier.", min_length=1, examples=["powertools-spa"]), ], user_id: Annotated[ str, Field(description="User identifier for which to fetch Customer 360 data."), ], query: Annotated[ dict[str, Any], Field(description="Customer 360 request payload (see ASM schema)."), ], ) -> JsonObject: await ctx.report_progress(0, total=1, message="Requesting Customer 360 data") try: payload = await occ_client.get_customer360(base_site_id, user_id, query) except OccApiError as error: await handle_occ_error(ctx, "asm.customer360", error) await ctx.report_progress(1, total=1, message="Retrieved Customer 360 data") return payload @server.tool( "asm.customers.create", description="Create a customer account via ASM.", ) async def asm_customers_create( ctx: Context, base_site: Annotated[ str, Field(description="Base site identifier used for the new customer."), ], customer: Annotated[ dict[str, Any], Field(description="Customer registration payload."), ], ) -> JsonObject: await ctx.report_progress(0, total=1, message="Creating ASM customer") try: payload = await occ_client.create_customer(base_site, customer) except OccApiError as error: await handle_occ_error(ctx, "asm.customers.create", error) await ctx.report_progress(1, total=1, message="Customer created") return payload @server.tool( "asm.customers.search", description="Search for customers with pagination 
support.", ) async def asm_customers_search( ctx: Context, base_site: Annotated[ str, Field(description="Base site identifier used for the search."), ], page: Annotated[ int | None, Field(description="Optional page number (0-indexed).", ge=0), ] = None, page_size: Annotated[ int | None, Field(description="Optional page size."), ] = None, sort: Annotated[ str | None, Field(description="Optional sort code supported by ASM."), ] = None, filters: Annotated[ dict[str, Any] | None, Field(description="Optional search payload (CustomerSearchInput)."), ] = None, ) -> JsonObject: await ctx.report_progress(0, total=1, message="Searching ASM customers") try: payload = await occ_client.search_customers( base_site, page=page, page_size=page_size, sort=sort, payload=filters, ) except OccApiError as error: await handle_occ_error(ctx, "asm.customers.search", error) await ctx.report_progress(1, total=1, message="Retrieved ASM customers") return payload @server.tool( "asm.customers.suggest", description="Retrieve autocomplete customer suggestions.", ) async def asm_customers_suggest( ctx: Context, base_site: Annotated[ str, Field(description="Base site identifier used for suggestions."), ], query: Annotated[ dict[str, Any], Field(description="Suggestion request payload (CustomerSuggestionsInput)."), ], ) -> JsonObject: await ctx.report_progress(0, total=1, message="Fetching ASM customer suggestions") try: payload = await occ_client.suggest_customers(base_site, query) except OccApiError as error: await handle_occ_error(ctx, "asm.customers.suggest", error) await ctx.report_progress(1, total=1, message="Retrieved ASM suggestions") return payload

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/commerce-cloud-integrations/sap-commerce-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.