Skip to main content
Glama
example_edge.py (6.14 kB)
"""Cribl Edge Configuration Example

This example demonstrates how to programmatically create and configure a
complete data pipeline in Cribl Edge using the Control Plane SDK.

This example creates:

1. A Fleet to manage the configuration.
2. A Syslog Source to receive data on port 9021.
3. An Amazon S3 Destination to store processed data.
4. A Pipeline that filters events and keeps only data in the "eventSource"
   and "eventID" fields.
5. A Route that connects the Source to the Pipeline and Destination.

This example also deploys the configuration to the Fleet to make it active.

Data flow: Syslog Source → Route → Pipeline → Amazon S3 Destination

This example includes error handling and checks for existing resources.
When any step raises, the script prints the error and exits with status 1
so that calling shells and CI jobs can detect the failure.

Prerequisites:

- An .env file that is configured with your credentials.
- Your AWS S3 values for AWS_API_KEY, AWS_SECRET_KEY, AWS_BUCKET_NAME, and
  AWS_REGION.
- An Enterprise License on the server.
"""

import asyncio

from auth import base_url, create_cribl_client
from cribl_control_plane.models import (
    Conf,
    ConfigGroup,
    FunctionSpecificConfigs,
    InputSyslogSyslog2,
    InputSyslogTLSSettingsServerSide2,
    InputSyslogType2,
    OutputS3,
    OutputS3Compression,
    OutputS3CompressionLevel,
    OutputS3Type,
    Pipeline,
    PipelineFunctionConf,
    ProductsCore,
    RoutesRoute,
)

FLEET_ID = "my-fleet"

# Syslog Source configuration
SYSLOG_PORT = 9021

# Amazon S3 Destination configuration: Replace the placeholder values
AWS_API_KEY = ""  # Replace with your AWS Access Key ID
AWS_SECRET_KEY = ""  # Replace with your AWS Secret Access Key
AWS_BUCKET_NAME = "your-aws-bucket-name"  # Replace with your S3 bucket name
AWS_REGION = "us-east-2"  # Replace with your S3 bucket region

# Fleet configuration
my_fleet = ConfigGroup(
    on_prem=True,
    worker_remote_access=True,
    is_fleet=True,
    is_search=False,
    id=FLEET_ID,
)

# Syslog Source configuration
syslog_source = InputSyslogSyslog2(
    id="my-syslog-source",
    type=InputSyslogType2.SYSLOG,
    tcp_port=SYSLOG_PORT,
    tls=InputSyslogTLSSettingsServerSide2(disabled=True),
)

# Amazon S3 Destination configuration
s3_destination = OutputS3(
    id="my-s3-destination",
    type=OutputS3Type.S3,
    bucket=AWS_BUCKET_NAME,
    region=AWS_REGION,
    aws_secret_key=AWS_SECRET_KEY,
    aws_api_key=AWS_API_KEY,
    compress=OutputS3Compression.GZIP,
    compression_level=OutputS3CompressionLevel.BEST_SPEED,
    empty_dir_cleanup_sec=300,
)

# Pipeline configuration: filter events and keep only data in the
# "eventSource" and "eventID" fields
pipeline = Pipeline(
    id="my-pipeline",
    conf=Conf(
        async_func_timeout=1000,
        functions=[
            PipelineFunctionConf(
                filter_="true",
                conf=FunctionSpecificConfigs.model_validate({  # type: ignore
                    "remove": ["*"],
                    "keep": ["eventSource", "eventID"],
                }),
                id="eval",
                final=True,
            )
        ],
    ),
)

# Route configuration: route data from the Source to the Pipeline and
# Destination
route = RoutesRoute(
    final=False,
    id="my-route",
    name="my-route",
    pipeline=pipeline.id,
    output=s3_destination.id,
    filter_=f"__inputId=='{syslog_source.id}'",
    description="This is my new Route",
)

# Every Fleet-scoped call below is pointed at this URL via ``server_url``.
group_url = f"{base_url}/m/{my_fleet.id}"


def _add_route(cribl):
    """Prepend ``route`` to the first Routing table returned for the Fleet.

    Raises:
        RuntimeError: If the Fleet returns no Routing table, or the first
            table is empty/has no ID.
    """
    routes_list_response = cribl.routes.list(server_url=group_url)
    if not routes_list_response.items:
        raise RuntimeError("No Routes found")

    routes = routes_list_response.items[0]
    if not routes or not routes.id:
        raise RuntimeError("No Routes found")

    # The new Route goes first in the list; existing Routes are preserved
    # after it.
    routes.routes = [route] + (routes.routes or [])
    cribl.routes.update(
        id_param=routes.id,
        id=routes.id,
        routes=routes.routes,
        server_url=group_url,
    )
    print(f"✅ Route added: {route.id}")


def _commit_changes(cribl):
    """Commit the pending Fleet configuration and return the commit ID.

    Raises:
        RuntimeError: If the commit response contains no items.
    """
    commit_response = cribl.versions.commits.create(
        group_id=my_fleet.id,
        message="Commit for Edge example",
        effective=True,
        files=["."],
    )
    if not commit_response.items:
        raise RuntimeError("Failed to commit configuration changes")

    version = commit_response.items[0].commit
    print(
        f"✅ Committed configuration changes to the fleet: {my_fleet.id}, "
        f"commit ID: {version}"
    )
    return version


async def main():
    """Create the Fleet, Source, Destination, Pipeline, and Route, then deploy.

    Returns early (without creating anything) when a Fleet with the
    configured ID already exists. Any exception raised by a step propagates
    to the caller.
    """
    # Initialize Cribl client
    cribl = await create_cribl_client()

    # NOTE(review): the SDK calls below are made without ``await``, i.e. they
    # run synchronously inside this coroutine. That is fine for a one-shot
    # script; confirm before reusing this inside a shared event loop.

    # Verify that Fleet doesn't already exist
    fleet_response = cribl.groups.get(id=my_fleet.id, product=ProductsCore.EDGE)
    if fleet_response.items:
        print(f"⚠️ Fleet already exists: {my_fleet.id}. Try a different Fleet ID.")
        return

    # Create Fleet
    cribl.groups.create(
        product=ProductsCore.EDGE,
        id=my_fleet.id,
        on_prem=my_fleet.on_prem,
        worker_remote_access=my_fleet.worker_remote_access,
        is_fleet=my_fleet.is_fleet,
        is_search=my_fleet.is_search,
    )
    print(f"✅ Fleet created: {my_fleet.id}")

    # Create Syslog Source
    cribl.sources.create(request=syslog_source, server_url=group_url)
    print(f"✅ Syslog source created: {syslog_source.id}")

    # Create Amazon S3 Destination
    cribl.destinations.create(request=s3_destination, server_url=group_url)
    print(f"✅ Amazon S3 Destination created: {s3_destination.id}")

    # Create Pipeline
    cribl.pipelines.create(
        id=pipeline.id,
        conf=pipeline.conf,
        server_url=group_url,
    )
    print(f"✅ Pipeline created: {pipeline.id}")

    # Add Route to Routing table
    _add_route(cribl)

    # Commit configuration changes
    version = _commit_changes(cribl)

    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.EDGE,
        id=my_fleet.id,
        version=version,
    )
    print(f"✅ Fleet changes deployed: {my_fleet.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
        # Exit non-zero so callers (shell scripts, CI) can detect the failure;
        # previously the script reported the error but still exited with 0.
        raise SystemExit(1) from error

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/atree1023/snc-cribl-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.