# asm.py (4.65 kB)
"""FastMCP tools for Assisted Service Module (ASM) endpoints."""
from __future__ import annotations
from typing import Annotated, Any
from fastmcp import Context, FastMCP
from pydantic import Field
from app.models import JsonObject
from app.occ_client import occ_client
from app.utils import OccApiError
from .common import handle_occ_error
def register(server: FastMCP) -> None:
    """Register ASM-related tools on the server.

    Adds four tools to ``server``: ``asm.customer360``,
    ``asm.customers.create``, ``asm.customers.search`` and
    ``asm.customers.suggest``. Each tool reports progress through its
    ``Context``, awaits the matching ``occ_client`` coroutine and returns
    that call's result.

    When the client raises ``OccApiError`` the error is handed to
    ``handle_occ_error`` together with the tool name. If that helper returns
    instead of raising, the original error is re-raised so a failed call can
    never reach the success path.

    Args:
        server: FastMCP server on which the tools are registered.
    """

    # NOTE: parameter annotations are kept inline on purpose. The module uses
    # ``from __future__ import annotations``, so annotations are stored as
    # strings; aliases defined locally inside this function would not be
    # resolvable from module globals when the hints are evaluated later.

    @server.tool(
        "asm.customer360",
        description="Retrieve Customer 360 data for a specific user.",
    )
    async def asm_customer360(
        ctx: Context,
        base_site_id: Annotated[
            str,
            Field(
                description="OCC base site identifier.",
                min_length=1,
                examples=["powertools-spa"],
            ),
        ],
        user_id: Annotated[
            str,
            Field(description="User identifier for which to fetch Customer 360 data."),
        ],
        query: Annotated[
            dict[str, Any],
            Field(description="Customer 360 request payload (see ASM schema)."),
        ],
    ) -> JsonObject:
        """Return the result of ``occ_client.get_customer360`` for the user."""
        await ctx.report_progress(0, total=1, message="Requesting Customer 360 data")
        try:
            payload = await occ_client.get_customer360(base_site_id, user_id, query)
        except OccApiError as error:
            await handle_occ_error(ctx, "asm.customer360", error)
            # The handler is expected to raise (TODO confirm). Re-raise as a
            # guard so ``payload`` is never read while unbound.
            raise
        await ctx.report_progress(1, total=1, message="Retrieved Customer 360 data")
        return payload

    @server.tool(
        "asm.customers.create",
        description="Create a customer account via ASM.",
    )
    async def asm_customers_create(
        ctx: Context,
        base_site: Annotated[
            str,
            Field(description="Base site identifier used for the new customer."),
        ],
        customer: Annotated[
            dict[str, Any],
            Field(description="Customer registration payload."),
        ],
    ) -> JsonObject:
        """Return the result of ``occ_client.create_customer``."""
        await ctx.report_progress(0, total=1, message="Creating ASM customer")
        try:
            payload = await occ_client.create_customer(base_site, customer)
        except OccApiError as error:
            await handle_occ_error(ctx, "asm.customers.create", error)
            # Guard against a handler that returns instead of raising.
            raise
        await ctx.report_progress(1, total=1, message="Customer created")
        return payload

    @server.tool(
        "asm.customers.search",
        description="Search for customers with pagination support.",
    )
    async def asm_customers_search(
        ctx: Context,
        base_site: Annotated[
            str,
            Field(description="Base site identifier used for the search."),
        ],
        page: Annotated[
            int | None,
            Field(description="Optional page number (0-indexed).", ge=0),
        ] = None,
        # NOTE(review): ``page`` is bounded with ge=0 but ``page_size`` has no
        # lower bound; confirm whether zero/negative sizes should be rejected.
        page_size: Annotated[
            int | None,
            Field(description="Optional page size."),
        ] = None,
        sort: Annotated[
            str | None,
            Field(description="Optional sort code supported by ASM."),
        ] = None,
        filters: Annotated[
            dict[str, Any] | None,
            Field(description="Optional search payload (CustomerSearchInput)."),
        ] = None,
    ) -> JsonObject:
        """Return the result of ``occ_client.search_customers``.

        ``filters`` is forwarded to the client as its ``payload`` keyword.
        """
        await ctx.report_progress(0, total=1, message="Searching ASM customers")
        try:
            payload = await occ_client.search_customers(
                base_site,
                page=page,
                page_size=page_size,
                sort=sort,
                payload=filters,
            )
        except OccApiError as error:
            await handle_occ_error(ctx, "asm.customers.search", error)
            # Guard against a handler that returns instead of raising.
            raise
        await ctx.report_progress(1, total=1, message="Retrieved ASM customers")
        return payload

    @server.tool(
        "asm.customers.suggest",
        description="Retrieve autocomplete customer suggestions.",
    )
    async def asm_customers_suggest(
        ctx: Context,
        base_site: Annotated[
            str,
            Field(description="Base site identifier used for suggestions."),
        ],
        query: Annotated[
            dict[str, Any],
            Field(description="Suggestion request payload (CustomerSuggestionsInput)."),
        ],
    ) -> JsonObject:
        """Return the result of ``occ_client.suggest_customers``."""
        await ctx.report_progress(0, total=1, message="Fetching ASM customer suggestions")
        try:
            payload = await occ_client.suggest_customers(base_site, query)
        except OccApiError as error:
            await handle_occ_error(ctx, "asm.customers.suggest", error)
            # Guard against a handler that returns instead of raising.
            raise
        await ctx.report_progress(1, total=1, message="Retrieved ASM suggestions")
        return payload