Skip to main content
Glama
sources.py (6.66 kB)
"""Helpers for listing configured sources across products and groups. This module handles two types of sources: 1. Regular sources from /api/v1/m/{group_id}/system/inputs (via SDK) 2. Collector sources (SavedJobs) from /api/v1/m/{group_id}/lib/jobs (via HTTP) Collector sources are "Saved Jobs" of type "collection" that pull data from external systems like S3, REST APIs, databases, etc. """ import logging from typing import Any from cribl_control_plane import CriblControlPlane from cribl_control_plane.models.productscore import ProductsCore from cribl_control_plane.models.security import Security from fastmcp import Context from ..models.collectors import filter_collector_jobs from .common import ( CollectionContext, HttpCollectionContext, ProductResult, build_group_entry, build_success_result, collect_items_via_http, collect_items_via_sdk, ) logger = logging.getLogger("snc_cribl_mcp.operations.sources") # Endpoint path for collector sources (Saved Jobs) JOBS_ENDPOINT = "lib/jobs" async def collect_product_sources( client: CriblControlPlane, *, product: ProductsCore, timeout_ms: int, ctx: Context, security: Security | None = None, ) -> ProductResult: """Fetch all configured sources for all groups of a product. This function collects both regular sources (from /system/inputs) and collector sources (from /lib/jobs) and merges them into a single result. Args: client: The Cribl Control Plane client. product: The product type (Stream or Edge). timeout_ms: Request timeout in milliseconds. ctx: FastMCP context for logging. security: Security configuration with bearer token (required for collector sources). Returns: Standard result dictionary with grouped source items. Each group contains both regular sources and collector sources (filtered from SavedJobs). 
""" coll_ctx = CollectionContext( client=client, product=product, timeout_ms=timeout_ms, ctx=ctx, resource_type="sources", ) # Collect regular sources via SDK regular_result = await collect_items_via_sdk(coll_ctx, client.sources.list_async) # If security is not provided, return only regular sources if security is None: await ctx.warning("No security context provided; skipping collector sources.") return regular_result # Collect collector sources via HTTP, gracefully handling failures try: collector_result = await _collect_collector_sources(coll_ctx, security) except Exception as exc: # noqa: BLE001 - graceful degradation on collector failure await ctx.warning(f"Failed to fetch collector sources: {exc}; returning regular sources only.") return regular_result # Merge results return _merge_source_results(regular_result, collector_result) async def _collect_collector_sources( coll_ctx: CollectionContext, security: Security, ) -> ProductResult: """Collect collector sources (SavedJobs) for all groups of a product. Args: coll_ctx: Collection context with client, product, timeout, ctx. security: Security configuration with bearer token. Returns: Standard result dictionary with grouped collector items. 
""" http_ctx = HttpCollectionContext( coll_ctx=CollectionContext( client=coll_ctx.client, product=coll_ctx.product, timeout_ms=coll_ctx.timeout_ms, ctx=coll_ctx.ctx, resource_type="collector_sources", ), security=security, endpoint_path=JOBS_ENDPOINT, ) result = await collect_items_via_http(http_ctx) # Filter jobs to only include collectors (type='collection') if result.get("status") == "ok" and "groups" in result: filtered_groups: list[dict[str, Any]] = [] for group in result["groups"]: items = group.get("items", []) collector_items = filter_collector_jobs(items) filtered_groups.append( build_group_entry( group["group_id"], collector_items, reported_count=len(collector_items), ) ) result["groups"] = filtered_groups result["total_count"] = sum(g.get("count", 0) for g in filtered_groups) return result def _merge_source_results( regular_result: ProductResult, collector_result: ProductResult, ) -> ProductResult: """Merge regular sources and collector sources into a single result. Args: regular_result: Result from regular sources collection. collector_result: Result from collector sources collection. Returns: Merged result with both regular and collector sources per group. 
""" # Handle edge cases where one result failed if regular_result.get("status") != "ok": # If regular sources failed, return collector result with error context if collector_result.get("status") == "ok": return collector_result return regular_result if collector_result.get("status") != "ok": # If collectors failed, return regular sources only return regular_result # Build a map of group_id -> items for collector sources collector_by_group: dict[str, list[dict[str, Any]]] = {} for group in collector_result.get("groups", []): group_id = group.get("group_id") if group_id: collector_by_group[group_id] = group.get("items", []) # Merge collector sources into regular source groups merged_groups: list[dict[str, Any]] = [] seen_group_ids: set[str] = set() for group in regular_result.get("groups", []): group_id = group.get("group_id") seen_group_ids.add(group_id) regular_items = group.get("items", []) collector_items = collector_by_group.get(group_id, []) # Combine items and update count combined_items = regular_items + collector_items merged_groups.append( build_group_entry( group_id, combined_items, reported_count=len(combined_items), ) ) # Add any groups that only have collector sources (shouldn't normally happen) for group_id, items in collector_by_group.items(): if group_id not in seen_group_ids: merged_groups.append(build_group_entry(group_id, items, reported_count=len(items))) return build_success_result([], is_grouped=True, groups=merged_groups) __all__ = ["collect_product_sources"]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/atree1023/snc-cribl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.