# StreamSets MCP Server
A comprehensive Model Context Protocol (MCP) server that provides seamless integration with StreamSets Control Hub APIs, enabling complete data pipeline management and creation through conversational AI.
## Features
### Pipeline Management (Read Operations)

- **Job Management**: List, start, stop, and monitor job execution
- **Pipeline Operations**: Browse, search, and analyze pipeline configurations
- **Connection Management**: Manage data connections and integrations
- **Metrics & Analytics**: Comprehensive performance and usage analytics
- **Enterprise Integration**: Deployment management, security audits, and alerts
### Pipeline Building (Write Operations)

- **Interactive Pipeline Creation**: Build pipelines through conversation
- **Stage Library**: Access to 25+ StreamSets stages (Origins, Processors, Destinations, Executors)
- **Visual Flow Management**: Connect stages with data and event streams
- **Persistent Sessions**: Pipeline builders persist across conversations
- **Smart Validation**: Automatic validation of pipeline logic and connections
## API Coverage

**44 tools** covering 9 StreamSets services:

- **Job Runner API** (11 tools): job lifecycle management
- **Pipeline Repository API** (7 tools): pipeline CRUD operations
- **Connection API** (4 tools): data connection management
- **Provisioning API** (5 tools): infrastructure and deployment
- **Notification API** (2 tools): alert and notification management
- **Topology API** (1 tool): system topology information
- **Metrics APIs** (7 tools): performance and usage analytics
- **Security API** (1 tool): security audit trails
- **Pipeline Builder** (6 tools): interactive pipeline creation
## Pipeline Builder Capabilities

### Create Complete Data Pipelines
```
# 1. Initialize a new pipeline builder
sdc_create_pipeline_builder title="My ETL Pipeline" engine_type="data_collector"

# 2. Browse available stages
sdc_list_available_stages category="origins"

# 3. Add stages to your pipeline
sdc_add_pipeline_stage pipeline_id="pipeline_builder_1" stage_label="Dev Raw Data Source"
sdc_add_pipeline_stage pipeline_id="pipeline_builder_1" stage_label="Expression Evaluator"
sdc_add_pipeline_stage pipeline_id="pipeline_builder_1" stage_label="Trash"

# 4. Connect stages with data flows
sdc_connect_pipeline_stages pipeline_id="pipeline_builder_1" source_stage_id="stage_1" target_stage_id="stage_2"
sdc_connect_pipeline_stages pipeline_id="pipeline_builder_1" source_stage_id="stage_2" target_stage_id="stage_3"

# 5. Visualize your pipeline flow
sdc_get_pipeline_flow pipeline_id="pipeline_builder_1"

# 6. Build and publish (coming soon)
# sdc_build_pipeline pipeline_id="pipeline_builder_1"
# sdc_publish_pipeline pipeline_id="pipeline_builder_1"
```

### Persistent Pipeline Sessions
- **Cross-Conversation**: Continue building pipelines across multiple conversations
- **Auto-Save**: All changes are automatically saved to disk
- **Session Management**: List, view, and delete pipeline builder sessions
- **Storage Location**: `~/.streamsets_mcp/pipeline_builders/`
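The auto-save behavior can be pictured roughly as follows. This is a minimal sketch assuming one pickle file per builder session under the default storage directory; `save_builder`, `load_builder`, and the state dict are illustrative names, not the server's actual internals.

```python
import os
import pickle
from pathlib import Path
from typing import Optional

# Default location; the PIPELINE_STORAGE_PATH environment variable overrides it.
STORAGE_DIR = Path(os.environ.get(
    "PIPELINE_STORAGE_PATH",
    str(Path.home() / ".streamsets_mcp" / "pipeline_builders")))

def save_builder(builder_id: str, state: dict) -> Path:
    """Persist one builder session to disk so it survives restarts."""
    STORAGE_DIR.mkdir(parents=True, exist_ok=True)
    path = STORAGE_DIR / (builder_id + ".pkl")
    with open(path, "wb") as fh:
        pickle.dump(state, fh)
    return path

def load_builder(builder_id: str) -> Optional[dict]:
    """Reload a previously saved session, or None if it was never saved."""
    path = STORAGE_DIR / (builder_id + ".pkl")
    if not path.exists():
        return None
    with open(path, "rb") as fh:
        return pickle.load(fh)
```

Because each session is a separate file, listing and deleting sessions reduces to ordinary directory operations.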
## Installation

### Prerequisites

- Python 3.8+
- StreamSets Control Hub account with API credentials
- Claude Desktop (for MCP integration)
### Setup

1. Clone the repository:

   ```
   git clone https://github.com/yourusername/streamsets-mcp-server.git
   cd streamsets-mcp-server
   ```

2. Install dependencies:

   ```
   pip install -r requirements.txt
   ```

3. Configure environment variables:

   ```
   export STREAMSETS_HOST_PREFIX="https://your-instance.streamsets.com"
   export STREAMSETS_CRED_ID="your-credential-id"
   export STREAMSETS_CRED_TOKEN="your-auth-token"
   ```

4. Test the server:

   ```
   python streamsets_server.py
   ```
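Before the first run it can help to confirm the three required variables are actually set. The check below is illustrative and not part of the repository:

```python
import os
from typing import Dict, List, Optional

REQUIRED = ("STREAMSETS_HOST_PREFIX", "STREAMSETS_CRED_ID", "STREAMSETS_CRED_TOKEN")

def missing_env(env: Optional[Dict[str, str]] = None) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    if env is None:
        env = dict(os.environ)
    return [name for name in REQUIRED if not env.get(name)]
```

Calling `missing_env()` in a Python shell and getting back an empty list means all three credentials variables are present; a non-empty list names what still needs exporting.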
### Docker Deployment

#### Setup for MCP Integration

```
# Build the image
docker build -t streamsets-mcp-server .

# Create persistent volume for pipeline builders
docker volume create streamsets-pipeline-data
```

#### Manual Testing

```
# Test run with volume persistence
docker run --rm -it \
  -e STREAMSETS_HOST_PREFIX="https://your-instance.streamsets.com" \
  -e STREAMSETS_CRED_ID="your-credential-id" \
  -e STREAMSETS_CRED_TOKEN="your-auth-token" \
  -v streamsets-pipeline-data:/data \
  streamsets-mcp-server
```

### Claude Desktop Integration
#### Option 1: Direct Python (Local Development)

```json
{
  "mcpServers": {
    "streamsets": {
      "command": "python",
      "args": ["/path/to/streamsets_server.py"],
      "env": {
        "STREAMSETS_HOST_PREFIX": "https://your-instance.streamsets.com",
        "STREAMSETS_CRED_ID": "your-credential-id",
        "STREAMSETS_CRED_TOKEN": "your-auth-token"
      }
    }
  }
}
```

#### Option 2: Docker with Persistence (Production)
```json
{
  "mcpServers": {
    "streamsets": {
      "command": "docker",
      "args": [
        "run", "--rm", "-i",
        "-v", "streamsets-pipeline-data:/data",
        "-e", "STREAMSETS_HOST_PREFIX=https://your-instance.streamsets.com",
        "-e", "STREAMSETS_CRED_ID=your-credential-id",
        "-e", "STREAMSETS_CRED_TOKEN=your-auth-token",
        "streamsets-mcp-server"
      ]
    }
  }
}
```

## Usage Examples
### Job Management

```
# List all jobs
sdc_list_jobs organization="your-org" status="ACTIVE"

# Get detailed job information
sdc_get_job_details job_id="your-job-id"

# Start/stop jobs
sdc_start_job job_id="your-job-id"
sdc_stop_job job_id="your-job-id"

# Bulk operations
sdc_start_multiple_jobs job_ids="job1,job2,job3"
```

### Pipeline Operations
```
# Search pipelines
sdc_search_pipelines search_query="name==ETL*"

# Get pipeline details
sdc_get_pipeline_details pipeline_id="your-pipeline-id"

# Export/import pipelines
sdc_export_pipelines commit_ids="commit1,commit2"
```

### Metrics & Analytics
```
# Job performance metrics
sdc_get_job_metrics job_id="your-job-id"

# System health overview
sdc_get_job_count_by_status

# Executor infrastructure metrics
sdc_get_executor_metrics executor_type="COLLECTOR" label="prod"

# Security audit trails
sdc_get_security_audit_metrics org_id="your-org" audit_type="login"
```

## Configuration
### Environment Variables

Required (StreamSets authentication):

- `STREAMSETS_HOST_PREFIX`: StreamSets Control Hub URL
- `STREAMSETS_CRED_ID`: API credential ID
- `STREAMSETS_CRED_TOKEN`: authentication token

Optional (pipeline builder persistence):

- `PIPELINE_STORAGE_PATH`: custom storage directory for pipeline builders
### Pipeline Builder Storage

Pipeline builders are automatically persisted across conversations and container restarts.

Storage locations, in priority order:

1. Custom path: the `PIPELINE_STORAGE_PATH` environment variable
2. Docker volume: `/data/pipeline_builders` (when running in Docker)
3. Default path: `~/.streamsets_mcp/pipeline_builders/`
#### Configuration Options

- **Format**: pickle files for session persistence
- **Management**: automatic file management with error handling
- **Fallback**: memory-only mode if no writable storage is available
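The priority order could be implemented along these lines. This is a sketch under the assumption that the server simply probes each candidate location for writability; `resolve_storage_dir` is an illustrative name, not the server's actual function.

```python
import os
from pathlib import Path
from typing import Optional

def resolve_storage_dir() -> Optional[Path]:
    """Return the first writable candidate, or None for memory-only mode."""
    candidates = []
    custom = os.environ.get("PIPELINE_STORAGE_PATH")
    if custom:                                          # 1. custom path
        candidates.append(Path(custom))
    candidates.append(Path("/data/pipeline_builders"))  # 2. Docker volume
    candidates.append(Path.home() / ".streamsets_mcp" / "pipeline_builders")  # 3. default
    for path in candidates:
        try:
            path.mkdir(parents=True, exist_ok=True)
            if os.access(str(path), os.W_OK):
                return path
        except OSError:
            continue  # not creatable here; try the next candidate
    return None  # no writable location: fall back to in-memory sessions
```

A `None` result corresponds to the memory-only fallback noted above: sessions still work within one server process but are lost on restart.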
#### Docker Persistence

When using Docker, pipeline builders persist in named volumes:

```
# Data persists in the Docker volume 'streamsets-pipeline-data'
docker volume create streamsets-pipeline-data

# Run with the persistent volume
docker run --rm -it -v streamsets-pipeline-data:/data streamsets-mcp-server
```

#### Troubleshooting

- **No persistence**: check storage directory permissions
- **Docker issues**: ensure volume mounts are configured correctly
- **Memory mode**: server logs will indicate if persistence is disabled
## Documentation

- **API Reference**: see `CLAUDE.md` for detailed tool documentation
- **Stage Library**: built-in documentation for 25+ StreamSets stages
- **Configuration**: `custom.yaml` for the MCP server registry
- **Swagger Specs**: API specifications in the `/swagger/` directory
## Development

### Project Structure

```
streamsets-mcp-server/
├── streamsets_server.py   # Main MCP server implementation
├── custom.yaml            # MCP server configuration
├── CLAUDE.md              # Comprehensive documentation
├── requirements.txt       # Python dependencies
├── Dockerfile             # Container deployment
├── swagger/               # API specifications
└── README.md              # This file
```

### Adding New Tools
1. Define the tool function with the `@mcp.tool()` decorator
2. Add comprehensive error handling and logging
3. Update `custom.yaml` with the tool metadata
4. Document the tool in `CLAUDE.md`
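A new tool typically takes a shape like the sketch below. The endpoint path, `make_api_request` helper, and response fields are illustrative assumptions rather than the repository's actual code; the `@mcp.tool()` decorator comes from the FastMCP instance in `streamsets_server.py`.

```python
import logging

logger = logging.getLogger(__name__)

# @mcp.tool()  # uncomment inside streamsets_server.py, where `mcp` is the FastMCP instance
def sdc_get_job_status_example(job_id: str) -> str:
    """Example tool: report the status of a single job."""
    if not job_id:
        return "Error: job_id is required"
    try:
        # In the real server this would call a Control Hub endpoint, e.g.:
        # data = make_api_request(f"/jobrunner/rest/v1/job/{job_id}/status")
        data = {"jobId": job_id, "status": "ACTIVE"}  # placeholder response
        return "Job {} status: {}".format(data["jobId"], data["status"])
    except Exception as exc:
        logger.error("sdc_get_job_status_example failed: %s", exc)
        return "Error: {}".format(exc)
```

Returning a readable string on both success and failure keeps the output easy for the model to relay; remember to mirror any new tool in `custom.yaml` and `CLAUDE.md`.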
### Testing

```
# Syntax validation
python -m py_compile streamsets_server.py

# Tool count verification
grep -c "@mcp.tool()" streamsets_server.py
```

## Contributing
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## Acknowledgments

- StreamSets for the comprehensive Control Hub APIs
- Anthropic for the Model Context Protocol framework
- FastMCP for the Python MCP server implementation
## Support

For issues and questions:

- Create an issue on GitHub
- Check the documentation in `CLAUDE.md`
- Review the API specifications in `/swagger/`
*Transform your data pipeline workflows with conversational AI!*