# server.py • 1.48 kB
"""FastMCP server exposing SAP Commerce OCC discovery tools."""
from __future__ import annotations
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncIterator
from fastmcp import FastMCP
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from .occ_client import occ_client
from .settings import settings
from .tools import register_all
# Absolute location of "docs/api-docs.json", resolved relative to this module:
# the "docs" directory sits one level above the directory containing this file.
DOCS_PATH = Path(__file__).resolve().parent.parent.joinpath("docs", "api-docs.json")
@asynccontextmanager
async def lifespan(_: FastMCP) -> AsyncIterator[None]:
    """Async context manager passed to ``FastMCP`` as its ``lifespan`` hook.

    Nothing is set up on entry; the context yields ``None`` straight away.
    On exit the imported ``occ_client`` is closed via ``aclose()``.

    Args:
        _: The server instance handed in by the caller; unused here.

    Yields:
        None.
    """
    try:
        yield
    finally:
        # ``finally`` ensures the client is closed even when an exception is
        # thrown into the generator at the ``yield`` point.
        await occ_client.aclose()
# Module-level server instance. The resource and route decorators below, as
# well as run_stdio(), all operate on this object.
mcp = FastMCP(
    "sap-commerce-occ",
    lifespan=lifespan,
    instructions="Interact with SAP Commerce OCC discovery endpoints.",
)

# Attach the tool collections provided by the sibling ``tools`` module.
register_all(mcp)
@mcp.resource(
    "resource://occ/docs",
    name="occ/docs",
    description="SAP Commerce OCC OpenAPI reference location.",
)
async def occ_docs_resource() -> dict[str, str]:
    """Describe where the OCC OpenAPI specification file is located.

    Only the location is reported: the file at ``DOCS_PATH`` is neither opened
    nor checked for existence here.

    Returns:
        A dict with a human-readable ``"description"`` and the ``"path"`` of
        the specification file as a string.
    """
    spec_location = str(DOCS_PATH)
    return {
        "description": "Official SAP Commerce OCC OpenAPI specification for discovery endpoints.",
        "path": spec_location,
    }
@mcp.custom_route("/health", methods=["GET"])
async def health_route(_: Request) -> Response:
    """Answer ``GET /health`` with a small JSON status document.

    The handler only reads ``settings.occ_base_url``; it makes no call through
    ``occ_client``.

    Args:
        _: The incoming request; unused.

    Returns:
        A ``JSONResponse`` carrying ``{"status": "ok", "base_url": <str>}``.
    """
    payload = {"status": "ok", "base_url": str(settings.occ_base_url)}
    return JSONResponse(payload)
def run_stdio() -> None:
    """Start the module-level ``mcp`` server on the stdio transport.

    Delegates to ``mcp.run`` and returns once that call returns.
    """
    transport_name = "stdio"
    mcp.run(transport=transport_name)
if __name__ == "__main__":
    # Executed as the main module: serve over stdio.
    run_stdio()