Skip to main content
Glama
souravch

Apache Beam MCP Server

by souravch

Apache Beam MCP Server

A Model Context Protocol (MCP) server for managing Apache Beam pipelines across different runners: Flink, Spark, Dataflow, and Direct.

Python 3.9+ MCP Version Apache Beam Docker Kubernetes

What is This?

The Apache Beam MCP Server provides a standardized API for managing Apache Beam data pipelines across different runners. It's designed for:

  • Data Engineers: Manage pipelines with a consistent API regardless of runner

  • AI/LLM Developers: Enable AI-controlled data pipelines via the MCP standard

  • DevOps Teams: Simplify pipeline operations and monitoring

Related MCP server: Engineering MCP Server

Key Features

  • Multi-Runner Support: One API for Flink, Spark, Dataflow, and Direct runners

  • MCP Compliant: Follows the Model Context Protocol for AI integration

  • Pipeline Management: Create, monitor, and control data pipelines

  • Easy to Extend: Add new runners or custom features

  • Production-Ready: Includes Docker/Kubernetes deployment, monitoring, and scaling

Quick Start

Installation

# Clone the repository
git clone https://github.com/yourusername/beam-mcp-server.git
cd beam-mcp-server

# Create a virtual environment
python -m venv beam-mcp-venv
source beam-mcp-venv/bin/activate  # On Windows: beam-mcp-venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

Start the Server

# With the Direct runner (no external dependencies)
python main.py --debug --port 8888

# With Flink runner (if you have Flink installed)
CONFIG_PATH=config/flink_config.yaml python main.py --debug --port 8888

Run Your First Job

# Create test input
echo "This is a test file for Apache Beam WordCount example" > /tmp/input.txt

# Submit a job using curl
curl -X POST http://localhost:8888/api/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "job_name": "test-wordcount",
    "runner_type": "direct",
    "job_type": "BATCH",
    "code_path": "examples/pipelines/wordcount.py",
    "pipeline_options": {
      "input_file": "/tmp/input.txt",
      "output_path": "/tmp/output"
    }
  }'

Docker Support

Using Pre-built Images

Pre-built Docker images are available on GitHub Container Registry:

# Pull the latest image
docker pull ghcr.io/yourusername/beam-mcp-server:latest

# Run the container
docker run -p 8888:8888 \
  -v $(pwd)/config:/app/config \
  -e GCP_PROJECT_ID=your-gcp-project \
  -e GCP_REGION=us-central1 \
  ghcr.io/yourusername/beam-mcp-server:latest

Building Your Own Image

# Build the image
./scripts/build_and_push_images.sh

# Build and push to a registry
./scripts/build_and_push_images.sh --registry your-registry --push --latest

Docker Compose

For local development with multiple services (Flink, Spark, Prometheus, Grafana):

docker-compose -f docker-compose.dev.yaml up -d

Kubernetes Deployment

The repository includes Kubernetes manifests for deploying the Beam MCP Server to Kubernetes:

# Deploy using kubectl
kubectl apply -k kubernetes/

# Deploy using Helm
helm install beam-mcp ./helm/beam-mcp-server \
  --namespace beam-mcp \
  --create-namespace

For detailed deployment instructions, see the Kubernetes Deployment Guide.

MCP Standard Endpoints

The Beam MCP Server implements all standard Model Context Protocol (MCP) endpoints, providing a comprehensive framework for AI-managed data pipelines:

/tools Endpoint

Manage AI agents and models for pipeline processing:

# Register a sentiment analysis tool
curl -X POST "http://localhost:8888/api/v1/tools/" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "sentiment-analyzer",
    "description": "Analyzes sentiment in text data",
    "type": "transformation",
    "parameters": {
      "text_column": {
        "type": "string",
        "description": "Column containing text to analyze"
      }
    }
  }'

/resources Endpoint

Manage datasets and other pipeline resources:

