StreamSets MCP Server

A comprehensive Model Context Protocol (MCP) server that provides seamless integration with StreamSets Control Hub APIs, enabling complete data pipeline management and creation through conversational AI.

🚀 Features

Pipeline Management (Read Operations)

  • Job Management: List, start, stop, and monitor job execution

  • Pipeline Operations: Browse, search, and analyze pipeline configurations

  • Connection Management: Manage data connections and integrations

  • Metrics & Analytics: Comprehensive performance and usage analytics

  • Enterprise Integration: Deployment management, security audits, and alerts

Pipeline Building (Write Operations) 🆕

  • Interactive Pipeline Creation: Build pipelines through conversation

  • Stage Library: Access to 25+ StreamSets stages (Origins, Processors, Destinations, Executors)

  • Visual Flow Management: Connect stages with data and event streams

  • Persistent Sessions: Pipeline builders persist across conversations

  • Smart Validation: Automatic validation of pipeline logic and connections

📊 API Coverage

44 Tools covering 9 StreamSets Services:

  • Job Runner API (11 tools) - Job lifecycle management

  • Pipeline Repository API (7 tools) - Pipeline CRUD operations

  • Connection API (4 tools) - Data connection management

  • Provisioning API (5 tools) - Infrastructure and deployment

  • Notification API (2 tools) - Alert and notification management

  • Topology API (1 tool) - System topology information

  • Metrics APIs (7 tools) - Performance and usage analytics

  • Security API (1 tool) - Security audit trails

  • Pipeline Builder (6 tools) - Interactive pipeline creation

๐Ÿ—๏ธ Pipeline Builder Capabilities

Create Complete Data Pipelines

# 1. Initialize a new pipeline builder
sdc_create_pipeline_builder title="My ETL Pipeline" engine_type="data_collector"

# 2. Browse available stages
sdc_list_available_stages category="origins"

# 3. Add stages to your pipeline
sdc_add_pipeline_stage pipeline_id="pipeline_builder_1" stage_label="Dev Raw Data Source"
sdc_add_pipeline_stage pipeline_id="pipeline_builder_1" stage_label="Expression Evaluator"
sdc_add_pipeline_stage pipeline_id="pipeline_builder_1" stage_label="Trash"

# 4. Connect stages with data flows
sdc_connect_pipeline_stages pipeline_id="pipeline_builder_1" source_stage_id="stage_1" target_stage_id="stage_2"
sdc_connect_pipeline_stages pipeline_id="pipeline_builder_1" source_stage_id="stage_2" target_stage_id="stage_3"

# 5. Visualize your pipeline flow
sdc_get_pipeline_flow pipeline_id="pipeline_builder_1"

# 6. Build and publish (coming soon)
# sdc_build_pipeline pipeline_id="pipeline_builder_1"
# sdc_publish_pipeline pipeline_id="pipeline_builder_1"

Persistent Pipeline Sessions

  • Cross-Conversation: Continue building pipelines across multiple conversations

  • Auto-Save: All changes automatically saved to disk

  • Session Management: List, view, and delete pipeline builder sessions

  • Storage Location: ~/.streamsets_mcp/pipeline_builders/ (see the sketch below)
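
How persistence might work under the hood, as a minimal sketch: each builder session is pickled to the storage directory and re-loaded by id in a later conversation. The class and helper names below are illustrative, not the server's actual internals.

import pickle
from dataclasses import dataclass, field
from pathlib import Path

STORAGE_DIR = Path.home() / ".streamsets_mcp" / "pipeline_builders"

@dataclass
class PipelineBuilderState:
    pipeline_id: str
    title: str
    stages: list = field(default_factory=list)        # stages added so far
    connections: list = field(default_factory=list)   # (source_stage_id, target_stage_id) pairs

def save_builder(state: PipelineBuilderState) -> Path:
    """Pickle the builder state so it survives across conversations and restarts."""
    STORAGE_DIR.mkdir(parents=True, exist_ok=True)
    path = STORAGE_DIR / f"{state.pipeline_id}.pkl"
    with path.open("wb") as f:
        pickle.dump(state, f)
    return path

def load_builder(pipeline_id: str) -> PipelineBuilderState:
    """Re-load a previously saved builder by id."""
    with (STORAGE_DIR / f"{pipeline_id}.pkl").open("rb") as f:
        return pickle.load(f)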

🛠️ Installation

Prerequisites

  • Python 3.8+

  • StreamSets Control Hub account with API credentials

  • Claude Desktop (for MCP integration)

Setup

  1. Clone the repository

    git clone https://github.com/yourusername/streamsets-mcp-server.git
    cd streamsets-mcp-server
  2. Install dependencies

    pip install -r requirements.txt
  3. Configure environment variables

    export STREAMSETS_HOST_PREFIX="https://your-instance.streamsets.com"
    export STREAMSETS_CRED_ID="your-credential-id"
    export STREAMSETS_CRED_TOKEN="your-auth-token"
  4. Test the server

    python streamsets_server.py

Docker Deployment

Setup for MCP Integration

# Build the image
docker build -t streamsets-mcp-server .

# Create persistent volume for pipeline builders
docker volume create streamsets-pipeline-data

Manual Testing

# Test run with volume persistence
docker run --rm -it \
  -e STREAMSETS_HOST_PREFIX="https://your-instance.streamsets.com" \
  -e STREAMSETS_CRED_ID="your-credential-id" \
  -e STREAMSETS_CRED_TOKEN="your-auth-token" \
  -v streamsets-pipeline-data:/data \
  streamsets-mcp-server

Claude Desktop Integration

Option 1: Direct Python (Local Development)

{ "mcpServers": { "streamsets": { "command": "python", "args": ["/path/to/streamsets_server.py"], "env": { "STREAMSETS_HOST_PREFIX": "https://your-instance.streamsets.com", "STREAMSETS_CRED_ID": "your-credential-id", "STREAMSETS_CRED_TOKEN": "your-auth-token" } } } }

Option 2: Docker with Persistence (Production)

{ "mcpServers": { "streamsets": { "command": "docker", "args": [ "run", "--rm", "-i", "-v", "streamsets-pipeline-data:/data", "-e", "STREAMSETS_HOST_PREFIX=https://your-instance.streamsets.com", "-e", "STREAMSETS_CRED_ID=your-credential-id", "-e", "STREAMSETS_CRED_TOKEN=your-auth-token", "streamsets-mcp-server" ] } } }

📖 Usage Examples

Job Management

# List all jobs
sdc_list_jobs organization="your-org" status="ACTIVE"

# Get detailed job information
sdc_get_job_details job_id="your-job-id"

# Start/stop jobs
sdc_start_job job_id="your-job-id"
sdc_stop_job job_id="your-job-id"

# Bulk operations
sdc_start_multiple_jobs job_ids="job1,job2,job3"

Pipeline Operations

# Search pipelines
sdc_search_pipelines search_query="name==ETL*"

# Get pipeline details
sdc_get_pipeline_details pipeline_id="your-pipeline-id"

# Export/import pipelines
sdc_export_pipelines commit_ids="commit1,commit2"

Metrics & Analytics

# Job performance metrics
sdc_get_job_metrics job_id="your-job-id"

# System health overview
sdc_get_job_count_by_status

# Executor infrastructure metrics
sdc_get_executor_metrics executor_type="COLLECTOR" label="prod"

# Security audit trails
sdc_get_security_audit_metrics org_id="your-org" audit_type="login"

🔧 Configuration

Environment Variables

Required (StreamSets Authentication)

  • STREAMSETS_HOST_PREFIX - StreamSets Control Hub URL

  • STREAMSETS_CRED_ID - API Credential ID

  • STREAMSETS_CRED_TOKEN - Authentication Token

Optional (Pipeline Builder Persistence)

  • PIPELINE_STORAGE_PATH - Custom storage directory for pipeline builders (see the startup sketch below)
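
A rough sketch of how these variables might be consumed at startup; the helper name and header choices are assumptions, not the server's exact code:

import os

def load_config() -> dict:
    """Read the required credentials and the optional storage path from the environment."""
    host = os.environ.get("STREAMSETS_HOST_PREFIX")
    cred_id = os.environ.get("STREAMSETS_CRED_ID")
    token = os.environ.get("STREAMSETS_CRED_TOKEN")
    if not all([host, cred_id, token]):
        raise RuntimeError(
            "STREAMSETS_HOST_PREFIX, STREAMSETS_CRED_ID and STREAMSETS_CRED_TOKEN must all be set"
        )
    return {
        "host_prefix": host.rstrip("/"),
        # Control Hub API credentials are typically sent as headers on every request.
        "headers": {
            "X-SS-App-Component-Id": cred_id,
            "X-SS-App-Auth-Token": token,
        },
        "storage_path": os.environ.get("PIPELINE_STORAGE_PATH"),  # optional
    }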

Pipeline Builder Storage

Pipeline builders are automatically persisted across conversations and container restarts:

Storage Locations (Priority Order)

  1. Custom Path: PIPELINE_STORAGE_PATH environment variable

  2. Docker Volume: /data/pipeline_builders (when running in Docker)

  3. Default Path: ~/.streamsets_mcp/pipeline_builders/

Configuration Options

  • Format: Pickle files for session persistence

  • Management: Automatic file management with error handling

  • Fallback: Memory-only mode if no writable storage is available (path resolution sketched below)
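
The priority order above can be expressed as a small resolution function; this is a sketch of the described behavior, not the server's exact implementation:

import os
from pathlib import Path
from typing import Optional

def resolve_storage_dir() -> Optional[Path]:
    """Return the first writable storage location, or None to fall back to memory-only mode."""
    candidates = [
        os.environ.get("PIPELINE_STORAGE_PATH"),                  # 1. custom path
        "/data/pipeline_builders",                                # 2. Docker volume
        Path.home() / ".streamsets_mcp" / "pipeline_builders",    # 3. default path
    ]
    for candidate in candidates:
        if not candidate:
            continue
        path = Path(candidate)
        try:
            path.mkdir(parents=True, exist_ok=True)
            return path
        except OSError:
            continue  # not writable; try the next location
    return None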

Docker Persistence

When using Docker, pipeline builders persist in named volumes:

# Data persists in Docker volume 'streamsets-pipeline-data'
docker volume create streamsets-pipeline-data

# Run with persistent volume
docker run --rm -it -v streamsets-pipeline-data:/data streamsets-mcp-server

Troubleshooting

  • No Persistence: Check storage directory permissions

  • Docker Issues: Ensure volume mounts are configured correctly

  • Memory Mode: Server logs will indicate if persistence is disabled

📚 Documentation

  • API Reference: See CLAUDE.md for detailed tool documentation

  • Stage Library: Built-in documentation for 25+ StreamSets stages

  • Configuration: custom.yaml for MCP server registry

  • Swagger Specs: API specifications in /swagger/ directory

🧪 Development

Project Structure

streamsets-mcp-server/
├── streamsets_server.py   # Main MCP server implementation
├── custom.yaml            # MCP server configuration
├── CLAUDE.md              # Comprehensive documentation
├── requirements.txt       # Python dependencies
├── Dockerfile             # Container deployment
├── swagger/               # API specifications
└── README.md              # This file

Adding New Tools

  1. Define the tool function with the @mcp.tool() decorator (see the sketch after this list)

  2. Add comprehensive error handling and logging

  3. Update custom.yaml with tool metadata

  4. Document in CLAUDE.md
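
A minimal sketch of what a new tool might look like. The @mcp.tool() decorator matches the FastMCP usage the server already relies on; the endpoint path and response handling below are hypothetical placeholders, not a documented Control Hub API.

import logging
import httpx
from fastmcp import FastMCP   # assumes the standalone fastmcp package

logger = logging.getLogger("streamsets_mcp")
mcp = FastMCP("streamsets")   # streamsets_server.py already defines this instance

@mcp.tool()
def sdc_example_tool(organization: str) -> str:
    """Describe precisely what the tool returns so the model knows when to call it."""
    url = "https://your-instance.streamsets.com/example/rest/v1/items"  # hypothetical endpoint
    try:
        resp = httpx.get(url, params={"organization": organization}, timeout=30.0)
        resp.raise_for_status()
        items = resp.json()
        return f"Found {len(items)} items for organization {organization}"
    except Exception as e:
        logger.error(f"sdc_example_tool failed: {e}")
        return f"Error: {e}"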

Testing

# Syntax validation
python -m py_compile streamsets_server.py

# Tool count verification
grep -c "@mcp.tool()" streamsets_server.py

๐Ÿค Contributing

  1. Fork the repository

  2. Create a feature branch (git checkout -b feature/amazing-feature)

  3. Commit your changes (git commit -m 'Add amazing feature')

  4. Push to the branch (git push origin feature/amazing-feature)

  5. Open a Pull Request

๐Ÿ“ License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿ™ Acknowledgments

  • StreamSets for the comprehensive Control Hub APIs

  • Anthropic for the Model Context Protocol framework

  • FastMCP for the Python MCP server implementation

📧 Support

For issues and questions:

  • Create an issue on GitHub

  • Check the documentation in CLAUDE.md

  • Review the API specifications in /swagger/


Transform your data pipeline workflows with conversational AI! 🚀
