"""Cisco Commerce Catalog API Client.
Handles authentication, request building, and response parsing for the
Cisco Commerce Catalog Web Services API.
"""
import logging
import re
from datetime import datetime, timedelta
from typing import Any
from xml.etree import ElementTree as ET
from xml.sax.saxutils import escape, quoteattr

import httpx

from .config import Settings
from .constants import ERROR_CODES, ITEM_ATTRIBUTES
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class CiscoAPIError(Exception):
    """Error reported by the Cisco API, carrying its code and message.

    Attributes are set from the constructor arguments; the exception's
    string form is ``"<code>: <message>"``.
    """

    def __init__(self, code: str, message: str):
        # Build the combined text first so str(exc) shows both parts.
        combined = f"{code}: {message}"
        super().__init__(combined)
        self.code, self.message = code, message
class CiscoCatalogClient:
    """Client for interacting with Cisco Commerce Catalog API.

    Caches an OAuth access token on the instance, builds the ``GetCatalog``
    XML request bodies, posts them with a bearer token, and flattens the
    JSON responses into plain dictionaries.
    """

    # Patterns used by _to_snake_case; compiled once for the class.
    _WORD_START_RE = re.compile(r"(.)([A-Z][a-z]+)")
    _LOWER_TO_UPPER_RE = re.compile(r"([a-z0-9])([A-Z])")

    def __init__(self, settings: Settings):
        """Store ``settings`` and start with an empty token cache."""
        self.settings = settings
        self._token: str | None = None
        self._token_expires_at: datetime | None = None

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    async def _get_access_token(self) -> str:
        """Get or refresh OAuth access token.

        Returns the cached token while the current time is before the
        stored expiry; otherwise posts a password-grant request to
        ``settings.cisco_token_url`` and caches the new token.

        Returns:
            The access token string.

        Raises:
            httpx.HTTPStatusError: If the token endpoint answers with an
                error status.
            KeyError: If the token response has no ``access_token``.
        """
        now = datetime.now()
        # Return cached token if still valid
        if self._token and self._token_expires_at and now < self._token_expires_at:
            return self._token

        logger.info("Fetching new OAuth token")
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.settings.cisco_token_url,
                data={
                    "grant_type": "password",
                    "client_id": self.settings.cisco_client_id,
                    "client_secret": self.settings.cisco_client_secret,
                    "username": self.settings.cisco_cco_username,
                    "password": self.settings.cisco_cco_password,
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
            response.raise_for_status()
            data = response.json()

        self._token = data["access_token"]
        # Some token endpoints serialise expires_in as a string; coerce it
        # so the subtraction below cannot raise TypeError.
        expires_in = float(data.get("expires_in", 3600))
        self._token_expires_at = now + timedelta(
            seconds=expires_in - self.settings.token_refresh_buffer_seconds
        )
        logger.info("OAuth token obtained successfully")
        return self._token

    def _clear_token(self) -> None:
        """Clear cached token (e.g., after auth failure)."""
        self._token = None
        self._token_expires_at = None

    # ------------------------------------------------------------------
    # Request building
    # ------------------------------------------------------------------

    @staticmethod
    def _xml_text(value: Any) -> str:
        """Return ``value`` as a string safe to use as XML element text."""
        return escape(str(value))

    @classmethod
    def _build_items_xml(cls, skus: list[str]) -> str:
        """Render one ``<Item>`` line per SKU, escaping each SKU."""
        return "\n".join(
            f" <Item><ID>{cls._xml_text(sku)}</ID></Item>" for sku in skus
        )

    @staticmethod
    def _render_get_catalog_xml(
        reference_prefix: str,
        attributes_xml: str,
        items_xml: str,
        component_id: str | None = None,
    ) -> str:
        """Render the shared ``GetCatalog`` envelope around the given parts.

        Args:
            reference_prefix: Text placed before the timestamp in
                ``<ReferenceID>``.
            attributes_xml: Already-rendered lines for the header
                ``<Extension>`` element.
            items_xml: Already-rendered ``<Item>`` lines.
            component_id: When given, emitted as ``<ComponentID>`` in the
                sender section.

        Returns:
            The complete XML document as a string.
        """
        now = datetime.now()
        reference_id = reference_prefix + now.strftime("%Y%m%d%H%M%S%f")
        creation_date = now.strftime("%Y-%m-%d")

        sender_lines = ["<LogicalID>CiscoCatalogMCP</LogicalID>"]
        if component_id is not None:
            sender_lines.append(f"<ComponentID>{component_id}</ComponentID>")
        sender_lines.append(f"<ReferenceID>{reference_id}</ReferenceID>")
        sender_xml = "\n".join(sender_lines)

        return f"""<?xml version="1.0" encoding="UTF-8"?>
<GetCatalog xmlns="http://www.openapplications.org/oagis/10">
<ApplicationArea>
<Sender>
{sender_xml}
</Sender>
<Receiver>
<LogicalID>364131937</LogicalID>
<ID>Cisco</ID>
</Receiver>
<CreationDateTime>{creation_date}</CreationDateTime>
</ApplicationArea>
<DataArea>
<Get>
<Expression>token</Expression>
</Get>
<Catalog>
<CatalogHeader>
<Extension>
{attributes_xml}
</Extension>
</CatalogHeader>
<CatalogLine>
{items_xml}
</CatalogLine>
</Catalog>
</DataArea>
</GetCatalog>"""

    def _build_get_item_info_xml(
        self,
        skus: list[str],
        price_list: str,
        attributes: list[str] | None = None,
    ) -> str:
        """Build XML request payload for Get Item Information API.

        Args:
            skus: SKUs to request; each becomes one ``<Item>`` line.
            price_list: Value sent as the ``PriceListShortName`` ID.
            attributes: Attribute names to request as empty typed IDs.
                Defaults to ``ITEM_ATTRIBUTES``. ``"PriceListShortName"``
                is skipped because it is always sent with its value.

        Returns:
            The XML document as a string. Caller-supplied values are
            XML-escaped so characters such as ``&`` or ``<`` cannot
            produce a malformed or altered request.
        """
        if attributes is None:
            attributes = ITEM_ATTRIBUTES

        # Build attribute IDs
        attr_lines = [
            f' <ID typeCode="PriceListShortName">{self._xml_text(price_list)}</ID>'
        ]
        attr_lines.extend(
            # quoteattr supplies the surrounding quotes itself.
            f" <ID typeCode={quoteattr(str(attr))}/>"
            for attr in attributes
            if attr != "PriceListShortName"
        )

        return self._render_get_catalog_xml(
            "mcp-",
            "\n".join(attr_lines),
            self._build_items_xml(skus),
        )

    def _build_get_mapped_service_xml(
        self,
        skus: list[str],
        price_list: str,
        service_program: str | None = None,
        service_level: str | None = None,
    ) -> str:
        """Build XML request payload for Get Mapped Service API.

        Args:
            skus: SKUs to request; each becomes one ``<Item>`` line.
            price_list: Value sent as the ``pricelistShortName`` ID.
            service_program: Service program filter; ``All`` is sent when
                this is empty or ``None``.
            service_level: Service level filter; omitted when empty or
                ``None``.

        Returns:
            The XML document as a string, with caller-supplied values
            XML-escaped.
        """
        # Build attribute IDs
        attr_lines = [
            f' <ID typeCode="pricelistShortName">{self._xml_text(price_list)}</ID>',
            f' <ID typeCode="ServiceProgram">{self._xml_text(service_program or "All")}</ID>',
        ]
        if service_level:
            attr_lines.append(
                f' <ID typeCode="ServiceLevel">{self._xml_text(service_level)}</ID>'
            )

        return self._render_get_catalog_xml(
            "mcp-svc-",
            "\n".join(attr_lines),
            self._build_items_xml(skus),
            component_id="MCP-Server",
        )

    # ------------------------------------------------------------------
    # Response parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_list(value: Any) -> list[Any]:
        """Return ``value`` unchanged if it is a list, else wrap it in one."""
        return value if isinstance(value, list) else [value]

    def _typed_values(self, container: dict[str, Any], field: str) -> list[tuple[str, str]]:
        """Collect ``(snake_case typeCode, text)`` pairs from ``container[field]``.

        The field may hold a single entry or a list. Entries that are not
        dicts, or that lack an ``@typeCode`` or a text value, are skipped.
        """
        pairs: list[tuple[str, str]] = []
        for entry in self._as_list(container.get(field, [])):
            if not isinstance(entry, dict):
                continue
            type_code = entry.get("@typeCode", "")
            value = self._get_text(entry)
            if type_code and value:
                pairs.append((self._to_snake_case(type_code), value))
        return pairs

    def _parse_item_response(self, data: dict[str, Any]) -> list[dict[str, Any]]:
        """Parse the Get Item Information API response.

        Args:
            data: Decoded JSON response; the payload may or may not be
                wrapped in a ``ShowCatalog`` key.

        Returns:
            One dictionary per non-empty catalog line. If parsing fails
            unexpectedly, the failure is logged and an ``{"error": ...}``
            entry is appended instead of raising.

        Raises:
            CiscoAPIError: If the message-level status code starts with
                ``CC003``.
        """
        results: list[dict[str, Any]] = []
        try:
            show_catalog = data.get("ShowCatalog", data)
            data_area = show_catalog.get("DataArea", {})

            # Check for errors at message level
            show = data_area.get("Show", {})
            if show:
                change_status = show.get("ResponseCriteria", {}).get("ChangeStatus", {})
                status_code = change_status.get("Code", "")
                if status_code and status_code.startswith("CC003"):
                    error_msg = ERROR_CODES.get(
                        status_code,
                        change_status.get("Description", "Unknown error"),
                    )
                    raise CiscoAPIError(status_code, error_msg)

            catalog_lines = self._as_list(
                data_area.get("Catalog", {}).get("CatalogLine", [])
            )
            for line in catalog_lines:
                if not line:
                    continue
                results.append(self._extract_item_data(line.get("Item", {}), line))
        except CiscoAPIError:
            raise
        except Exception as e:
            # Deliberate best-effort: report the failure in-band.
            logger.exception("Failed to parse API response")
            results.append({"error": f"Failed to parse response: {e}"})
        return results

    def _extract_item_data(self, item: dict[str, Any], line: dict[str, Any]) -> dict[str, Any]:
        """Extract item data from a catalog line.

        Args:
            item: The line's ``Item`` object.
            line: The enclosing catalog line (source of ``ItemPrice`` and
                line-level ``Extension`` data).

        Returns:
            A flat dictionary with ``sku``, ``description`` and ``upc_id``
            plus whatever classification, dimension, pricing and extension
            fields are present. Later sections overwrite earlier keys.
        """
        result: dict[str, Any] = {
            "sku": self._get_text(item.get("ID")),
            "description": self._get_text(item.get("Description")),
            "upc_id": self._get_text(item.get("UPCID")),
        }

        # Extract classification info
        classification = item.get("Classification", {})
        if classification:
            result.update(self._extract_classification(classification))

        # Extract dimensions
        dimensions = item.get("Dimensions", {})
        if dimensions:
            result.update(self._extract_dimensions(dimensions))

        # Extract pricing info
        item_price = line.get("ItemPrice", {})
        if item_price:
            result.update(self._extract_pricing(item_price))

        # Extract extension attributes
        extensions = item.get("Extension", [])
        if extensions:
            result.update(self._extract_extensions(extensions))

        # Extract line-level extensions (additional attributes)
        line_extensions = line.get("Extension", [])
        if line_extensions:
            result.update(self._extract_extensions(line_extensions))

        return result

    def _extract_classification(self, classification: dict[str, Any]) -> dict[str, Any]:
        """Extract classification data.

        Returns a dictionary keyed by the snake_case ``@typeCode`` of each
        classification ID, plus ``unspsc_code`` (and ``unspsc_version``
        when the UNSPSC entry is a dict) if a UNSPSC code is present.
        """
        # A repeated type code keeps the last value seen.
        result: dict[str, Any] = dict(self._typed_values(classification, "ID"))

        # Extract UNSPSC code
        unspsc = classification.get("UNSPSCCode")
        if unspsc:
            result["unspsc_code"] = self._get_text(unspsc)
            if isinstance(unspsc, dict):
                result["unspsc_version"] = unspsc.get("@listVersionID")
        return result

    def _extract_dimensions(self, dimensions: dict[str, Any]) -> dict[str, Any]:
        """Extract physical dimension data.

        Returns ``width``, ``length``, ``height`` and ``weight`` for the
        measures that are present, each with a ``<name>_unit`` entry when
        the measure is a dict.
        """
        result: dict[str, Any] = {}
        dimension_fields = (
            ("WidthMeasure", "width"),
            ("LengthMeasure", "length"),
            ("HeightMeasure", "height"),
            ("GrossWeightMeasure", "weight"),
        )
        for field, key in dimension_fields:
            measure = dimensions.get(field)
            if not measure:
                continue
            result[key] = self._get_text(measure)
            if isinstance(measure, dict):
                result[f"{key}_unit"] = measure.get("@unitCode")
        return result

    def _extract_pricing(self, item_price: dict[str, Any]) -> dict[str, Any]:
        """Extract pricing data.

        Args:
            item_price: The line's ``ItemPrice`` value. If it is a list,
                only the first entry is used.

        Returns:
            A dictionary that may contain ``price_list``, ``list_price``,
            ``currency``, ``discount_percent`` and any typed values found
            in the price's own ``Extension``.
        """
        result: dict[str, Any] = {}

        # Handle both single and multiple price entries
        if isinstance(item_price, list):
            item_price = item_price[0] if item_price else {}

        # Get price list info
        for pid in self._as_list(item_price.get("ID", [])):
            if isinstance(pid, dict) and pid.get("@typeCode", "") == "PriceBookShortName":
                result["price_list"] = self._get_text(pid)

        # Get unit price
        unit_amount = item_price.get("UnitPrice", {}).get("UnitAmount", {})
        if unit_amount:
            result["list_price"] = self._get_text(unit_amount)
            if isinstance(unit_amount, dict):
                result["currency"] = unit_amount.get("@currencyCode")

        # Get discount info
        price_break = item_price.get("PriceBreak", {})
        if price_break:
            discount = price_break.get("DiscountPercent")
            if discount:
                result["discount_percent"] = self._get_text(discount)

        # Get price-related extensions
        extensions = item_price.get("Extension", [])
        if extensions:
            result.update(self._extract_extensions(extensions))
        return result

    def _extract_extensions(self, extensions: Any) -> dict[str, Any]:
        """Extract data from extension elements.

        Args:
            extensions: A single extension dict or a list of them;
                non-dict entries are ignored.

        Returns:
            A dictionary keyed by snake_case ``@typeCode``. Repeated ``ID``
            type codes are collected into a list; a ``ValueText`` entry
            replaces any value already stored under its key.
        """
        result: dict[str, Any] = {}
        for ext in self._as_list(extensions):
            if not isinstance(ext, dict):
                continue

            # Handle ID elements within extension
            for key, value in self._typed_values(ext, "ID"):
                # Handle multiple values for same key
                if key not in result:
                    result[key] = value
                elif isinstance(result[key], list):
                    result[key].append(value)
                else:
                    result[key] = [result[key], value]

            # Handle ValueText elements (often for EOL dates)
            for key, value in self._typed_values(ext, "ValueText"):
                result[key] = value
        return result

    def _get_text(self, value: Any) -> str | None:
        """Extract text value from various response formats.

        Args:
            value: ``None``, a string, a dict carrying its text under one
                of ``#text``, ``$``, ``value`` or ``Value`` (checked in
                that order), or any other scalar.

        Returns:
            The text as a string, or ``None`` when ``value`` is ``None`` or
            a dict with no usable text. A numeric zero counts as text
            (``"0"``) rather than as a missing value.
        """
        if value is None:
            return None
        if isinstance(value, str):
            return value
        if isinstance(value, dict):
            # Common patterns in Cisco API responses
            for key in ("#text", "$", "value", "Value"):
                candidate = value.get(key)
                is_zero_number = (
                    isinstance(candidate, (int, float))
                    and not isinstance(candidate, bool)
                    and candidate == 0
                )
                if candidate or is_zero_number:
                    return candidate if isinstance(candidate, str) else str(candidate)
            return None
        return str(value)

    def _to_snake_case(self, name: str) -> str:
        """Convert CamelCase to snake_case."""
        partial = self._WORD_START_RE.sub(r"\1_\2", name)
        return self._LOWER_TO_UPPER_RE.sub(r"\1_\2", partial).lower()

    # ------------------------------------------------------------------
    # HTTP
    # ------------------------------------------------------------------

    @staticmethod
    def _request_headers(token: str, content_type: str) -> dict[str, str]:
        """Build the headers for an authenticated API request."""
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": content_type,
            "Accept": "application/json",
        }

    async def _make_request(
        self,
        url: str,
        payload: str,
        content_type: str = "application/xml",
    ) -> dict[str, Any]:
        """Make an authenticated request to the Cisco API.

        Posts ``payload`` to ``url`` with a bearer token. On a 401 the
        cached token is cleared, a new one is fetched and the request is
        retried once.

        Args:
            url: Endpoint to post to.
            payload: Request body.
            content_type: Value for the ``Content-Type`` header.

        Returns:
            The decoded JSON response body.

        Raises:
            httpx.HTTPStatusError: If the final response has an error
                status.
        """
        token = await self._get_access_token()
        async with httpx.AsyncClient(timeout=self.settings.request_timeout) as client:
            response = await client.post(
                url,
                content=payload,
                headers=self._request_headers(token, content_type),
            )

            # Handle token expiration
            if response.status_code == 401:
                logger.warning("Token expired, refreshing...")
                self._clear_token()
                token = await self._get_access_token()
                response = await client.post(
                    url,
                    content=payload,
                    headers=self._request_headers(token, content_type),
                )

            response.raise_for_status()
            return response.json()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def get_item_information(
        self,
        skus: list[str],
        price_list: str | None = None,
        attributes: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """
        Get detailed information for one or more SKUs.

        Args:
            skus: List of SKUs to look up (at most
                ``settings.max_skus_per_request``)
            price_list: Price list code (default: from settings)
            attributes: Specific attributes to request (default: all)

        Returns:
            List of item information dictionaries

        Raises:
            ValueError: If more SKUs are given than the configured maximum.
        """
        max_skus = self.settings.max_skus_per_request
        if len(skus) > max_skus:
            raise ValueError(f"Maximum {max_skus} SKUs per request")

        price_list = price_list or self.settings.cisco_price_list
        xml_payload = self._build_get_item_info_xml(skus, price_list, attributes)
        logger.info("Fetching item info for %d SKU(s)", len(skus))
        data = await self._make_request(self.settings.cisco_catalog_url, xml_payload)
        return self._parse_item_response(data)

    async def get_mapped_services(
        self,
        skus: list[str],
        price_list: str | None = None,
        service_program: str | None = None,
        service_level: str | None = None,
    ) -> list[dict[str, Any]]:
        """
        Get mapped services for one or more SKUs.

        Args:
            skus: List of SKUs to look up
            price_list: Price list code (default: from settings)
            service_program: Filter by service program (default: All)
            service_level: Filter by service level

        Returns:
            List of service mapping information
        """
        price_list = price_list or self.settings.cisco_price_list
        xml_payload = self._build_get_mapped_service_xml(
            skus, price_list, service_program, service_level
        )
        logger.info("Fetching mapped services for %d SKU(s)", len(skus))
        data = await self._make_request(self.settings.cisco_mapped_service_url, xml_payload)
        return self._parse_mapped_service_response(data)

    def _parse_mapped_service_response(self, data: dict[str, Any]) -> list[dict[str, Any]]:
        """Parse the Get Mapped Service API response.

        Args:
            data: Decoded JSON response; the payload may or may not be
                wrapped in a ``ShowCatalog`` key.

        Returns:
            One ``{"sku": ..., "services": [...]}`` dictionary per
            non-empty catalog line, where each service is built from one
            item extension. If parsing fails unexpectedly, the failure is
            logged and an ``{"error": ...}`` entry is appended instead of
            raising.
        """
        results: list[dict[str, Any]] = []
        try:
            show_catalog = data.get("ShowCatalog", data)
            data_area = show_catalog.get("DataArea", {})
            catalog_lines = self._as_list(
                data_area.get("Catalog", {}).get("CatalogLine", [])
            )

            for line in catalog_lines:
                if not line:
                    continue
                item = line.get("Item", {})
                sku = self._get_text(item.get("ID"))

                # Extract service mappings from extensions
                services = []
                for ext in self._as_list(item.get("Extension", [])):
                    if not isinstance(ext, dict):
                        continue
                    service_info: dict[str, Any] = dict(self._typed_values(ext, "ID"))

                    # Get price if available
                    for amt in self._as_list(ext.get("Amount", [])):
                        if not isinstance(amt, dict):
                            continue
                        price = self._get_text(amt)
                        if price:
                            service_info["price"] = price
                            service_info["price_list"] = amt.get("@typeCode", "")

                    if service_info:
                        services.append(service_info)

                results.append({
                    "sku": sku,
                    "services": services,
                })
        except Exception as e:
            # Deliberate best-effort: report the failure in-band.
            logger.exception("Failed to parse mapped service response")
            results.append({"error": f"Failed to parse response: {e}"})
        return results