Skip to main content
Glama
boecht

BitSight Community MCP Server

by boecht
subscription.py (7.02 kB)
from __future__ import annotations

import json
import logging
from collections.abc import Sequence
from typing import Any, NamedTuple

from fastmcp import Context

from birre.domain.common import CallV1Tool
from birre.infrastructure.logging import BoundLogger


class SubscriptionAttempt(NamedTuple):
    """Result of attempting to ensure a BitSight subscription."""

    # True when the company can be treated as subscribed after the attempt.
    success: bool
    # True when this attempt itself added the subscription.
    created: bool
    # True when the subscription was found (or assumed) to exist beforehand.
    already_subscribed: bool
    # Optional detail text: the error description on failure, or the API's
    # own message when it reported that the subscription already exists.
    message: str | None = None


def _extract_guid_values(response: dict[str, Any], keys: Sequence[str]) -> list[str]:
    """Collect GUID strings from lists contained in the response.

    Args:
        response: Mapping returned by the bulk subscription call.
        keys: Names of the response entries to inspect, in order.

    Returns:
        Every GUID found, in encounter order. A list entry contributes either
        itself (when it is a string) or its ``"guid"`` value (when it is a
        dict holding a string there). Entries of any other shape, and keys
        whose value is not a list, are ignored.
    """
    guids: list[str] = []
    for key in keys:
        value = response.get(key)
        if not isinstance(value, list):
            continue
        for item in value:
            if isinstance(item, str):
                guids.append(item)
            elif isinstance(item, dict):
                guid_value = item.get("guid")
                if isinstance(guid_value, str):
                    guids.append(guid_value)
    return guids


def _build_subscription_payload(
    folder_name: str | None,
    subscription_type: str | None,
) -> dict[str, str | list[str]] | None:
    """Create the base payload for subscription actions when configured.

    Args:
        folder_name: Folder the subscription should be filed under.
        subscription_type: Subscription type to request.

    Returns:
        ``{"folder": [folder_name], "type": subscription_type}``, or ``None``
        when either setting is missing or empty.
    """
    if not folder_name or not subscription_type:
        return None
    return {"folder": [folder_name], "type": subscription_type}


async def _log_bulk_response(
    ctx: Context, result: Any, action: str, *, debug_enabled: bool
) -> None:
    """Emit debug logging for bulk subscription responses when enabled.

    Args:
        ctx: Context used to emit the message via ``ctx.info``.
        result: Raw value returned by the ``manageSubscriptionsBulk`` call.
        action: Label of the bulk action, interpolated into the message.
        debug_enabled: When false the function returns without logging.
    """
    if not debug_enabled:
        return

    if isinstance(result, dict):
        try:
            pretty = json.dumps(result, indent=2, sort_keys=True)
        except (TypeError, ValueError):
            # TypeError covers unserialisable values and unsortable keys;
            # ValueError covers circular references. Diagnostics must never
            # abort the subscription flow, so fall back to the plain repr.
            pretty = str(result)
    else:
        pretty = str(result)

    await ctx.info(f"manageSubscriptionsBulk({action}) raw response: {pretty}")


async def _handle_bulk_errors(
    ctx: Context, errors: Any, guid: str
) -> SubscriptionAttempt | None:
    """Interpret the errors section from the bulk subscription response.

    Args:
        ctx: Context used for ``info`` / ``error`` messages.
        errors: Value of the response's ``"errors"`` entry (any type).
        guid: GUID of the company the subscription was requested for.

    Returns:
        * ``None`` when ``errors`` is not a list or is an empty list.
        * A successful, already-subscribed attempt when an error entry that
          either names ``guid`` or names no GUID at all contains
          "already exists" (case-insensitive) in its message.
        * A failed attempt for any other non-empty error list.
    """
    if not isinstance(errors, list):
        return None

    for error in errors:
        if not isinstance(error, dict):
            continue

        error_guid = error.get("guid")
        message = str(error.get("message") or "")
        normalized_message = message.lower()

        # Skip errors that explicitly belong to a different company.
        if error_guid and error_guid != guid:
            continue

        if "already exists" in normalized_message:
            await ctx.info(
                f"Company {guid} already subscribed according to bulk response"
            )
            return SubscriptionAttempt(True, False, True, message or None)

    # No "already exists" match was found: any reported error is a failure.
    if errors:
        message = f"FastMCP bulk subscription reported errors: {errors}"
        await ctx.error(message)
        return SubscriptionAttempt(False, False, False, message)

    return None


