Skip to main content
Glama
pipeline_functions.py37.2 kB
"""Pydantic models for Cribl pipeline functions and their configurations. This module defines typed models for all Cribl pipeline function types. Each function has a specific configuration schema defined in docs/pipeline_functions/<id>.json. The models preserve all configuration data during serialization, solving the issue where the SDK's empty FunctionSpecificConfigs class would serialize conf as {}. """ from typing import Annotated, Any import pydantic from pydantic import BaseModel, ConfigDict, Field # ============================================================================= # Common Helper Models # ============================================================================= class EvalField(BaseModel): """A key-value pair for eval operations.""" model_config = ConfigDict(extra="allow", populate_by_name=True) name: str | None = None value: str disabled: bool | None = None class SamplingRule(BaseModel): """A sampling rule with filter and rate.""" model_config = ConfigDict(extra="allow", populate_by_name=True) filter: str = Field(default="true", alias="filter") rate: int = 1 class MaskRule(BaseModel): """A masking rule with regex match and replace expression.""" model_config = ConfigDict(extra="allow", populate_by_name=True) match_regex: str = Field(..., alias="matchRegex") replace_expr: str = Field(default="''", alias="replaceExpr") disabled: bool | None = None class RegexListItem(BaseModel): """An item in a regex list.""" model_config = ConfigDict(extra="allow", populate_by_name=True) regex: str class PatternListItem(BaseModel): """An item in a pattern list for Grok.""" model_config = ConfigDict(extra="allow", populate_by_name=True) pattern: str class TimestampItem(BaseModel): """A timestamp extraction item.""" model_config = ConfigDict(extra="allow", populate_by_name=True) regex: str strptime: str class LookupInField(BaseModel): """A lookup input field mapping.""" model_config = ConfigDict(extra="allow", populate_by_name=True) event_field: str = Field(..., alias="eventField") lookup_field: str | None = Field(default=None, alias="lookupField") class LookupOutField(BaseModel): """A lookup output field mapping.""" model_config = ConfigDict(extra="allow", populate_by_name=True) lookup_field: str = Field(..., alias="lookupField") event_field: str | None = Field(default=None, alias="eventField") default_value: str | None = Field(default=None, alias="defaultValue") class DnsLookupField(BaseModel): """A DNS lookup field configuration.""" model_config = ConfigDict(extra="allow", populate_by_name=True) in_field_name: str | None = Field(default=None, alias="inFieldName") resource_record_type: str | None = Field(default="A", alias="resourceRecordType") out_field_name: str | None = Field(default=None, alias="outFieldName") class ReverseLookupField(BaseModel): """A reverse DNS lookup field configuration.""" model_config = ConfigDict(extra="allow", populate_by_name=True) in_field_name: str | None = Field(default=None, alias="inFieldName") out_field_name: str | None = Field(default=None, alias="outFieldName") class GeoipAdditionalField(BaseModel): """Additional GeoIP field configuration.""" model_config = ConfigDict(extra="allow", populate_by_name=True) extra_in_field: str = Field(..., alias="extraInField") extra_out_field: str = Field(..., alias="extraOutField") class RenameField(BaseModel): """A rename field pair.""" model_config = ConfigDict(extra="allow", populate_by_name=True) current_name: str = Field(..., alias="currentName") new_name: str = Field(..., alias="newName") class RedisCommand(BaseModel): """A Redis command configuration.""" model_config = ConfigDict(extra="allow", populate_by_name=True) command: str key_expr: str = Field(..., alias="keyExpr") out_field: str | None = Field(default=None, alias="outField") args_expr: str | None = Field(default=None, alias="argsExpr") class RootNode(BaseModel): """A Redis cluster root node.""" model_config = ConfigDict(extra="allow", populate_by_name=True) host: str port: int class PublishMetricField(BaseModel): """A publish metrics field configuration.""" model_config = ConfigDict(extra="allow", populate_by_name=True) in_field_name: str = Field(..., alias="inFieldName") out_field_expr: str | None = Field(default=None, alias="outFieldExpr") metric_type: str = Field(default="gauge", alias="metricType") class AggregateMetricItem(BaseModel): """An aggregate metric configuration.""" model_config = ConfigDict(extra="allow", populate_by_name=True) agg: str metric_type: str = Field(default="automatic", alias="metricType") class HeaderField(BaseModel): """A CEF header field.""" model_config = ConfigDict(extra="allow", populate_by_name=True) name: str | None = None value: str class ExtensionField(BaseModel): """A CEF extension field.""" model_config = ConfigDict(extra="allow", populate_by_name=True) name: str value: str class SidLookupField(BaseModel): """A SID lookup field.""" model_config = ConfigDict(extra="allow", populate_by_name=True) name: str | None = None expr: str disabled: bool | None = None class TlsOptions(BaseModel): """TLS connection options.""" model_config = ConfigDict(extra="allow", populate_by_name=True) reject_unauthorized: bool | None = Field(default=True, alias="rejectUnauthorized") servername: str | None = None certificate_name: str | None = Field(default=None, alias="certificateName") ca_path: str | None = Field(default=None, alias="caPath") priv_key_path: str | None = Field(default=None, alias="privKeyPath") cert_path: str | None = Field(default=None, alias="certPath") passphrase: str | None = None min_version: str | None = Field(default=None, alias="minVersion") max_version: str | None = Field(default=None, alias="maxVersion") class TimestampFormat(BaseModel): """Timestamp format configuration for event breaker.""" model_config = ConfigDict(extra="allow", populate_by_name=True) type: str = "auto" length: int | None = None format: str | None = None class V3User(BaseModel): """SNMPv3 user configuration.""" model_config = ConfigDict(extra="allow", populate_by_name=True) name: str | None = None auth_protocol: str | None = Field(default="none", alias="authProtocol") auth_key: str | None = Field(default=None, alias="authKey") priv_protocol: str | None = Field(default="none", alias="privProtocol") priv_key: str | None = Field(default=None, alias="privKey") # ============================================================================= # Function-Specific Configuration Models # ============================================================================= class AggregateMetricsConf(BaseModel): """Configuration for the aggregate_metrics function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) time_window: str = Field(default="10s", alias="timeWindow") aggregations: Annotated[list[AggregateMetricItem], Field(default_factory=list)] groupbys: list[str] | None = None passthrough: bool | None = None preserve_group_bys: bool | None = Field(default=None, alias="preserveGroupBys") sufficient_stats_only: bool | None = Field(default=None, alias="sufficientStatsOnly") prefix: str | None = None flush_event_limit: int | None = Field(default=None, alias="flushEventLimit") flush_mem_limit: str | None = Field(default=None, alias="flushMemLimit") cumulative: bool | None = None should_treat_dots_as_literals: bool | None = Field(default=None, alias="shouldTreatDotsAsLiterals") add: list[EvalField] | None = None flush_on_input_close: bool | None = Field(default=None, alias="flushOnInputClose") lag_tolerance: str | None = Field(default=None, alias="lagTolerance") idle_time_limit: str | None = Field(default=None, alias="idleTimeLimit") class AggregationConf(BaseModel): """Configuration for the aggregation function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) time_window: str = Field(default="10s", alias="timeWindow") aggregations: list[str] = Field(default_factory=list) groupbys: list[str] | None = None passthrough: bool | None = None preserve_group_bys: bool | None = Field(default=None, alias="preserveGroupBys") sufficient_stats_only: bool | None = Field(default=None, alias="sufficientStatsOnly") metrics_mode: bool | None = Field(default=None, alias="metricsMode") prefix: str | None = None flush_event_limit: int | None = Field(default=None, alias="flushEventLimit") flush_mem_limit: str | None = Field(default=None, alias="flushMemLimit") cumulative: bool | None = None search_agg_mode: str | None = Field(default=None, alias="searchAggMode") add: list[EvalField] | None = None should_treat_dots_as_literals: bool | None = Field(default=None, alias="shouldTreatDotsAsLiterals") flush_on_input_close: bool | None = Field(default=None, alias="flushOnInputClose") lag_tolerance: str | None = Field(default=None, alias="lagTolerance") idle_time_limit: str | None = Field(default=None, alias="idleTimeLimit") class AutoTimestampConf(BaseModel): """Configuration for the auto_timestamp function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) src_field: str | None = Field(default="_raw", alias="srcField") dst_field: str | None = Field(default="_time", alias="dstField") default_timezone: str | None = Field(default="local", alias="defaultTimezone") time_expression: str | None = Field(default=None, alias="timeExpression") offset: int | None = None max_len: int | None = Field(default=None, alias="maxLen") default_time: str | None = Field(default="now", alias="defaultTime") latest_date_allowed: str | None = Field(default=None, alias="latestDateAllowed") earliest_date_allowed: str | None = Field(default=None, alias="earliestDateAllowed") spacer: str | None = None timestamps: list[TimestampItem] | None = None class CefConf(BaseModel): """Configuration for the cef function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) output_field: str | None = Field(default="_raw", alias="outputField") header: list[HeaderField] | None = None extension: list[ExtensionField] | None = None class ChainConf(BaseModel): """Configuration for the chain function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) processor: str class CloneConf(BaseModel): """Configuration for the clone function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) clones: list[dict[str, Any]] | None = None class CodeConf(BaseModel): """Configuration for the code function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) code: str | None = None max_num_of_iterations: int | None = Field(default=None, alias="maxNumOfIterations") active_log_sample_rate: int | None = Field(default=None, alias="activeLogSampleRate") use_unique_log_channel: bool | None = Field(default=None, alias="useUniqueLogChannel") class CommentConf(BaseModel): """Configuration for the comment function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) comment: str | None = None class DnsLookupConf(BaseModel): """Configuration for the dns_lookup function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) dns_lookup_fields: list[DnsLookupField] | None = Field(default=None, alias="dnsLookupFields") reverse_lookup_fields: list[ReverseLookupField] | None = Field(default=None, alias="reverseLookupFields") dns_servers: list[str] | None = Field(default=None, alias="dnsServers") cache_ttl: int | None = Field(default=None, alias="cacheTTL") max_cache_size: int | None = Field(default=None, alias="maxCacheSize") use_resolv_conf: bool | None = Field(default=None, alias="useResolvConf") lookup_fallback: bool | None = Field(default=None, alias="lookupFallback") domain_overrides: list[str] | None = Field(default=None, alias="domainOverrides") lookup_fail_log_level: str | None = Field(default=None, alias="lookupFailLogLevel") class DropConf(BaseModel): """Configuration for the drop function (empty schema).""" model_config = ConfigDict(extra="allow", populate_by_name=True) class DropDimensionsConf(BaseModel): """Configuration for the drop_dimensions function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) time_window: str = Field(default="10s", alias="timeWindow") drop_dimensions: list[str] = Field(default_factory=list, alias="dropDimensions") flush_on_input_close: bool | None = Field(default=None, alias="flushOnInputClose") class DynamicSamplingConf(BaseModel): """Configuration for the dynamic_sampling function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) mode: str = "log" key_expr: str = Field(default="`${host}`", alias="keyExpr") sample_period: int | None = Field(default=None, alias="samplePeriod") min_events: int | None = Field(default=None, alias="minEvents") max_sample_rate: int | None = Field(default=None, alias="maxSampleRate") class EvalConf(BaseModel): """Configuration for the eval function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) add: list[EvalField] | None = None keep: list[str] | None = None remove: list[str] | None = None class EventBreakerConf(BaseModel): """Configuration for the event_breaker function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) existing_or_new: str = Field(default="existing", alias="existingOrNew") should_mark_cribl_breaker: bool | None = Field(default=None, alias="shouldMarkCriblBreaker") existing_rule: str | None = Field(default=None, alias="existingRule") rule_type: str | None = Field(default=None, alias="ruleType") max_event_bytes: int | None = Field(default=None, alias="maxEventBytes") timestamp_anchor_regex: str | None = Field(default=None, alias="timestampAnchorRegex") timestamp: TimestampFormat | None = None timestamp_timezone: str | None = Field(default=None, alias="timestampTimezone") timestamp_earliest: str | None = Field(default=None, alias="timestampEarliest") timestamp_latest: str | None = Field(default=None, alias="timestampLatest") event_breaker_regex: str | None = Field(default=None, alias="eventBreakerRegex") json_array_field: str | None = Field(default=None, alias="jsonArrayField") parent_fields_to_copy: list[str] | None = Field(default=None, alias="parentFieldsToCopy") json_extract_all: bool | None = Field(default=None, alias="jsonExtractAll") json_time_field: str | None = Field(default=None, alias="jsonTimeField") delimiter_regex: str | None = Field(default=None, alias="delimiterRegex") fields_line_regex: str | None = Field(default=None, alias="fieldsLineRegex") header_line_regex: str | None = Field(default=None, alias="headerLineRegex") null_field_val: str | None = Field(default=None, alias="nullFieldVal") clean_fields: bool | None = Field(default=None, alias="cleanFields") delimiter: str | None = None quote_char: str | None = Field(default=None, alias="quoteChar") escape_char: str | None = Field(default=None, alias="escapeChar") time_field: str | None = Field(default=None, alias="timeField") class FlattenConf(BaseModel): """Configuration for the flatten function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) fields: list[str] | None = None prefix: str | None = None depth: int | None = None delimiter: str | None = None class FoldkeysConf(BaseModel): """Configuration for the foldkeys function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) delete_original: bool | None = Field(default=None, alias="deleteOriginal") separator: str | None = None selection_reg_exp: str | None = Field(default=None, alias="selectionRegExp") class GeoipConf(BaseModel): """Configuration for the geoip function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) file: str in_field: str | None = Field(default="ip", alias="inField") out_field: str | None = Field(default="geoip", alias="outField") additional_fields: list[GeoipAdditionalField] | None = Field(default=None, alias="additionalFields") out_field_mappings: dict[str, Any] | None = Field(default=None, alias="outFieldMappings") class GrokConf(BaseModel): """Configuration for the grok function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) pattern: str pattern_list: list[PatternListItem] | None = Field(default=None, alias="patternList") source: str | None = None class JsonUnrollConf(BaseModel): """Configuration for the json_unroll function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) path: str name: str | None = None class LookupConf(BaseModel): """Configuration for the lookup function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) file: str db_lookup: bool | None = Field(default=None, alias="dbLookup") match_mode: str | None = Field(default=None, alias="matchMode") match_type: str | None = Field(default=None, alias="matchType") reload_period_sec: int | None = Field(default=None, alias="reloadPeriodSec") in_fields: list[LookupInField] | None = Field(default=None, alias="inFields") out_fields: list[LookupOutField] | None = Field(default=None, alias="outFields") add_to_event: bool | None = Field(default=None, alias="addToEvent") ignore_case: bool | None = Field(default=None, alias="ignoreCase") class MaskConf(BaseModel): """Configuration for the mask function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) rules: Annotated[list[MaskRule], Field(default_factory=list)] fields: list[str] | None = None depth: int | None = None flags: list[EvalField] | None = None class NumerifyConf(BaseModel): """Configuration for the numerify function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) depth: int | None = None ignore_fields: list[str] | None = Field(default=None, alias="ignoreFields") filter_expr: str | None = Field(default=None, alias="filterExpr") format: str | None = None digits: int | None = None class OtlpLogsConf(BaseModel): """Configuration for the otlp_logs function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) drop_non_log_events: bool | None = Field(default=None, alias="dropNonLogEvents") batch_otlp_logs: bool | None = Field(default=None, alias="batchOTLPLogs") send_batch_size: int | None = Field(default=None, alias="sendBatchSize") timeout: int | None = None send_batch_max_size: int | None = Field(default=None, alias="sendBatchMaxSize") metadata_keys: list[str] | None = Field(default=None, alias="metadataKeys") metadata_cardinality_limit: int | None = Field(default=None, alias="metadataCardinalityLimit") class OtlpMetricsConf(BaseModel): """Configuration for the otlp_metrics function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) resource_attribute_prefixes: list[str] | None = Field(default=None, alias="resourceAttributePrefixes") drop_non_metric_events: bool | None = Field(default=None, alias="dropNonMetricEvents") otlp_version: str | None = Field(default=None, alias="otlpVersion") batch_otlp_metrics: bool | None = Field(default=None, alias="batchOTLPMetrics") send_batch_size: int | None = Field(default=None, alias="sendBatchSize") timeout: int | None = None send_batch_max_size: int | None = Field(default=None, alias="sendBatchMaxSize") metadata_keys: list[str] | None = Field(default=None, alias="metadataKeys") metadata_cardinality_limit: int | None = Field(default=None, alias="metadataCardinalityLimit") class OtlpTracesConf(BaseModel): """Configuration for the otlp_traces function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) drop_non_trace_events: bool | None = Field(default=None, alias="dropNonTraceEvents") otlp_version: str | None = Field(default=None, alias="otlpVersion") batch_otlp_traces: bool | None = Field(default=None, alias="batchOTLPTraces") send_batch_size: int | None = Field(default=None, alias="sendBatchSize") timeout: int | None = None send_batch_max_size: int | None = Field(default=None, alias="sendBatchMaxSize") metadata_keys: list[str] | None = Field(default=None, alias="metadataKeys") metadata_cardinality_limit: int | None = Field(default=None, alias="metadataCardinalityLimit") class PublishMetricsConf(BaseModel): """Configuration for the publish_metrics function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) fields: list[PublishMetricField] | None = None overwrite: bool | None = None dimensions: list[str] | None = None remove_metrics: list[str] | None = Field(default=None, alias="removeMetrics") remove_dimensions: list[str] | None = Field(default=None, alias="removeDimensions") class RedisConf(BaseModel): """Configuration for the redis function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) commands: Annotated[list[RedisCommand], Field(default_factory=list)] deployment_type: str | None = Field(default="standalone", alias="deploymentType") auth_type: str | None = Field(default="none", alias="authType") max_block_secs: int | None = Field(default=None, alias="maxBlockSecs") enable_client_side_caching: bool | None = Field(default=None, alias="enableClientSideCaching") url: str | None = None tls_options: TlsOptions | None = Field(default=None, alias="tlsOptions") root_nodes: list[RootNode] | None = Field(default=None, alias="rootNodes") tls: bool | None = None scale_reads: str | None = Field(default=None, alias="scaleReads") master_name: str | None = Field(default=None, alias="masterName") username: str | None = None password: str | None = None credentials_secret: str | None = Field(default=None, alias="credentialsSecret") text_secret: str | None = Field(default=None, alias="textSecret") class RegexExtractConf(BaseModel): """Configuration for the regex_extract function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) regex: str regex_list: list[RegexListItem] | None = Field(default=None, alias="regexList") source: str | None = None iterations: int | None = None field_name_expression: str | None = Field(default=None, alias="fieldNameExpression") overwrite: bool | None = None class RegexFilterConf(BaseModel): """Configuration for the regex_filter function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) regex: str | None = None regex_list: list[RegexListItem] | None = Field(default=None, alias="regexList") field: str | None = None class RenameConf(BaseModel): """Configuration for the rename function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) base_fields: list[str] | None = Field(default=None, alias="baseFields") rename: list[RenameField] | None = None rename_expr: str | None = Field(default=None, alias="renameExpr") wildcard_depth: int | None = Field(default=None, alias="wildcardDepth") class RollupMetricsConf(BaseModel): """Configuration for the rollup_metrics function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) dimensions: list[str] | None = None time_window: str | None = Field(default=None, alias="timeWindow") gauge_rollup: str | None = Field(default=None, alias="gaugeRollup") class SamplingConf(BaseModel): """Configuration for the sampling function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) rules: list[SamplingRule] | None = None class SerdeConf(BaseModel): """Configuration for the serde (parser) function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) mode: str = "extract" type: str = "csv" src_field: str | None = Field(default="_raw", alias="srcField") dst_field: str | None = Field(default=None, alias="dstField") fields: list[str] | None = None keep: list[str] | None = None remove: list[str] | None = None field_filter_expr: str | None = Field(default=None, alias="fieldFilterExpr") clean_fields: bool | None = Field(default=None, alias="cleanFields") allowed_key_chars: list[str] | None = Field(default=None, alias="allowedKeyChars") allowed_value_chars: list[str] | None = Field(default=None, alias="allowedValueChars") delim_char: str | None = Field(default=None, alias="delimChar") quote_char: str | None = Field(default=None, alias="quoteChar") escape_char: str | None = Field(default=None, alias="escapeChar") null_value: str | None = Field(default=None, alias="nullValue") regex: str | None = None regex_list: list[RegexListItem] | None = Field(default=None, alias="regexList") iterations: int | None = None field_name_expression: str | None = Field(default=None, alias="fieldNameExpression") overwrite: bool | None = None pattern: str | None = None pattern_list: list[PatternListItem] | None = Field(default=None, alias="patternList") class SerializeConf(BaseModel): """Configuration for the serialize function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) type: str = "csv" fields: list[str] | None = None src_field: str | None = Field(default=None, alias="srcField") dst_field: str | None = Field(default="_raw", alias="dstField") delim_char: str | None = Field(default=None, alias="delimChar") quote_char: str | None = Field(default=None, alias="quoteChar") escape_char: str | None = Field(default=None, alias="escapeChar") null_value: str | None = Field(default=None, alias="nullValue") clean_fields: bool | None = Field(default=None, alias="cleanFields") pair_delimiter: str | None = Field(default=None, alias="pairDelimiter") key_value_delimiter: str | None = Field(default=None, alias="keyValueDelimiter") class SidlookupConf(BaseModel): """Configuration for the sidlookup function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) fields: list[SidLookupField] | None = None class SnmpTrapSerializeConf(BaseModel): """Configuration for the snmp_trap_serialize function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) strict: bool | None = None drop_failed_events: bool | None = Field(default=None, alias="dropFailedEvents") v3_user: V3User | None = Field(default=None, alias="v3User") class SuppressConf(BaseModel): """Configuration for the suppress function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) key_expr: str = Field(..., alias="keyExpr") allow: int = 1 suppress_period_sec: int = Field(default=30, alias="suppressPeriodSec") drop_events_mode: bool | None = Field(default=None, alias="dropEventsMode") max_cache_size: int | None = Field(default=None, alias="maxCacheSize") cache_idle_timeout_periods: int | None = Field(default=None, alias="cacheIdleTimeoutPeriods") num_events_idle_timeout_trigger: int | None = Field(default=None, alias="numEventsIdleTimeoutTrigger") class TeeConf(BaseModel): """Configuration for the tee function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) command: str args: list[str] | None = None restart_on_exit: bool | None = Field(default=None, alias="restartOnExit") env: dict[str, str] | None = None class TrimTimestampConf(BaseModel): """Configuration for the trim_timestamp function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) field: str | None = None class UnrollConf(BaseModel): """Configuration for the unroll function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) src_expr: str = Field(default="_raw", alias="srcExpr") dst_field: str = Field(default="_raw", alias="dstField") class XmlUnrollConf(BaseModel): """Configuration for the xml_unroll function.""" model_config = ConfigDict(extra="allow", populate_by_name=True) unroll: str inherit: str | None = None unroll_idx_field: str | None = Field(default=None, alias="unrollIdxField") pretty: bool | None = None # ============================================================================= # Union type for all function configurations # ============================================================================= # Type alias for all possible function configurations FunctionConf = ( AggregateMetricsConf | AggregationConf | AutoTimestampConf | CefConf | ChainConf | CloneConf | CodeConf | CommentConf | DnsLookupConf | DropConf | DropDimensionsConf | DynamicSamplingConf | EvalConf | EventBreakerConf | FlattenConf | FoldkeysConf | GeoipConf | GrokConf | JsonUnrollConf | LookupConf | MaskConf | NumerifyConf | OtlpLogsConf | OtlpMetricsConf | OtlpTracesConf | PublishMetricsConf | RedisConf | RegexExtractConf | RegexFilterConf | RenameConf | RollupMetricsConf | SamplingConf | SerdeConf | SerializeConf | SidlookupConf | SnmpTrapSerializeConf | SuppressConf | TeeConf | TrimTimestampConf | UnrollConf | XmlUnrollConf | dict[str, Any] # Fallback for unknown function types ) # Mapping from function ID to configuration class FUNCTION_CONF_MAP: dict[str, type[BaseModel]] = { "aggregate_metrics": AggregateMetricsConf, "aggregation": AggregationConf, "auto_timestamp": AutoTimestampConf, "cef": CefConf, "chain": ChainConf, "clone": CloneConf, "code": CodeConf, "comment": CommentConf, "dns_lookup": DnsLookupConf, "drop": DropConf, "drop_dimensions": DropDimensionsConf, "dynamic_sampling": DynamicSamplingConf, "eval": EvalConf, "event_breaker": EventBreakerConf, "flatten": FlattenConf, "foldkeys": FoldkeysConf, "geoip": GeoipConf, "grok": GrokConf, "json_unroll": JsonUnrollConf, "lookup": LookupConf, "mask": MaskConf, "numerify": NumerifyConf, "otlp_logs": OtlpLogsConf, "otlp_metrics": OtlpMetricsConf, "otlp_traces": OtlpTracesConf, "publish_metrics": PublishMetricsConf, "redis": RedisConf, "regex_extract": RegexExtractConf, "regex_filter": RegexFilterConf, "rename": RenameConf, "rollup_metrics": RollupMetricsConf, "sampling": SamplingConf, "serde": SerdeConf, "serialize": SerializeConf, "sidlookup": SidlookupConf, "snmp_trap_serialize": SnmpTrapSerializeConf, "suppress": SuppressConf, "tee": TeeConf, "trim_timestamp": TrimTimestampConf, "unroll": UnrollConf, "xml_unroll": XmlUnrollConf, } def get_function_conf_class(function_id: str) -> type[BaseModel]: """Get the configuration class for a function ID. Args: function_id: The function type identifier (e.g., "eval", "mask"). Returns: The Pydantic model class for the function's configuration. Returns a generic BaseModel for unknown function types. """ return FUNCTION_CONF_MAP.get(function_id, DropConf) def parse_function_conf(function_id: str, conf_data: dict[str, Any]) -> BaseModel: """Parse a function configuration dict into the appropriate typed model. Args: function_id: The function type identifier. conf_data: The raw configuration dictionary from the API. Returns: A typed Pydantic model instance with the configuration data. """ conf_class = get_function_conf_class(function_id) return conf_class.model_validate(conf_data) # ============================================================================= # Pipeline Function Model # ============================================================================= class PipelineFunctionConf(BaseModel): """A pipeline function with its configuration. This model captures all fields of a pipeline function, preserving the function-specific configuration in a typed manner. """ model_config = ConfigDict(extra="allow", populate_by_name=True) id: str """Function type identifier (e.g., 'eval', 'mask', 'drop').""" conf: dict[str, Any] = Field(default_factory=dict) """Function-specific configuration. Uses dict to preserve all data.""" filter_: Annotated[str | None, pydantic.Field(alias="filter")] = "true" """Filter expression that selects events to process.""" description: str | None = None """Optional description of this function step.""" disabled: bool | None = None """If True, events will not pass through this function.""" final: bool | None = None """If True, stops results from passing to downstream functions.""" group_id: str | None = Field(default=None, alias="groupId") """Optional group ID for organizing functions.""" # ============================================================================= # Pipeline Configuration Model # ============================================================================= class PipelineGroups(BaseModel): """A pipeline function group.""" model_config = ConfigDict(extra="allow", populate_by_name=True) name: str description: str | None = None disabled: bool | None = None class PipelineConf(BaseModel): """Configuration for a pipeline.""" model_config = ConfigDict(extra="allow", populate_by_name=True) async_func_timeout: int | None = Field(default=None, alias="asyncFuncTimeout") """Time (ms) to wait for async function completion.""" output: str | None = "default" """Output destination for processed events.""" description: str | None = None """Pipeline description.""" streamtags: list[str] | None = None """Tags for filtering and grouping.""" functions: list[PipelineFunctionConf] | None = None """List of functions in this pipeline.""" groups: dict[str, PipelineGroups] | None = None """Function groups within the pipeline.""" class PipelineItem(BaseModel): """A complete pipeline item as returned by the API.""" model_config = ConfigDict(extra="allow", populate_by_name=True) id: str """Pipeline identifier.""" conf: PipelineConf """Pipeline configuration including functions.""" # ============================================================================= # Utility Functions # ============================================================================= def serialize_pipeline_item(item: PipelineItem) -> dict[str, Any]: """Serialize a pipeline item to a JSON-compatible dictionary. Args: item: The pipeline item to serialize. Returns: A dictionary representation preserving all data including function configs. """ return item.model_dump(mode="json", exclude_none=True, by_alias=True) def parse_pipeline_item(data: dict[str, Any]) -> PipelineItem: """Parse a raw pipeline dictionary into a typed PipelineItem. Args: data: Raw pipeline data from the API. Returns: A typed PipelineItem instance. """ return PipelineItem.model_validate(data) __all__ = [ "FUNCTION_CONF_MAP", "AggregateMetricsConf", "AggregationConf", "AutoTimestampConf", "CefConf", "ChainConf", "CloneConf", "CodeConf", "CommentConf", "DnsLookupConf", "DropConf", "DropDimensionsConf", "DynamicSamplingConf", "EvalConf", "EventBreakerConf", "FlattenConf", "FoldkeysConf", "FunctionConf", "GeoipConf", "GrokConf", "JsonUnrollConf", "LookupConf", "MaskConf", "NumerifyConf", "OtlpLogsConf", "OtlpMetricsConf", "OtlpTracesConf", "PipelineConf", "PipelineFunctionConf", "PipelineGroups", "PipelineItem", "PublishMetricsConf", "RedisConf", "RegexExtractConf", "RegexFilterConf", "RenameConf", "RollupMetricsConf", "SamplingConf", "SerdeConf", "SerializeConf", "SidlookupConf", "SnmpTrapSerializeConf", "SuppressConf", "TeeConf", "TrimTimestampConf", "UnrollConf", "XmlUnrollConf", "get_function_conf_class", "parse_function_conf", "parse_pipeline_item", "serialize_pipeline_item", ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/atree1023/snc-cribl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server