# Register a dataset
curl -X POST "http://localhost:8888/api/v1/resources/" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Customer Transactions",
    "description": "Daily customer transaction data",
    "resource_type": "dataset",
    "location": "gs://analytics-data/transactions/*.csv"
  }'

/contexts Endpoint

Define execution environments for pipelines:

# Create a Dataflow execution context
curl -X POST "http://localhost:8888/api/v1/contexts/" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Dataflow Prod",
    "description": "Production Dataflow environment",
    "context_type": "dataflow",
    "parameters": {
      "region": "us-central1",
      "project": "beam-analytics-prod"
    }
  }'

These MCP standard endpoints integrate seamlessly with Beam's core functionality to provide a complete solution for managing data pipelines. For detailed examples and use cases, see the MCP Protocol Compliance.

Documentation

Python Client Example

import requests

# Get available runners
headers = {"MCP-Session-ID": "my-session-123"}
runners = requests.get("http://localhost:8888/api/v1/runners", headers=headers).json()

# Create a job
job = requests.post(
    "http://localhost:8888/api/v1/jobs",
    headers=headers,
    json={
        "job_name": "wordcount-example",
        "runner_type": "flink",
        "job_type": "BATCH",
        "code_path": "examples/pipelines/wordcount.py",
        "pipeline_options": {
            "parallelism": 2,
            "input_file": "/tmp/input.txt",
            "output_path": "/tmp/output"
        }
    }
).json()

# Monitor job status
job_id = job["data"]["job_id"]
status = requests.get(f"http://localhost:8888/api/v1/jobs/{job_id}", headers=headers).json()

CI/CD Pipeline

The repository includes a GitHub Actions workflow for continuous integration and deployment:

  • CI: Runs tests, linting, and type checking on every pull request

  • CD: Builds and pushes Docker images on every push to main/master

  • Deployment: Automatically deploys to development and production environments

Monitoring and Observability

The Beam MCP Server includes built-in support for monitoring and observability:

  • Prometheus Metrics: Exposes metrics at /metrics endpoint

  • Grafana Dashboards: Pre-configured dashboards for monitoring

  • Health Checks: Provides health check endpoint at /health

  • Logging: Structured JSON logging for easy integration with log aggregation systems

Contributing

We welcome contributions! See our Contributing Guide for details.

To run the tests:

# Run the regression tests
./scripts/run_regression_tests.sh

License

This project is licensed under the Apache License 2.0.

MCP Implementation Status

The MCP (Model Context Protocol) implementation is divided into phases:

Phase 1: Core Connection Lifecycle (COMPLETED)

  • ✅ Connection initialization

  • ✅ Connection state management

  • ✅ Basic capability negotiation

  • ✅ HTTP transport with SSE

  • ✅ JSON-RPC message handling

  • ✅ Error handling

Phase 2: Full Capability Negotiation (COMPLETED)

  • ✅ Enhanced capability compatibility checking

  • ✅ Semantic version compatibility for features

  • ✅ Support levels for features (required, preferred, optional, experimental)

  • ✅ Capability property validation

  • ✅ Capability-based API endpoint control

  • ✅ Feature router integration with FastAPI

Phase 3: Advanced Message Handling (COMPLETED)

  • ✅ Structured message types

  • ✅ Message validation

  • ✅ Improved error handling

  • ✅ Batch message processing

Phase 4: Production Optimization (TODO)

  • ⬜ Performance optimizations

  • ⬜ Monitoring and metrics

  • ⬜ Advanced security features

  • ⬜ High availability support

When building clients to interact with the MCP server, you must follow the Model Context Protocol. For details, see the MCP Protocol Compliance.

A
license - permissive license
-
quality - not tested
D
maintenance

Maintenance

Maintainers
Response time
Release cycle
Releases (12mo)
Commit activity

Resources

Unclaimed servers have limited discoverability.

Looking for Admin?

If you are the server author, to access and configure the admin panel.

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/souravch/beam-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server