Skip to main content
Glama

SSB MCP Server

by BrooksIan
README.md38.4 kB
# SSB MCP Server ![SSB Home Dashboard](images/SSB_Home_Branded.jpg) *The main SSB interface showing the home dashboard with available streams and jobs, featuring the SSB MCP Server integration with prominent branding.* Model Context Protocol server providing comprehensive access to SQL Stream Builder (SSB) with support for both direct SSB access and Apache Knox integration. **Works with both standalone SSB deployments and Cloudera Data Platform (CDP) SSB deployments** - provides full access to SSB functionality through Claude Desktop. ## Features - **Multiple Authentication Methods**: - **Direct SSB Authentication**: Basic auth for standalone SSB deployments - **Knox Integration**: Bearer tokens, cookies, and passcode tokens for CDP deployments - **Read-only by default** - Safe exploration of SSB streams and configuration - **Comprehensive SSB API coverage** with **80+ MCP tools** for complete SSB management: - **Advanced Job Management**: Event history, state management, job copying, data source cloning - **Monitoring & Diagnostics**: System health, performance counters, SQL analysis - **Enhanced Table Management**: Detailed tables, hierarchical structure, validation, creation - **Connector & Format Management**: Data formats, connector details, JAR information - **User & Project Management**: Settings, projects, user information, project creation - **API Key Management**: Key lifecycle management, creation, deletion, details - **Environment Management**: Environment switching, configuration, creation - **Sync & Configuration**: Project export/import, sync management, validation - **UDF Management**: UDF lifecycle, execution, artifacts, custom functions - **Stream Management**: List, create, update, delete, start, stop SQL streams - **Query Execution**: Execute SQL queries and get real-time results with sampling - **Sample Data Access**: Retrieve streaming data samples from running jobs - **Job Management**: Monitor job status, get job details, and manage job lifecycle - **Schema Discovery**: Explore table schemas and available tables - **Function Management**: List and inspect user-defined functions - **Connector Management**: Explore available connectors - **Kafka Integration**: List and inspect Kafka topics - **Cluster Monitoring**: Get cluster info and health status - **Performance Metrics**: Monitor stream performance and metrics ## Quick Start ### For Standalone SSB Deployments (Docker Compose) 1. **Start SSB services:** ```bash git clone https://github.com/your-org/ssb-mcp-server.git cd ssb-mcp-server docker-compose up -d ``` 2. **Configure Claude Desktop** - Edit `~/Library/Application Support/Claude/claude_desktop_config.json`: ```json { "mcpServers": { "ssb-mcp-server": { "command": "/FULL/PATH/TO/SSB-MCP-Server/run_mcp_server.sh", "args": [], "cwd": "/FULL/PATH/TO/SSB-MCP-Server" } } } ``` 3. **Restart Claude Desktop** and start interacting with your SSB streams! ### For CDP SSB deployments Your SSB API base URL will typically be: ``` https://<your-ssb-host>/ssb/api/v1 ``` Get your Knox JWT token from the CDP UI and use it with the configurations below. ![Knox Token Generation](images/KnoxToken_BaseURL.png) ## Setup ### Option 1: Claude Desktop (Local) 1. **Clone and install:** ```bash git clone https://github.com/your-org/ssb-mcp-server.git cd ssb-mcp-server python3 -m venv .venv source .venv/bin/activate pip install -e . ``` 2. **Configure Claude Desktop** - Edit `~/Library/Application Support/Claude/claude_desktop_config.json`: ```json { "mcpServers": { "ssb-mcp-server": { "command": "/FULL/PATH/TO/SSB-MCP-Server/.venv/bin/python", "args": [ "-m", "ssb_mcp_server.server" ], "env": { "MCP_TRANSPORT": "stdio", "SSB_API_BASE": "https://ssb-gateway.yourshere.cloudera.site/ssb/api/v1", "KNOX_TOKEN": "<your_knox_bearer_token>", "SSB_READONLY": "true" } } } } ``` 3. **Restart Claude Desktop** and start asking questions about your SSB streams! ### Option 2: Cloudera AI Agent Studio For use with Cloudera AI Agent Studio, configure using environment variables: ```json { "mcpServers": { "ssb-mcp-server": { "command": "uvx", "args": [ "--from", "git+https://github.com/your-org/ssb-mcp-server@main", "run-server" ], "env": { "MCP_TRANSPORT": "stdio", "SSB_BASE_URL": "https://your-ssb-host:443", "SSB_TENANT": "your-tenant-name", "KNOX_TOKEN": "your_jwt_token_here", "MVE_USERNAME": "your_mve_username", "MVE_PASSWORD": "your_mve_password", "SSB_READONLY": "false" } } } } ``` **Configuration Template**: Use `config/cloudera_ai_agent_studio_template.json` as a starting point. **Environment Variables for Cloudera AI Agent Studio**: ```bash export SSB_BASE_URL="https://your-ssb-host:443" export SSB_TENANT="your-tenant-name" export KNOX_TOKEN="your_jwt_token_here" export MVE_USERNAME="your_mve_username" export MVE_PASSWORD="your_mve_password" export SSB_READONLY="false" ``` ## Configuration Options Configuration can be done via environment variables or JSON configuration files. Environment variables take precedence over JSON configuration. ### JSON Configuration File The MCP server can load configuration from `config/cloud_ssb_config.json`: ```json { "cloud_ssb": { "base_url": "https://your-ssb-host:443", "tenant": "your-tenant-name", "jwt_token": "your_jwt_token_here", "ssb_readonly": false, "knox_verify_ssl": true, "http_timeout_seconds": 60, "http_max_retries": 3, "http_rate_limit_rps": 5, "mve_username": "your_mve_username", "mve_password": "your_mve_password" } } ``` ### Environment Variables All configuration can also be done via environment variables: ### Direct SSB Authentication (Standalone) | Variable | Required | Description | |----------|----------|-------------| | `SSB_API_BASE` | Yes | Full SSB API URL (e.g., `http://localhost:18121`) | | `SSB_USER` | Yes | SSB username (e.g., `admin`) | | `SSB_PASSWORD` | Yes | SSB password (e.g., `admin`) | | `SSB_READONLY` | No | Read-only mode (default: `false`) | | `TIMEOUT_SECONDS` | No | HTTP timeout in seconds (default: `30`) | ### Knox Authentication (CDP) | Variable | Required | Description | |----------|----------|-------------| | `SSB_BASE_URL` | Yes* | Base URL for SSB host (e.g., `https://your-host:443`) | | `SSB_TENANT` | Yes* | Tenant name (e.g., `your-tenant-name`) | | `KNOX_TOKEN` | Yes* | Knox JWT token for authentication | | `KNOX_COOKIE` | No | Alternative: provide full cookie string instead of token | | `KNOX_PASSCODE_TOKEN` | No | Alternative: Knox passcode token (auto-exchanged for JWT) | | `KNOX_USER` | No | Knox username for basic auth | | `KNOX_PASSWORD` | No | Knox password for basic auth | | `KNOX_VERIFY_SSL` | No | Verify SSL certificates (default: `true`) | | `KNOX_CA_BUNDLE` | No | Path to CA certificate bundle | | `SSB_READONLY` | No | Read-only mode (default: `true`) | | `TIMEOUT_SECONDS` | No | HTTP timeout in seconds (default: `30`) | | `MVE_USERNAME` | No | MVE API username for Basic authentication | | `MVE_PASSWORD` | No | MVE API password for Basic authentication | ### Legacy Configuration (Still Supported) | Variable | Required | Description | |----------|----------|-------------| | `KNOX_GATEWAY_URL` | No | Legacy: Full Knox gateway URL | | `SSB_API_BASE` | No | Legacy: Full SSB API URL | | `MVE_API_BASE` | No | Legacy: Full MVE API URL | \* Either `SSB_API_BASE` (for direct) or `KNOX_GATEWAY_URL` (for Knox) is required ## Example Functionality The SSB MCP Server provides comprehensive access to SQL Stream Builder through Claude Desktop. Here are some visual examples of the functionality: **Standard Brand Box Versions:** - ![SSB Home Branded](images/SSB_Home.png) - Top positioned with yellow text ## Available Tools ### 🔧 Advanced Job Management - `get_job_events(job_id)` - Get detailed job event history and timeline - `get_job_state(job_id)` - Get comprehensive job state information - `get_job_mv_endpoints(job_id)` - Get materialized view endpoints for a job - `create_job_mv_endpoint(job_id, mv_config)` - Create or update materialized view endpoint - `copy_job(job_id)` - Duplicate an existing job - `copy_data_source(data_source_id)` - Clone a data source ### 📊 Monitoring & Diagnostics - `get_diagnostic_counters()` - Get system performance counters and diagnostics - `get_heartbeat()` - Check system health and connectivity - `analyze_sql(sql_query)` - Analyze SQL query without execution (syntax, performance) ### 🗂️ Enhanced Table Management - `list_tables_detailed()` - Get comprehensive table information - `get_table_tree()` - Get hierarchical table structure organized by catalog - `validate_data_source(data_source_config)` - Validate data source configuration - `create_table_detailed(table_config)` - Create table with full configuration - `get_table_details(table_id)` - Get detailed information about a specific table ### 🔌 Connector & Format Management - `list_data_formats()` - List all available data formats - `get_data_format_details(format_id)` - Get detailed information about a specific data format - `create_data_format(format_config)` - Create a new data format - `get_connector_jar(connector_type)` - Get connector JAR information - `get_connector_type_details(connector_type)` - Get detailed connector type information - `get_connector_details(connector_id)` - Get detailed connector information ### 👤 User & Project Management - `get_user_settings()` - Get user preferences and settings - `update_user_settings(settings)` - Update user configuration - `list_projects()` - List available projects - `get_project_details(project_id)` - Get project information - `create_project(project_config)` - Create a new project - `get_user_info()` - Get current user information ### 🔑 API Key Management - `list_api_keys()` - List user API keys - `create_api_key(key_config)` - Create new API key - `delete_api_key(key_id)` - Delete API key - `get_api_key_details(key_id)` - Get API key information ### 🌍 Environment Management - `list_environments()` - List available environments - `activate_environment(env_id)` - Activate/switch to an environment - `get_environment_details(env_id)` - Get environment configuration - `create_environment(env_config)` - Create new environment - `deactivate_environment()` - Deactivate current environment ### 🔄 Sync & Configuration - `get_sync_config()` - Get sync configuration - `update_sync_config(config)` - Update sync configuration - `delete_sync_config()` - Delete sync configuration - `validate_sync_config(project)` - Validate sync configuration for a project - `export_project(project)` - Export project configuration - `import_project(project, config)` - Import project configuration ### 📈 UDF Management - `list_udfs_detailed()` - Get comprehensive UDF information - `run_udf(udf_id, parameters)` - Execute UDF function - `get_udf_artifacts()` - Get UDF artifacts and dependencies - `create_udf(udf_config)` - Create custom UDF - `update_udf(udf_id, udf_config)` - Update UDF configuration - `get_udf_details(udf_id)` - Get detailed UDF information - `get_udf_artifact_details(artifact_id)` - Get UDF artifact details - `get_udf_artifact_by_type(artifact_type)` - Get UDF artifacts by type ### Stream Management - `list_streams()` - List all SQL streams (jobs) - `get_stream(stream_name)` - Get details of a specific stream - `create_stream(stream_name, sql_query, description?)` - Create new stream (write mode) - `update_stream(stream_name, sql_query, description?)` - Update an existing stream - `delete_stream(stream_name)` - Delete a stream - `start_stream(stream_name)` - Start a stream - `stop_stream(stream_name)` - Stop a stream ### Query Execution & Sample Data - `execute_query(sql_query, limit?)` - Execute SQL query and create SSB job - `execute_query_with_sampling(sql_query, sample_interval, sample_count, window_size, sample_all_messages)` - Execute query with custom sampling - `get_job_status(job_id)` - Get status of a specific SSB job - `get_job_sample(sample_id)` - Get sample data from a job execution - `get_job_sample_by_id(job_id)` - Get sample data from a job by job ID - `list_jobs_with_samples()` - List all jobs with their sample information ### Job Management & Control - `stop_job(job_id, savepoint)` - Stop a specific SSB job - `execute_job(job_id, sql_query)` - Execute/restart a job with new SQL - `restart_job_with_sampling(job_id, sql_query, sample_interval, sample_all_messages)` - Restart job with sampling options - `configure_sampling(sample_id, sample_interval, sample_count, window_size, sample_all_messages)` - Configure sampling parameters ### Data Sources & Schema - `list_tables()` - List all available tables (data sources) - `get_table_schema(table_name)` - Get table schema information - `create_kafka_table(table_name, topic, kafka_connector_type, bootstrap_servers, format_type, scan_startup_mode, additional_properties?)` - Create new table with local-kafka enforcement - `register_kafka_table(table_name, topic, schema_fields?, use_ssb_prefix?, catalog?, database?)` - Register Kafka table in Flink catalog (makes it queryable) - `validate_kafka_connector(kafka_connector_type)` - Validate that a connector type is local-kafka ### Functions and Connectors - `list_udfs()` - List user-defined functions - `get_udf(udf_name)` - Get UDF details - `list_connectors()` - List available connectors - `get_connector(connector_name)` - Get connector details ### Kafka Integration - `list_topics()` - List Kafka topics - `get_topic(topic_name)` - Get topic details ### Cluster Management - `get_cluster_info()` - Get cluster information - `get_cluster_health()` - Get cluster health status - `get_ssb_info()` - Get SSB version and system info ## Example Usage Once configured, you can ask Claude questions like: ### Basic Information - "What version of SSB am I running?" - "List all my SQL streams" - "What tables are available for querying?" - "What connectors are available?" - "List all Kafka topics" - "How is the cluster health?" ![Claude Show Tables](images/ClaudeShowTables.png) *Claude can show all available tables in the SSB environment, including both built-in and custom tables.* ### Query Execution & Data Access - "Execute this query: SELECT * FROM NVDA" - "Create a job with sample all messages: SELECT * FROM NVDA" - "Show me the status of job 1234" ![Claude List Jobs](images/ClaudeListJobs.png) *Claude can list all running SSB jobs, showing their status, creation time, and details.* - "Get sample data from job 1234" - "List all jobs with their sample information" - "Show me the latest data from the ReadNVDA job" ### Job Management & Control - "Stop job 1234" - "Restart job 1234 with sample all messages" - "Restart job 1234 with custom sampling (500ms interval)" - "Configure sampling for job 1234 to sample all messages" ![Claude Created Job](images/ClaudeCreatedJob.png) *Claude can create new SSB jobs by executing SQL queries, with full job management capabilities.* ### Stream Management - "Create a new stream called 'sales_analysis' with this SQL: SELECT * FROM sales WHERE amount > 1000" - "Show me the details of the 'user_events' stream" - "What's the status of my 'sales_stream'?" ### Kafka Table Management - "Create a local Kafka table called 'user_events' from topic 'user-events'" - "Register a Kafka table in the Flink catalog to make it queryable" - "Create a local Kafka table with JSON format" ![Claude Created Table](images/ClaudeCreatedTable.png) *Claude can create new virtual tables connected to Kafka topics, with proper schema and connector configuration.* - "Validate if 'local-kafka' is a valid connector type" - "Create a virtual table for real-time data streaming" ### Advanced Job Management - "Show me the event history for job 1234" - "Get the detailed state of job 1234" - "Copy job 1234 to create a new job" - "Clone the data source for table 'user_events'" - "Get materialized view endpoints for job 1234" - "Create a materialized view endpoint for job 1234" ### Materialized Views - "Get materialized view endpoints for job 1234" - "Create a materialized view endpoint for job 1234" **⚠️ Important Limitation**: Materialized Views (MVs) must be created through the SSB UI interface. The MCP server can retrieve data from existing materialized views but cannot create new ones programmatically. To create a materialized view: 1. Navigate to your job in the SSB UI 2. Go to the Materialized Views section 3. Configure and create the MV through the UI 4. Use the MCP server to query the created MV data ## ⚠️ Known Limitations ### Job Restart Capabilities **Limited Restart Functionality**: The SSB API provides limited job restart capabilities: **✅ What Works:** - `stop_job(job_id, savepoint=True)` - Stops jobs with savepoints - `restart_job_with_sampling(job_id, sql_query, ...)` - Creates new jobs with updated SQL - `PUT /jobs/{id}` - Updates job configuration **❌ What Doesn't Work:** - Direct job restart via dedicated restart endpoints - `start_stream(stream_name)` - Stream start endpoints return 404 - `stop_stream(stream_name)` - Stream stop endpoints return 404 - `execute_job(job_id, sql_query)` - Database connection issues **Recommended Restart Strategy:** ```python # For restarting a job with new SQL def restart_siem_job(job_id, new_sql): try: # Stop the existing job client.stop_job(job_id, savepoint=True) # Create new job with updated SQL result = client.restart_job_with_sampling(job_id, new_sql) return result except Exception as e: print(f"Restart failed: {e}") ``` ### Virtual Tables and System Catalogs **Limited Table Discovery**: The SSB environment has limited traditional table support: **✅ What's Available:** - **System Catalogs**: 2 catalogs (`default_catalog`, `ssb`) - **Functions**: 206 built-in functions for data processing - **Materialized Views**: Accessible via MVE API (not SQL) **❌ What's Not Available:** - Traditional database tables (SHOW TABLES returns empty) - Standard information_schema queries - Direct SQL access to materialized views - System tables for metadata discovery **Data Access Methods:** - **Direct SQL**: Limited to functions and basic queries - **MVE API**: For materialized view data (requires Basic Auth) - **Job Execution**: For creating and managing data streams ### Authentication and Configuration **Configuration Loading**: The MCP server now supports both JSON configuration files and environment variables: **✅ JSON Configuration**: Update `config/cloud_ssb_config.json` and restart the MCP server **✅ Environment Variables**: Set environment variables (takes precedence over JSON) **✅ MVE API Credentials**: Configure `mve_username` and `mve_password` in JSON or environment variables **⚠️ Important**: For Cloudera AI Agent Studio, use environment variables: ```bash export KNOX_GATEWAY_URL="your_gateway_url" export KNOX_TOKEN="your_jwt_token" export SSB_API_BASE="your_api_base" export MVE_USERNAME="your_mve_username" export MVE_PASSWORD="your_mve_password" ``` **Token Expiration**: JWT tokens expire and require manual refresh. The MCP server doesn't automatically refresh tokens. ### Cloud Environment Limitations **Endpoint Availability**: Some endpoints may not be available in cloud environments: **❌ Cloud Limitations:** - `list_projects()` - May return "Request method 'GET' not supported" - `get_heartbeat()` - May return empty responses - `create_stream()` and `execute_query()` - May timeout with 500 errors - `SHOW JOBS` - Not supported in cloud environment **✅ Workarounds:** - Use `list_streams()` instead of `list_projects()` - Skip heartbeat checks in cloud environments - Handle timeouts gracefully with retry logic - Use job management tools instead of SHOW commands ### Materialized View Engine (MVE) API **Separate Authentication**: MVE API requires different authentication than the main SSB API: **Authentication Requirements:** - **SSB API**: Bearer token authentication - **MVE API**: Basic authentication (username:password) - **Credentials**: Must be provided separately for MVE access **Access Pattern:** ```python # MVE API access requires Basic Auth credentials = 'username:password' encoded_credentials = base64.b64encode(credentials.encode()).decode() headers = {'Authorization': f'Basic {encoded_credentials}'} ``` ### Monitoring & Diagnostics - "Check the system heartbeat and health" - "Show me the diagnostic counters" - "Analyze this SQL query for performance: SELECT * FROM NVDA WHERE close > 100" - "What's the current system performance?" ### Enhanced Table Management - "Show me detailed information about all tables" - "Get the hierarchical table structure by catalog" - "Validate this data source configuration" - "Create a new table with full configuration" - "Get detailed information about table 'user_events'" ![Claude Get Table Info](images/ClaudeGetTableInfo.png) *Claude can provide detailed information about specific tables, including their schema and configuration.* ### User & Project Management - "Show me my user settings and preferences" - "Update my user settings to enable dark mode" - "List all available projects" - "Create a new project called 'analytics'" - "Get details about project 'ffffffff'" - "Show me my user information" ### API Key Management - "List all my API keys" - "Create a new API key for external access" - "Delete API key 'key123'" - "Get details about API key 'key123'" ### Environment Management - "List all available environments" - "Switch to environment 'production'" - "Create a new environment called 'staging'" - "Get details about environment 'dev'" - "Deactivate the current environment" ### Sync & Configuration - "Show me the current sync configuration" - "Update the sync configuration for Git integration" - "Export project 'analytics' configuration" - "Import project configuration from Git" - "Validate sync configuration for project 'test'" ### UDF Management - "List all user-defined functions with details" - "Run UDF 'custom_aggregate' with parameters" - "Create a new UDF for data transformation" - "Update UDF 'my_function' configuration" - "Get UDF artifacts and dependencies" ### Sample Data Examples The MCP server can retrieve real-time streaming data samples with different sampling modes: ![Claude Job Sample](images/ClaudeJobSample.png) *Claude can retrieve real-time sample data from running jobs, showing actual streaming data.* **Periodic Sampling (default):** ```json { "records": [ { "___open": "185.0919", "___high": "185.1200", "___low": "184.9400", "___close": "184.9700", "___volume": "61884", "eventTimestamp": "2025-10-08T18:34:10.915Z" } ], "job_status": "RUNNING", "end_of_samples": false, "message": "Retrieved 1 sample records" } ``` **Sample All Messages Mode:** ```json { "sampling_mode": "sample_all_messages", "sample_interval": 0, "sample_count": 10000, "window_size": 10000, "message": "Job created with comprehensive sampling enabled" } ``` ### SQL Query Features - **Automatic Semicolon Handling**: All SQL queries are automatically terminated with semicolons - **Flexible Sampling**: Choose between periodic sampling or sample all messages - **Job Control**: Start, stop, and restart jobs with different configurations - **Real-time Data**: Access streaming data samples from running jobs ## Advanced Features ### Sample All Messages For comprehensive data sampling, use the `sample_all_messages=True` option: ```python # Create job with sample all messages execute_query_with_sampling("SELECT * FROM NVDA", sample_all_messages=True) # Restart job with sample all messages restart_job_with_sampling(1234, "SELECT * FROM NVDA", sample_all_messages=True) ``` **Configuration:** - `sample_interval: 0` (sample immediately) - `sample_count: 10000` (high count to capture all messages) - `window_size: 10000` (large window for comprehensive sampling) ### Custom Sampling Configuration Fine-tune sampling behavior for your specific needs: ```python # Custom sampling with 500ms interval execute_query_with_sampling("SELECT * FROM NVDA", sample_interval=500, sample_count=500, window_size=500) # Configure existing job sampling configure_sampling("sample_id", sample_interval=200, sample_count=1000, window_size=1000) ``` ### Job Management Complete job lifecycle management: ```python # Stop job with savepoint stop_job(1234, savepoint=True) # Restart job with new SQL execute_job(1234, "SELECT * FROM NEW_TABLE") # Restart with sampling options restart_job_with_sampling(1234, "SELECT * FROM NVDA", sample_interval=1000, sample_all_messages=False) ``` ### Kafka Table Creation Create tables that are restricted to local-kafka connector only: ```python # Step 1: Create data source (creates configuration) create_kafka_table("user_events", "user-events") # Uses local-kafka by default # Step 2: Register table in Flink catalog (makes it queryable) register_kafka_table("user_events", "user-events") # Creates ssb_user_events in ssb.ssb_default (falls back to default_catalog.ssb_default) # Advanced: Custom schema registration custom_schema = [ {"name": "id", "type": "STRING"}, {"name": "name", "type": "STRING"}, {"name": "timestamp", "type": "TIMESTAMP"} ] register_kafka_table("custom_table", "custom-topic", custom_schema) # Creates ssb_custom_table # Without ssb_ prefix register_kafka_table("raw_data", "raw-topic", use_ssb_prefix=False) # Creates raw_data (no prefix) # Custom catalog and database register_kafka_table("custom_table", "custom-topic", catalog="default_catalog", database="default_database") # Local Kafka with custom settings create_kafka_table("local_data", "local-topic", "local-kafka", "localhost:9092", "json", "earliest-offset") register_kafka_table("local_data", "local-topic") # Creates ssb_local_data # Validate connector types validate_kafka_connector("local-kafka") # Returns validation details validate_kafka_connector("kafka") # Returns error - only local-kafka allowed ``` **Two-Step Process**: Creating a Kafka table requires two steps: 1. **Create Data Source**: Use `create_kafka_table()` to create the data source configuration 2. **Register in Catalog**: Use `register_kafka_table()` to make the table queryable **Automatic Registration**: The `register_kafka_table()` function: - Uses DDL to register the table in the Flink catalog (default: `ssb.ssb_default`, falls back to `default_catalog.ssb_default`) - Configurable catalog and database parameters for flexible namespace control - Automatic catalog fallback when requested catalog is not available - Creates tables in the same database as existing tables like NVDA by default - Automatically adds `ssb_` prefix to table names (configurable) - Automatically creates a schema based on the topic data - Verifies the table is available for querying by checking the correct database context - Returns confirmation of successful registration with full table name, catalog, and database info **Naming Conventions**: - **Default**: Tables get `ssb_` prefix automatically (e.g., `user_events` → `ssb_user_events`) - **Override**: Use `use_ssb_prefix=False` to disable prefix - **Existing**: Tables already starting with `ssb_` are not modified **Namespace Configuration**: - **Default**: Tables created in `ssb.ssb_default` namespace (falls back to `default_catalog.ssb_default` if `ssb` catalog unavailable) - **Custom Catalog**: Use `catalog` parameter to specify different catalog - **Custom Database**: Use `database` parameter to specify different database - **Automatic Fallback**: System automatically falls back to `default_catalog` if requested catalog is not available - **Full Control**: Both catalog and database are configurable for maximum flexibility **Verification**: - Use `SHOW TABLES;` to confirm tables are available for querying - Tables are created in `default_catalog.ssb_default` namespace (same as NVDA) - Use full namespace (`default_catalog.ssb_default.TABLE_NAME`) or switch database context with `USE default_catalog.ssb_default;` - All virtual Kafka tables are co-located with existing tables for easy querying **Supported Connector:** - `local-kafka` - Local Kafka connector (only option for virtual tables) **Supported Formats:** - `json` - JSON format (default) - `csv` - CSV format - `avro` - Apache Avro format - Custom format strings ## Docker Compose Setup The repository includes a complete Docker Compose setup for local development and testing: ### Services Included - **PostgreSQL**: Database for SSB metadata - **Kafka**: Message streaming platform - **Flink**: Stream processing engine - **NiFi**: Data flow management - **Qdrant**: Vector database - **SSB SSE**: SQL Stream Builder Streaming SQL Engine - **SSB MVE**: SQL Stream Builder Materialized View Engine - **Apache Knox**: Gateway for secure access (optional) ### Starting the Environment ```bash # Start all services docker-compose up -d # Check service status docker-compose ps # View logs docker-compose logs -f ssb-sse ``` ### Access Points - **SSB SSE**: http://localhost:18121 - **SSB MVE**: http://localhost:18131 - **Flink Job Manager**: http://localhost:8081 - **NiFi**: http://localhost:8080 - **Knox Gateway**: https://localhost:8444 (if enabled) ## Write Operations By default, the server runs in read-only mode for CDP deployments and write-enabled for standalone deployments. To change this: 1. Set `SSB_READONLY=false` (enable writes) or `SSB_READONLY=true` (read-only) 2. Restart the MCP server Write operations include: - Creating, updating, and deleting streams - Executing SQL queries that create jobs - Managing job lifecycle (start, stop, restart) - Configuring sampling parameters - Job control and management - Creating Kafka-only tables (enforced validation) ## Comprehensive Capabilities The SSB MCP Server now provides **80+ MCP tools** covering **80%+ of the SSB API**, making it the most comprehensive SSB management platform available through Claude Desktop. ### 📊 Coverage Statistics - **Total MCP Tools**: 80+ (up from 33) - **API Coverage**: 80%+ (up from 20%) - **Functional Categories**: 15 (up from 6) - **Available Endpoints**: 67+ (up from 15) ### 🎯 Key Capabilities #### **Complete SSB Management** - **Job Lifecycle**: Create, monitor, control, copy, and manage jobs - **Data Management**: Tables, schemas, validation, and hierarchical organization - **System Monitoring**: Health checks, diagnostics, and performance tracking - **User Management**: Settings, projects, environments, and API keys - **DevOps Integration**: Sync, export/import, and configuration management #### **Advanced Features** - **Real-time Sampling**: Flexible data sampling with "sample all messages" option - **SQL Analysis**: Query analysis without execution for performance optimization - **Materialized Views**: Create and manage materialized view endpoints - **Custom UDFs**: User-defined function management and execution - **Environment Control**: Multi-environment support with switching capabilities - **Project Management**: Full project lifecycle with export/import capabilities #### **Enterprise Ready** - **Security**: API key management and user authentication - **Monitoring**: Comprehensive system health and performance tracking - **Scalability**: Support for multiple projects and environments - **Integration**: Git sync, configuration management, and DevOps workflows - **Flexibility**: Configurable catalogs, databases, and naming conventions ### 🚀 Use Cases #### **Data Engineers** - Stream processing job management and monitoring - Real-time data sampling and analysis - Table schema management and validation - Performance optimization and troubleshooting #### **DevOps Engineers** - Environment management and configuration - Project export/import and version control - System monitoring and health checks - API key management and security #### **Data Scientists** - Custom UDF development and execution - Data format management and validation - Query analysis and optimization - Real-time data exploration #### **Platform Administrators** - User and project management - System diagnostics and monitoring - Connector and format management - Sync configuration and validation ## Testing The SSB MCP Server includes a comprehensive test suite. See [Testing/README.md](Testing/README.md) for detailed testing documentation, including: - Quick functionality tests - Comprehensive test suite covering all 80+ MCP tools - Cloud SSB test protocols - Test configuration and best practices - Detailed test results and analysis **Quick Start:** ```bash cd Testing && python run_tests.py --quick ``` ## Troubleshooting ### Common Issues 1. **"Unauthorized" errors**: Check your authentication credentials - For direct SSB: Verify `SSB_USER` and `SSB_PASSWORD` - For Knox: Verify `KNOX_TOKEN` or `KNOX_USER`/`KNOX_PASSWORD` 2. **"Connection refused" errors**: Ensure SSB services are running - Check `docker-compose ps` for service status - Verify port mappings in docker-compose.yml 3. **"No sample data available"**: Jobs may need time to produce data - Check job status with `get_job_status(job_id)` - Verify the job is running and has sample configuration - Try using `sample_all_messages=True` for comprehensive sampling 4. **Job restart failures**: If job restart fails - Use `restart_job_with_sampling()` instead of `execute_job()` - Check if the job is in a state that allows restarting - Create a new job if restart is not possible - Note: `start_stream()` and `stop_stream()` methods don't work (404 errors) 5. **SSL certificate errors**: For Knox deployments - Set `KNOX_VERIFY_SSL=false` for self-signed certificates - Or provide proper CA bundle with `KNOX_CA_BUNDLE` 6. **Kafka table creation errors**: If table creation fails - Verify that only local-kafka connector is used (enforced) - Check that the Kafka topic exists and is accessible - Ensure bootstrap servers are correctly configured - Use `validate_kafka_connector()` to check connector validity 7. **Virtual tables not available for querying**: After creating Kafka tables - **Important**: Creating a data source doesn't automatically make it available for querying - Data sources need to be manually registered in the Flink catalog through the SSB UI - Use `SHOW TABLES;` to see which tables are actually available for querying - Only tables that appear in `SHOW TABLES;` can be queried via SQL 8. **Configuration not updating**: After updating `config/cloud_ssb_config.json` - **Important**: Environment variables take precedence over JSON configuration - For JSON config: Update `config/cloud_ssb_config.json` and restart the MCP server - For environment variables: `export KNOX_TOKEN="new_token"` and restart - Check which method is being used by your deployment 9. **Materialized View access errors**: When accessing MVE API - MVE API requires Basic authentication (username:password) - SSB API uses Bearer token authentication - Use different credentials for MVE API access - Check that the job is running before accessing its MV 10. **Empty table lists**: When running `SHOW TABLES` - SSB environment has limited traditional table support - Use `list_streams()` to see available jobs - Access materialized views via MVE API, not SQL - Focus on job-based data processing rather than table queries ### Debug Mode Enable debug logging by setting environment variable: ```bash export MCP_LOG_LEVEL=DEBUG ``` ## Security - All sensitive data (passwords, tokens, secrets) is automatically redacted in responses - Large collections are truncated to prevent overwhelming the LLM - Read-only mode is enabled by default for CDP deployments to prevent accidental modifications - Direct SSB authentication uses basic auth over HTTP (suitable for local development) - SQL queries are automatically sanitized with proper semicolon termination - Kafka table creation enforces local-kafka-only connector for data security ## Summary The SSB MCP Server is now a **comprehensive management platform** for SQL Stream Builder, providing Claude Desktop with access to virtually all SSB functionality through **80+ MCP tools**. ### 🎯 **What You Get:** - **Complete SSB Control**: Manage jobs, tables, users, projects, and environments - **Advanced Monitoring**: System health, diagnostics, and performance tracking - **Real-time Data**: Flexible sampling and streaming data access - **Enterprise Features**: API keys, sync, export/import, and multi-environment support - **Developer Tools**: UDF management, SQL analysis, and connector details - **DevOps Integration**: Project management, configuration sync, and Git workflows ### 🚀 **Key Benefits:** - **80%+ API Coverage**: Access to virtually all SSB functionality - **80+ MCP Tools**: Comprehensive toolset for every use case - **15 Functional Categories**: Organized, discoverable capabilities - **Enterprise Ready**: Security, monitoring, and scalability features - **User Friendly**: Natural language interaction through Claude Desktop - **Flexible**: Supports both standalone and CDP deployments ### 📈 **Perfect For:** - **Data Engineers**: Stream processing, job management, real-time analysis - **DevOps Teams**: Environment management, monitoring, configuration sync - **Data Scientists**: Custom UDFs, query analysis, data exploration - **Platform Admins**: User management, system monitoring, security The SSB MCP Server transforms Claude Desktop into a powerful SSB management interface, enabling natural language interaction with your entire SQL Stream Builder environment! 🎉 ## License Apache License 2.0

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BrooksIan/SSB-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server