asm.py•2.03 kB
"""Assisted Service Module (ASM) endpoints for the OCC client."""
from __future__ import annotations
from typing import Any
from app.models import JsonObject
from app.utils import clean_params
from .base import SupportsOccHttp
class AsmMixin:
async def get_customer360(
self: SupportsOccHttp,
base_site_id: str,
user_id: str,
payload: dict[str, Any],
) -> JsonObject:
"""Retrieve Customer 360 data for the given user."""
path = f"/{base_site_id}/users/{user_id}/customer360"
return await self.post_json(path, params={}, json_data=payload or {})
async def create_customer(
self: SupportsOccHttp,
base_site: str,
payload: dict[str, Any],
) -> JsonObject:
"""Create a new customer account via ASM."""
params = {"baseSite": base_site}
return await self.post_json("/customers", params=params, json_data=payload or {})
async def search_customers(
self: SupportsOccHttp,
base_site: str,
*,
page: int | None = None,
page_size: int | None = None,
sort: str | None = None,
payload: dict[str, Any] | None = None,
) -> JsonObject:
"""Search for customers with optional pagination and criteria."""
params = clean_params(
{
"baseSite": base_site,
"page": page,
"pageSize": page_size,
"sort": sort,
}
)
return await self.post_json(
"/customers/search",
params=params,
json_data=payload or {},
)
async def suggest_customers(
self: SupportsOccHttp,
base_site: str,
payload: dict[str, Any],
) -> JsonObject:
"""Retrieve autocomplete customer suggestions."""
params = {"baseSite": base_site}
return await self.post_json(
"/customers/suggestions/search",
params=params,
json_data=payload or {},
)