Skip to main content
Glama

SAP Commerce MCP Server

integration_utils.py4.77 kB
"""Shared helpers for live integration tests.""" from __future__ import annotations from typing import Any, Dict, List from fastmcp.client.client import CallToolResult from fastmcp.exceptions import ToolError import app.occ_client as occ_module from app.occ_client import OccClient TARGET_BASE_SITE = "apparel-uk" def structured(result: CallToolResult) -> Dict[str, Any]: """Return the structured content from a tool result, defaulting to an empty dict.""" if result.structured_content is not None: return result.structured_content # type: ignore[return-value] return {} async def gather_product_context( client, base_site_id: str = TARGET_BASE_SITE, *, store_query: str = "New York", product_page_size: int = 5, ) -> Dict[str, Any]: """Collect commonly used data for discovery/cart flows.""" basesites_result = await client.call_tool("basesites.list") base_sites = structured(basesites_result).get("baseSites", []) base_site = next((site for site in base_sites if site.get("uid") == base_site_id), None) if base_site is None and base_sites: base_site = base_sites[0] base_site_id = base_site.get("uid") if base_site is None: raise AssertionError("No base site information available") store_entry = None stores_payload: Dict[str, Any] = {} try: stores_result = await client.call_tool( "stores.list", { "base_site_id": base_site_id, "query": store_query, "page_size": 5, }, ) stores_payload = structured(stores_result) stores = stores_payload.get("stores", []) store_entry = stores[0] if stores else None except ToolError: stores_payload = {} store_entry = None catalogs_result = await client.call_tool("catalogs.list", {"base_site_id": base_site_id}) catalog_list = structured(catalogs_result).get("catalogs", []) if not catalog_list: raise AssertionError("No catalogs returned for base site") catalog_entry = catalog_list[0] catalog_id = catalog_entry.get("id") catalog_versions = ( catalog_entry.get("catalogVersions") or catalog_entry.get("versions") or [] ) version_entry = next( ( version for version in catalog_versions if (version.get("id") or version.get("catalogVersion")) == "Online" ), catalog_versions[0] if catalog_versions else None, ) if catalog_versions else None if version_entry is None: raise AssertionError("No catalog version available") version_id = version_entry.get("id") or version_entry.get("catalogVersion") version_result = await client.call_tool( "catalogVersions.get", { "base_site_id": base_site_id, "catalog_id": catalog_id, "catalog_version_id": version_id, }, ) version_payload = structured(version_result) categories: List[Dict[str, Any]] = ( version_payload.get("categories") or version_entry.get("categories") or version_entry.get("rootCategories") or [] ) if not categories: raise AssertionError("No categories available for catalog version") category_entry = categories[0] category_id = category_entry.get("id") products_result = await client.call_tool( "categories.products", { "base_site_id": base_site_id, "category_id": category_id, "page_size": product_page_size, }, ) products_payload = structured(products_result) products = products_payload.get("products", []) if not products: raise AssertionError("No products returned for category") return { "base_site": base_site, "stores_payload": stores_payload, "store_entry": store_entry, "catalog_entry": catalog_entry, "catalog_id": catalog_id, "version_entry": version_entry, "version_id": version_id, "version_payload": version_payload, "category_entry": category_entry, "category_id": category_id, "products": products, "products_payload": products_payload, } def reset_occ_client() -> None: """Reinitialize the global OCC client to ensure a fresh HTTP session.""" occ_module.occ_client = OccClient() async def list_base_site_ids(client) -> List[str]: """Return all base site identifiers available from the OCC endpoint.""" basesites_result = await client.call_tool("basesites.list") base_sites = structured(basesites_result).get("baseSites", []) return [site.get("uid") for site in base_sites if site.get("uid")]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/commerce-cloud-integrations/sap-commerce-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server