# catalogs.py • 4.07 kB
"""Catalog-related FastMCP tools."""
from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import Annotated

from fastmcp import Context, FastMCP
from pydantic import Field

from app.models import JsonObject
from app.occ_client import occ_client
from app.utils import OccApiError

from .common import handle_occ_error
async def _run_occ_call(
    ctx: Context,
    tool_name: str,
    start_message: str,
    done_message: str,
    request: Callable[[], Awaitable[JsonObject]],
) -> JsonObject:
    """Run one OCC client request with progress reporting and error handling.

    Args:
        ctx: Tool context used to report progress.
        tool_name: Tool name passed to ``handle_occ_error`` on failure.
        start_message: Progress message reported before the request starts.
        done_message: Progress message reported after the request succeeds.
        request: Zero-argument callable that starts the OCC client call. It is
            invoked only after the initial progress report has been sent.

    Returns:
        The payload produced by awaiting ``request()``.

    Raises:
        OccApiError: Re-raised when ``handle_occ_error`` returns normally.
            Any exception raised by ``handle_occ_error`` itself propagates
            instead.
    """
    await ctx.report_progress(0, total=1, message=start_message)
    try:
        payload = await request()
    except OccApiError as error:
        await handle_occ_error(ctx, tool_name, error)
        # NOTE(review): handle_occ_error presumably raises on its own; that is
        # not visible from this module. Re-raising guarantees we never fall
        # through to report success and return an unbound ``payload``.
        raise
    await ctx.report_progress(1, total=1, message=done_message)
    return payload


def register(server: FastMCP) -> None:
    """Register catalog tools on the server.

    Three tools are registered: ``catalogs.list``, ``catalogs.get`` and
    ``catalogVersions.get``. Each one forwards its arguments to the matching
    ``occ_client`` method and reports progress before and after the call.
    """

    @server.tool("catalogs.list", description="List catalogs for a given base site.")
    async def catalogs_list(
        ctx: Context,
        base_site_id: Annotated[
            str,
            Field(description="The OCC base site identifier.", min_length=1, examples=["electronics"]),
        ],
        fields: Annotated[
            str | None,
            Field(
                description="Override the fields query parameter (defaults to OCC_DEFAULT_FIELDS).",
                examples=["DEFAULT"],
            ),
        ] = None,
    ) -> JsonObject:
        """Return the payload of ``occ_client.get_catalogs`` for the base site."""
        return await _run_occ_call(
            ctx,
            "catalogs.list",
            f"Fetching catalogs for base site '{base_site_id}'",
            "Retrieved catalogs",
            lambda: occ_client.get_catalogs(base_site_id, fields),
        )

    @server.tool("catalogs.get", description="Fetch a single catalog for a base site.")
    async def catalogs_get(
        ctx: Context,
        base_site_id: Annotated[
            str,
            Field(description="The OCC base site identifier.", min_length=1, examples=["electronics"]),
        ],
        catalog_id: Annotated[
            str,
            Field(description="Catalog identifier to retrieve.", min_length=1, examples=["electronicsProductCatalog"]),
        ],
        fields: Annotated[
            str | None,
            Field(
                description="Override the fields query parameter (defaults to OCC_DEFAULT_FIELDS).",
                examples=["FULL"],
            ),
        ] = None,
    ) -> JsonObject:
        """Return the payload of ``occ_client.get_catalog`` for one catalog."""
        return await _run_occ_call(
            ctx,
            "catalogs.get",
            f"Fetching catalog '{catalog_id}' for base site '{base_site_id}'",
            "Retrieved catalog",
            lambda: occ_client.get_catalog(base_site_id, catalog_id, fields),
        )

    @server.tool("catalogVersions.get", description="Fetch a specific catalog version for a catalog.")
    async def catalog_versions_get(
        ctx: Context,
        base_site_id: Annotated[
            str,
            Field(description="The OCC base site identifier.", min_length=1, examples=["electronics"]),
        ],
        catalog_id: Annotated[
            str,
            Field(description="Catalog identifier that contains the version.", min_length=1),
        ],
        catalog_version_id: Annotated[
            str,
            Field(description="Catalog version to retrieve (e.g. 'Online').", min_length=1),
        ],
        fields: Annotated[
            str | None,
            Field(
                description="Override the fields query parameter (defaults to OCC_DEFAULT_FIELDS).",
                examples=["DEFAULT"],
            ),
        ] = None,
    ) -> JsonObject:
        """Return the payload of ``occ_client.get_catalog_version``."""
        return await _run_occ_call(
            ctx,
            "catalogVersions.get",
            (
                f"Fetching catalog version '{catalog_version_id}' for catalog '{catalog_id}'"
                f" in base site '{base_site_id}'"
            ),
            "Retrieved catalog version",
            lambda: occ_client.get_catalog_version(base_site_id, catalog_id, catalog_version_id, fields),
        )