async def _interpret_manage_subscription_response(
    ctx: Context, result: Any, guid: str
) -> SubscriptionAttempt:
    """Translate the FastMCP bulk response into a SubscriptionAttempt.

    The checks run in this order: response shape, GUID listed under
    ``"added"``/``"add"``, the ``"errors"`` section, GUID listed under
    ``"modified"``. If none of them applies the company is assumed to be
    subscribed already.

    Args:
        ctx: Context used for ``info`` / ``error`` messages.
        result: Raw value returned by the ``manageSubscriptionsBulk`` call.
        guid: GUID of the company the subscription was requested for.

    Returns:
        The attempt describing the outcome; it is a failure when ``result``
        is not a dict or when the errors section reports a failure.
    """
    if not isinstance(result, dict):
        message = (
            f"Unexpected response while managing subscription via FastMCP: {result}"
        )
        await ctx.error(message)
        return SubscriptionAttempt(False, False, False, message)

    added_guids = set(_extract_guid_values(result, ("added", "add")))
    if guid in added_guids:
        await ctx.info(
            f"Created temporary subscription for company {guid} using bulk API"
        )
        return SubscriptionAttempt(True, True, False)

    attempt = await _handle_bulk_errors(ctx, result.get("errors"), guid)
    if attempt is not None:
        return attempt

    modified_guids = set(_extract_guid_values(result, ("modified",)))
    if guid in modified_guids:
        await ctx.info(
            f"Subscription for company {guid} already active (reported as modified)"
        )
        return SubscriptionAttempt(True, False, True)

    await ctx.info(
        f"No add/modify/errors reported for {guid}; assuming already subscribed"
    )
    return SubscriptionAttempt(True, False, True)


async def create_ephemeral_subscription(
    call_v1_tool: CallV1Tool,
    ctx: Context,
    guid: str,
    *,
    logger: BoundLogger,
    default_folder: str | None,
    subscription_type: str | None,
    debug_enabled: bool,
) -> SubscriptionAttempt:
    """Guarantee that the target company is subscribed before fetching data.

    Args:
        call_v1_tool: Callable awaited as
            ``call_v1_tool("manageSubscriptionsBulk", ctx, payload)``.
        ctx: Context used for ``info`` / ``error`` messages.
        guid: GUID of the company to subscribe to.
        logger: Logger that records ``subscription.ensure_failed`` when an
            exception is caught.
        default_folder: Folder name placed in the subscription payload.
        subscription_type: Subscription type placed in the payload.
        debug_enabled: Whether the raw bulk response is logged.

    Returns:
        The outcome of the attempt. Missing settings and any exception raised
        along the way are reported as a failed attempt rather than raised.
    """
    try:
        await ctx.info(f"Ensuring BitSight subscription for company: {guid}")

        subscription_base = _build_subscription_payload(
            default_folder, subscription_type
        )
        if not subscription_base:
            message = (
                "Subscription settings are missing. Configure subscription_folder and "
                "subscription_type via CLI or configuration."
            )
            await ctx.error(message)
            return SubscriptionAttempt(False, False, False, message)

        subscription_payload = {"add": [{**subscription_base, "guid": guid}]}
        result = await call_v1_tool(
            "manageSubscriptionsBulk", ctx, subscription_payload
        )
        await _log_bulk_response(ctx, result, "add", debug_enabled=debug_enabled)
        return await _interpret_manage_subscription_response(ctx, result, guid)
    except Exception as exc:  # pragma: no cover - defensive logging
        message = f"Failed to ensure subscription for {guid}: {exc}"
        await ctx.error(message)
        # Attach the traceback only when the root logger is at DEBUG level.
        exc_info = exc if logging.getLogger().isEnabledFor(logging.DEBUG) else False
        logger.error(
            "subscription.ensure_failed",
            guid=guid,
            exc_info=exc_info,
        )
        return SubscriptionAttempt(False, False, False, message)


async def cleanup_ephemeral_subscription(
    call_v1_tool: CallV1Tool,
    ctx: Context,
    guid: str,
    *,
    debug_enabled: bool,
) -> bool:
    """Revoke a temporary subscription once the data has been retrieved.

    Args:
        call_v1_tool: Callable awaited as
            ``call_v1_tool("manageSubscriptionsBulk", ctx, payload)``.
        ctx: Context used for ``info`` / ``error`` messages.
        guid: GUID of the company whose subscription should be deleted.
        debug_enabled: Whether the raw bulk response is logged.

    Returns:
        ``True`` when the delete request was issued and the response carried
        no errors; ``False`` when the response (a dict) has a truthy
        ``"errors"`` entry or when an exception was caught.
    """
    try:
        await ctx.info(f"Cleaning up ephemeral subscription for company: {guid}")

        delete_payload = {"delete": [{"guid": guid}]}
        result = await call_v1_tool("manageSubscriptionsBulk", ctx, delete_payload)
        await _log_bulk_response(ctx, result, "delete", debug_enabled=debug_enabled)

        if isinstance(result, dict) and result.get("errors"):
            await ctx.error(
                f"Failed to delete subscription via FastMCP: {result['errors']}"
            )
            return False

        await ctx.info("Issued FastMCP delete request for ephemeral subscription")
        return True
    except Exception as exc:  # pragma: no cover - defensive logging
        await ctx.error(f"Failed to cleanup ephemeral subscription for {guid}: {exc}")
        return False


__all__ = [
    "SubscriptionAttempt",
    "create_ephemeral_subscription",
    "cleanup_ephemeral_subscription",
]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boecht/bitsight-community-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.