Skip to main content
Glama
test_pipelines.py13.9 kB
"""Unit tests for pipeline collection helpers. Covers success paths, 404 handling per-group, and error propagation. Uses HTTP collection (like breakers/lookups) to preserve function configs. """ # pyright: reportPrivateUsage=false from unittest.mock import AsyncMock, MagicMock import httpx import pytest from cribl_control_plane.errors import CriblControlPlaneError from cribl_control_plane.models.productscore import ProductsCore from cribl_control_plane.models.security import Security from fastmcp import Context from snc_cribl_mcp.operations.common import serialize_model from snc_cribl_mcp.operations.pipelines import collect_product_pipelines @pytest.fixture def mock_ctx() -> Context: """Provide a Context-like AsyncMock for tool logging.""" ctx = MagicMock(spec=Context) ctx.info = AsyncMock() ctx.warning = AsyncMock() return ctx @pytest.fixture def mock_security() -> Security: """Provide a mock Security object with bearer token.""" return Security(bearer_auth="test-token") class TestSerializeModel: """Tests for the serialize_model helper function.""" def test_serialize_model_with_pydantic_model(self) -> None: """Objects with model_dump should be serialized correctly.""" mock_obj = MagicMock() mock_obj.model_dump.return_value = {"id": "test", "name": "pipeline1"} result = serialize_model(mock_obj) assert result == {"id": "test", "name": "pipeline1"} mock_obj.model_dump.assert_called_once_with(mode="json", exclude_none=True) def test_serialize_model_without_model_dump(self) -> None: """Objects without model_dump should return empty dict.""" obj = object() result = serialize_model(obj) assert result == {} def test_serialize_model_raises_type_error(self) -> None: """Objects where model_dump raises TypeError should return empty dict.""" mock_obj = MagicMock() mock_obj.model_dump.side_effect = TypeError("Invalid type") result = serialize_model(mock_obj) assert result == {} @pytest.mark.asyncio async def test_collect_product_pipelines_success(mock_ctx: Context, mock_security: Security) -> None: """It should list pipelines for each group via HTTP and aggregate results.""" # Mock groups list mock_client = MagicMock() groups_response = MagicMock(items=[MagicMock(), MagicMock()]) groups_response.items[0].model_dump.return_value = {"id": "g1"} groups_response.items[1].model_dump.return_value = {"id": "g2"} mock_client.groups.list_async = AsyncMock(return_value=groups_response) # Mock sdk_configuration for base URL and async_client mock_client.sdk_configuration = MagicMock(server_url="https://example/api/v1") # Mock HTTP responses for pipelines endpoint mock_http_client = AsyncMock() mock_client.sdk_configuration.async_client = mock_http_client # Create mock responses with function conf data preserved g1_response = MagicMock() g1_response.status_code = 200 g1_response.json.return_value = { "items": [ { "id": "p1", "conf": { "functions": [ { "id": "eval", "filter": "true", "conf": {"add": [{"name": "test", "value": "'value'"}]}, } ] }, }, {"id": "p2", "conf": {"functions": []}}, ], "count": 2, } g1_response.raise_for_status = MagicMock() g2_response = MagicMock() g2_response.status_code = 200 g2_response.json.return_value = { "items": [{"id": "p3", "conf": {"functions": []}}], "count": 1, } g2_response.raise_for_status = MagicMock() async def mock_get(url: str, **kwargs: object) -> MagicMock: if "/m/g1/pipelines" in url: return g1_response return g2_response mock_http_client.get = AsyncMock(side_effect=mock_get) result = await collect_product_pipelines( mock_client, mock_security, product=ProductsCore.STREAM, timeout_ms=10000, ctx=mock_ctx, ) assert result["status"] == "ok" assert result["total_count"] == 3 assert len(result["groups"]) == 2 assert result["groups"][0]["group_id"] == "g1" assert result["groups"][0]["count"] == 2 # Verify function conf data is preserved p1 = result["groups"][0]["items"][0] assert p1["conf"]["functions"][0]["conf"]["add"][0]["name"] == "test" @pytest.mark.asyncio async def test_collect_product_pipelines_preserves_function_conf( mock_ctx: Context, mock_security: Security, ) -> None: """The HTTP collection should preserve all function conf data.""" mock_client = MagicMock() groups_response = MagicMock(items=[MagicMock()]) groups_response.items[0].model_dump.return_value = {"id": "g1"} mock_client.groups.list_async = AsyncMock(return_value=groups_response) mock_client.sdk_configuration = MagicMock(server_url="https://example/api/v1") mock_http_client = AsyncMock() mock_client.sdk_configuration.async_client = mock_http_client # Pipeline with complex function configurations pipeline_data = { "items": [ { "id": "test_pipeline", "conf": { "output": "default", "functions": [ { "id": "regex_extract", "filter": "true", "conf": { "regex": "/ASA-\\d+-(?<__code>\\d+)/", "source": "_raw", }, "description": "Extract ASA Code", }, { "id": "sampling", "filter": "true", "conf": {"rules": [{"filter": "__action=='permitted'", "rate": 10}]}, }, { "id": "mask", "filter": "true", "conf": { "rules": [ { "matchRegex": "/password=[^&]+/", "replaceExpr": "'password=***'", } ], "fields": ["_raw"], }, }, ], }, } ], "count": 1, } response = MagicMock() response.status_code = 200 response.json.return_value = pipeline_data response.raise_for_status = MagicMock() mock_http_client.get = AsyncMock(return_value=response) result = await collect_product_pipelines( mock_client, mock_security, product=ProductsCore.STREAM, timeout_ms=10000, ctx=mock_ctx, ) assert result["status"] == "ok" pipeline = result["groups"][0]["items"][0] # Verify regex_extract conf is preserved regex_func = pipeline["conf"]["functions"][0] assert regex_func["id"] == "regex_extract" assert regex_func["conf"]["regex"] == "/ASA-\\d+-(?<__code>\\d+)/" assert regex_func["conf"]["source"] == "_raw" # Verify sampling conf is preserved sampling_func = pipeline["conf"]["functions"][1] assert sampling_func["id"] == "sampling" assert sampling_func["conf"]["rules"][0]["filter"] == "__action=='permitted'" assert sampling_func["conf"]["rules"][0]["rate"] == 10 # Verify mask conf is preserved mask_func = pipeline["conf"]["functions"][2] assert mask_func["id"] == "mask" assert mask_func["conf"]["rules"][0]["matchRegex"] == "/password=[^&]+/" @pytest.mark.asyncio async def test_collect_product_pipelines_404_per_group(mock_ctx: Context, mock_security: Security) -> None: """404 on a group's pipelines should be treated as empty for that group.""" mock_client = MagicMock() groups_response = MagicMock(items=[MagicMock()]) groups_response.items[0].model_dump.return_value = {"id": "g404"} mock_client.groups.list_async = AsyncMock(return_value=groups_response) mock_client.sdk_configuration = MagicMock(server_url="https://example/api/v1") mock_http_client = AsyncMock() mock_client.sdk_configuration.async_client = mock_http_client response = MagicMock() response.status_code = 404 mock_http_client.get = AsyncMock(return_value=response) result = await collect_product_pipelines( mock_client, mock_security, product=ProductsCore.STREAM, timeout_ms=10000, ctx=mock_ctx, ) assert result["status"] == "ok" assert result["total_count"] == 0 assert result["groups"][0]["group_id"] == "g404" assert result["groups"][0]["count"] == 0 # warning should have been awaited at least once assert getattr(mock_ctx.warning, "await_count", 0) >= 1 @pytest.mark.asyncio async def test_collect_product_pipelines_network_error(mock_ctx: Context, mock_security: Security) -> None: """Network failures should be raised as RuntimeError.""" mock_client = MagicMock() groups_response = MagicMock(items=[MagicMock()]) groups_response.items[0].model_dump.return_value = {"id": "g1"} mock_client.groups.list_async = AsyncMock(return_value=groups_response) mock_client.sdk_configuration = MagicMock(server_url="https://example/api/v1") mock_http_client = AsyncMock() mock_client.sdk_configuration.async_client = mock_http_client mock_http_client.get = AsyncMock(side_effect=httpx.ConnectError("fail")) with pytest.raises(RuntimeError, match="Network error while listing pipelines"): await collect_product_pipelines( mock_client, mock_security, product=ProductsCore.STREAM, timeout_ms=10000, ctx=mock_ctx, ) @pytest.mark.asyncio async def test_collect_product_pipelines_unavailable_product_returns_unavailable( mock_ctx: Context, mock_security: Security, ) -> None: """If listing groups returns 404, the function should return an 'unavailable' status.""" mock_client = MagicMock() api_error_404 = CriblControlPlaneError( message="Not found", body=None, raw_response=MagicMock(status_code=404), ) mock_client.groups.list_async = AsyncMock(side_effect=api_error_404) result = await collect_product_pipelines( mock_client, mock_security, product=ProductsCore.STREAM, timeout_ms=10000, ctx=mock_ctx, ) assert result["status"] == "unavailable" assert result["total_count"] == 0 assert result["groups"] == [] @pytest.mark.asyncio async def test_collect_product_pipelines_network_error_on_groups( mock_ctx: Context, mock_security: Security, ) -> None: """Network error while listing groups should raise RuntimeError.""" mock_client = MagicMock() mock_client.groups.list_async = AsyncMock(side_effect=httpx.ConnectError("Network failure")) with pytest.raises(RuntimeError, match="Network error while listing stream groups"): await collect_product_pipelines( mock_client, mock_security, product=ProductsCore.STREAM, timeout_ms=10000, ctx=mock_ctx, ) @pytest.mark.asyncio async def test_collect_product_pipelines_api_error_non_404_on_groups( mock_ctx: Context, mock_security: Security, ) -> None: """Non-404 API error while listing groups should raise RuntimeError.""" mock_client = MagicMock() api_error_500 = CriblControlPlaneError( message="Server error", body=None, raw_response=MagicMock(status_code=500), ) mock_client.groups.list_async = AsyncMock(side_effect=api_error_500) with pytest.raises(RuntimeError, match="Cribl API error while listing stream groups for pipelines"): await collect_product_pipelines( mock_client, mock_security, product=ProductsCore.STREAM, timeout_ms=10000, ctx=mock_ctx, ) @pytest.mark.asyncio async def test_collect_product_pipelines_skips_groups_without_id( mock_ctx: Context, mock_security: Security, ) -> None: """Groups without id or groupId should be skipped.""" mock_client = MagicMock() groups_response = MagicMock(items=[MagicMock(), MagicMock()]) # First group has no id, second has id groups_response.items[0].model_dump.return_value = {"name": "no_id_group"} groups_response.items[1].model_dump.return_value = {"id": "g1"} mock_client.groups.list_async = AsyncMock(return_value=groups_response) mock_client.sdk_configuration = MagicMock(server_url="https://example/api/v1") mock_http_client = AsyncMock() mock_client.sdk_configuration.async_client = mock_http_client response = MagicMock() response.status_code = 200 response.json.return_value = { "items": [{"id": "p1", "conf": {"functions": []}}], "count": 1, } response.raise_for_status = MagicMock() mock_http_client.get = AsyncMock(return_value=response) result = await collect_product_pipelines( mock_client, mock_security, product=ProductsCore.STREAM, timeout_ms=10000, ctx=mock_ctx, ) # Only one group should be processed (the one with id) assert len(result["groups"]) == 1 assert result["groups"][0]["group_id"] == "g1" assert result["total_count"] == 1

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/atree1023/snc-cribl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server