from __future__ import annotations
from collections.abc import Mapping
from pathlib import Path
from typing import Any
import ssl
import httpx
from fastmcp import FastMCP
from prance import ResolvingParser
from prance.util import resolver as prance_resolver
SCHEMA_REF_PREFIX = "#/components/schemas/"
DEFAULT_V1_API_BASE_URL = "https://api.bitsighttech.com/v1"
DEFAULT_V2_API_BASE_URL = "https://api.bitsighttech.com/v2"
def _get_schema_definitions(spec: Any) -> Mapping[str, Any]:
if not isinstance(spec, Mapping):
return {}
components = spec.get("components")
if not isinstance(components, Mapping):
return {}
schemas = components.get("schemas")
if not isinstance(schemas, Mapping):
return {}
return schemas
def _iter_api_responses(spec: Any):
if not isinstance(spec, Mapping):
return
paths = spec.get("paths")
if not isinstance(paths, Mapping):
return
for path_item in paths.values():
if not isinstance(path_item, Mapping):
continue
for operation in path_item.values():
if not isinstance(operation, Mapping):
continue
responses = operation.get("responses")
if isinstance(responses, Mapping):
yield responses
def _schema_description(schemas: Mapping[str, Any], ref: str) -> str:
schema_name = ref.split("/")[-1]
schema = schemas.get(schema_name)
if isinstance(schema, Mapping):
maybe_description = schema.get("description")
if isinstance(maybe_description, str):
return maybe_description
return ""
def _convert_response(response: Mapping[str, Any], schemas: Mapping[str, Any]) -> Mapping[str, Any] | None:
if "content" in response:
return None
ref = response.get("$ref")
if not (isinstance(ref, str) and ref.startswith(SCHEMA_REF_PREFIX)):
return None
description = _schema_description(schemas, ref)
return {
"description": description,
"content": {"application/json": {"schema": {"$ref": ref}}},
}
def _wrap_schema_responses(spec: Any) -> None:
schemas = _get_schema_definitions(spec)
for responses in _iter_api_responses(spec):
for status, response in responses.items():
if not isinstance(response, Mapping):
continue
replacement = _convert_response(response, schemas)
if replacement is not None:
responses[status] = replacement
def _load_api_spec(path: str) -> Any:
parser = ResolvingParser(
str(Path(path).resolve()),
strict=True,
resolve_types=prance_resolver.RESOLVE_FILES | prance_resolver.RESOLVE_HTTP,
)
specification = parser.specification
_wrap_schema_responses(specification)
return specification
def _create_client(
base_url: str, api_key: str, *, verify: bool | str = True
) -> httpx.AsyncClient:
verify_option: bool | ssl.SSLContext
if isinstance(verify, str):
context = ssl.create_default_context(cafile=verify)
tls_version = getattr(ssl, "TLSVersion", None)
if tls_version is not None:
context.minimum_version = tls_version.TLSv1_2
else: # pragma: no cover - fallback for older Python versions
context.options |= getattr(ssl, "OP_NO_TLSv1", 0)
context.options |= getattr(ssl, "OP_NO_TLSv1_1", 0)
verify_option = context
else:
verify_option = verify
client_kwargs: dict[str, Any] = {
"base_url": base_url,
"auth": (api_key, ""),
"headers": {"Accept": "application/json"},
"timeout": 30.0,
"verify": verify_option,
}
return httpx.AsyncClient(**client_kwargs)
def create_v1_api_server(
api_key: str,
*,
verify: bool | str = True,
base_url: str = DEFAULT_V1_API_BASE_URL,
) -> FastMCP:
"""Build the BitSight v1 FastMCP server."""
spec = _load_api_spec("apis/bitsight.v1.schema.json")
client = _create_client(base_url, api_key, verify=verify)
return FastMCP.from_openapi(
openapi_spec=spec,
client=client,
name="BitSight-v1-API",
)
def create_v2_api_server(
api_key: str,
*,
verify: bool | str = True,
base_url: str = DEFAULT_V2_API_BASE_URL,
) -> FastMCP:
"""Build the BitSight v2 FastMCP server."""
spec = _load_api_spec("apis/bitsight.v2.schema.json")
client = _create_client(base_url, api_key, verify=verify)
return FastMCP.from_openapi(
openapi_spec=spec,
client=client,
name="BitSight-v2-API",
)
__all__ = ["create_v1_api_server", "create_v2_api_server"]