Skip to main content
Glama
boecht

BitSight Community MCP Server

by boecht
test_subscription_helpers.py4.58 kB
import asyncio from typing import Any import pytest from fastmcp import Context from birre.domain.subscription import ( SubscriptionAttempt, cleanup_ephemeral_subscription, create_ephemeral_subscription, ) class StubContext(Context): def __init__(self) -> None: self.messages: dict[str, list[str]] = {"info": [], "warning": [], "error": []} self.metadata = {} self.tool = "subscription" self._request_id = "sub-test" async def info(self, message: str) -> None: # type: ignore[override] await asyncio.sleep(0) self.messages["info"].append(message) async def warning(self, message: str) -> None: # type: ignore[override] await asyncio.sleep(0) self.messages["warning"].append(message) async def error(self, message: str) -> None: # type: ignore[override] await asyncio.sleep(0) self.messages["error"].append(message) @property def request_id(self) -> str: # type: ignore[override] return self._request_id @property def call_id(self) -> str: # type: ignore[override] return self._request_id @pytest.mark.asyncio async def test_create_ephemeral_subscription_success( monkeypatch: pytest.MonkeyPatch, ) -> None: ctx = StubContext() async def call_v1(name: str, ctx: Context, payload: dict[str, Any]): await asyncio.sleep(0) assert name == "manageSubscriptionsBulk" assert payload["add"][0]["guid"] == "guid-1" return {"added": ["guid-1"]} attempt = await create_ephemeral_subscription( call_v1, ctx, "guid-1", logger=_logdummy(), default_folder="API", subscription_type="continuous_monitoring", debug_enabled=False, ) assert attempt == SubscriptionAttempt(True, True, False, None) assert not ctx.messages["error"] @pytest.mark.asyncio async def test_create_ephemeral_subscription_already_exists( monkeypatch: pytest.MonkeyPatch, ) -> None: ctx = StubContext() async def call_v1(name: str, ctx: Context, payload: dict[str, Any]): await asyncio.sleep(0) return {"errors": [{"guid": "guid-1", "message": "Already exists"}]} attempt = await create_ephemeral_subscription( call_v1, ctx, "guid-1", logger=_logdummy(), default_folder="API", subscription_type="continuous_monitoring", debug_enabled=False, ) assert attempt == SubscriptionAttempt(True, False, True, "Already exists") @pytest.mark.asyncio async def test_create_ephemeral_subscription_missing_config( monkeypatch: pytest.MonkeyPatch, ) -> None: ctx = StubContext() async def call_v1(name: str, ctx: Context, payload: dict[str, Any]): await asyncio.sleep(0) raise AssertionError("call_v1 should not be invoked when config missing") attempt = await create_ephemeral_subscription( call_v1, ctx, "guid-1", logger=_logdummy(), default_folder=None, subscription_type=None, debug_enabled=False, ) assert not attempt.success assert attempt.message == ( "Subscription settings are missing. " "Configure subscription_folder and subscription_type via CLI or configuration." ) assert ctx.messages["error"] @pytest.mark.asyncio async def test_cleanup_ephemeral_subscription_errors( monkeypatch: pytest.MonkeyPatch, ) -> None: ctx = StubContext() async def call_v1(name: str, ctx: Context, payload: dict[str, Any]): await asyncio.sleep(0) return {"errors": ["boom"]} result = await cleanup_ephemeral_subscription( call_v1, ctx, "guid-1", debug_enabled=False, ) assert result is False assert ctx.messages["error"] @pytest.mark.asyncio async def test_cleanup_ephemeral_subscription_success( monkeypatch: pytest.MonkeyPatch, ) -> None: ctx = StubContext() async def call_v1(name: str, ctx: Context, payload: dict[str, Any]): await asyncio.sleep(0) return {"deleted": ["guid-1"]} result = await cleanup_ephemeral_subscription( call_v1, ctx, "guid-1", debug_enabled=False, ) assert result is True def _logdummy(): class _Logger: def info(self, *args, **kwargs): """No-op info logging stub used for dependency-free tests.""" def error(self, *args, **kwargs): """No-op error logging stub used for dependency-free tests.""" return _Logger()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boecht/bitsight-community-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server