by williajm

MCP Kafka

Technology: Python 3.11-3.14 | Kafka | License: MIT | Code style: ruff | Type-checked: mypy | MCP

A Model Context Protocol (MCP) server for Apache Kafka that provides AI assistants with safe, controlled access to Kafka clusters.

Features

  • 12 Kafka tools for topic management, message operations, consumer groups, and cluster info

  • 2-tier access control: READ (default) and READ/WRITE modes

  • Kafka authentication support: SASL (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI) and mTLS

  • Safety controls: Protected internal topics, consumer group validation, message size limits

  • Built with FastMCP: Modern MCP server implementation

Installation

# Using uv (recommended)
uv add mcp-kafka

# Using pip
pip install mcp-kafka

Quick Start

1. Configure Environment

# Basic connection
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# SASL authentication
export KAFKA_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_SASL_MECHANISM=SCRAM-SHA-256
export KAFKA_SASL_USERNAME=your-username
export KAFKA_SASL_PASSWORD=your-password

# Enable write operations (disabled by default)
export SAFETY_ALLOW_WRITE_OPERATIONS=true
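
As a rough illustration of how these variables might be consumed, the sketch below reads a few of them with the defaults documented in the Configuration section. The `KafkaEnvConfig` class, the `load_config` function, and the accepted truthy spellings are assumptions made for this example; they are not taken from the project's source.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _parse_bool(value: str) -> bool:
    """Interpret common truthy spellings (an assumption); anything else is False."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class KafkaEnvConfig:
    # Hypothetical container, not the project's actual settings class.
    bootstrap_servers: str
    security_protocol: str
    sasl_mechanism: Optional[str]
    allow_write_operations: bool


def load_config(env: Optional[Mapping[str, str]] = None) -> KafkaEnvConfig:
    """Read the documented variables, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return KafkaEnvConfig(
        bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        security_protocol=env.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
        sasl_mechanism=env.get("KAFKA_SASL_MECHANISM"),
        allow_write_operations=_parse_bool(
            env.get("SAFETY_ALLOW_WRITE_OPERATIONS", "false")
        ),
    )
```

Passing a plain dictionary instead of `os.environ` keeps the function easy to exercise in tests.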

2. Run the Server

# stdio transport (default, for MCP clients)
uv run mcp-kafka

# HTTP transport (for web integrations)
uv run mcp-kafka --transport http --host 127.0.0.1 --port 8000

# Using convenience scripts
./scripts/http-read.sh      # Read-only HTTP server
./scripts/http-readwrite.sh # Read-write HTTP server

CLI Options

| Option | Default | Description |
|---|---|---|
| --transport | stdio | Transport type: stdio or http |
| --host | 127.0.0.1 | Host to bind (HTTP only) |
| --port | 8000 | Port to bind (HTTP only) |
| --health-check | - | Run health check and exit |
| --version, -v | - | Show version and exit |

3. Connect to MCP Client

Add the server to your MCP client configuration (e.g., Claude Desktop):

{
  "mcpServers": {
    "kafka": {
      "command": "uv",
      "args": ["run", "mcp-kafka"],
      "env": {
        "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092"
      }
    }
  }
}
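
If the client configuration is generated rather than edited by hand, a small helper along these lines could produce the same structure. The `build_client_config` function is hypothetical; the only facts it relies on are the JSON layout shown above and the documented `SAFETY_ALLOW_WRITE_OPERATIONS` variable.

```python
import json


def build_client_config(bootstrap_servers: str, allow_writes: bool = False) -> dict:
    """Build an MCP client entry for the Kafka server (hypothetical helper)."""
    env = {"KAFKA_BOOTSTRAP_SERVERS": bootstrap_servers}
    if allow_writes:
        # Environment values are strings, so the flag is passed as "true".
        env["SAFETY_ALLOW_WRITE_OPERATIONS"] = "true"
    return {
        "mcpServers": {
            "kafka": {
                "command": "uv",
                "args": ["run", "mcp-kafka"],
                "env": env,
            }
        }
    }


if __name__ == "__main__":
    print(json.dumps(build_client_config("localhost:9092", allow_writes=True), indent=2))
```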

Available Tools

Topic Management

| Tool | Access | Description |
|---|---|---|
| kafka_list_topics | READ | List all topics with partition counts |
| kafka_describe_topic | READ | Get detailed topic info and configuration |
| kafka_create_topic | READ/WRITE | Create a new topic with partitions, replication factor, and config |

Message Operations

| Tool | Access | Description |
|---|---|---|
| kafka_consume_messages | READ | Peek at messages (no offset commit) |
| kafka_produce_message | READ/WRITE | Produce a message with optional key, headers, and partition |

Consumer Group Management

| Tool | Access | Description |
|---|---|---|
| kafka_list_consumer_groups | READ | List all consumer groups |
| kafka_describe_consumer_group | READ | Get group details, members, and lag |
| kafka_get_consumer_lag | READ | Get lag per partition |
| kafka_reset_offsets | READ/WRITE | Reset consumer group offsets to earliest, latest, or specific offset |
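
For readers unfamiliar with the term, consumer lag is conventionally the distance between a partition's high watermark (the next offset to be written) and the group's committed offset. The sketch below computes it from plain dictionaries. The `partition_lag` function and its input shapes are hypothetical, and treating a partition without a committed offset as starting from the low watermark is an assumption; the README does not say how the tool itself handles that case.

```python
from typing import Dict, Optional, Tuple


def partition_lag(
    watermarks: Dict[int, Tuple[int, int]],
    committed: Dict[int, Optional[int]],
) -> Dict[int, int]:
    """Return lag per partition.

    watermarks maps partition -> (low, high) offsets.
    committed maps partition -> committed offset, or None if nothing is committed.
    """
    lag = {}
    for partition, (low, high) in watermarks.items():
        offset = committed.get(partition)
        if offset is None:
            # Assumption: with no committed offset, count from the low watermark.
            offset = low
        # Clamp at zero in case the committed offset is ahead of a stale watermark.
        lag[partition] = max(high - offset, 0)
    return lag
```

For example, a partition with watermarks (0, 100) and a committed offset of 90 has a lag of 10 messages.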

Cluster Information

| Tool | Access | Description |
|---|---|---|
| kafka_cluster_info | READ | Get cluster metadata |
| kafka_list_brokers | READ | List all brokers |
| kafka_get_watermarks | READ | Get topic partition watermarks |

Configuration

For detailed configuration options and examples, see CONFIGURATION.md.

Environment Variables

Kafka Connection

| Variable | Default | Description |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka broker addresses |
| KAFKA_SECURITY_PROTOCOL | PLAINTEXT | Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) |
| KAFKA_CLIENT_ID | mcp-kafka | Client identifier |
| KAFKA_TIMEOUT | 30 | Operation timeout in seconds |

SASL Authentication

| Variable | Default | Description |
|---|---|---|
| KAFKA_SASL_MECHANISM | - | SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI) |
| KAFKA_SASL_USERNAME | - | SASL username |
| KAFKA_SASL_PASSWORD | - | SASL password |
| KAFKA_SASL_KERBEROS_SERVICE_NAME | kafka | Kerberos service name |
| KAFKA_SASL_KERBEROS_KEYTAB | - | Path to Kerberos keytab |
| KAFKA_SASL_KERBEROS_PRINCIPAL | - | Kerberos principal |

SSL/TLS

| Variable | Default | Description |
|---|---|---|
| KAFKA_SSL_CA_LOCATION | - | CA certificate path |
| KAFKA_SSL_CERTIFICATE_LOCATION | - | Client certificate path |
| KAFKA_SSL_KEY_LOCATION | - | Client key path |
| KAFKA_SSL_KEY_PASSWORD | - | Client key password |

Safety Controls

| Variable | Default | Description |
|---|---|---|
| SAFETY_ALLOW_WRITE_OPERATIONS | false | Enable READ/WRITE tools |
| SAFETY_MAX_MESSAGE_SIZE | 1048576 | Max message size in bytes (1 MiB) |
| SAFETY_MAX_CONSUME_MESSAGES | 100 | Max messages per consume request |
| SAFETY_TOPIC_BLOCKLIST | - | Comma-separated blocked topic patterns |
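
To make the size and count limits concrete, here is a minimal sketch of how such checks could be enforced before a produce or consume call. The function names are hypothetical. Two behaviors are assumptions, since the README does not specify them: that the size limit covers key plus value, and that an oversized consume request is clamped rather than rejected.

```python
from typing import Optional

MAX_MESSAGE_SIZE = 1_048_576   # documented default for SAFETY_MAX_MESSAGE_SIZE
MAX_CONSUME_MESSAGES = 100     # documented default for SAFETY_MAX_CONSUME_MESSAGES


def check_message_size(
    value: bytes, key: Optional[bytes] = None, limit: int = MAX_MESSAGE_SIZE
) -> int:
    """Return the payload size, or raise ValueError if it exceeds the limit."""
    # Assumption: the limit applies to key and value combined.
    size = len(value) + (len(key) if key else 0)
    if size > limit:
        raise ValueError(f"message is {size} bytes, limit is {limit} bytes")
    return size


def clamp_consume_count(requested: int, limit: int = MAX_CONSUME_MESSAGES) -> int:
    """Cap a requested message count at the configured maximum."""
    if requested < 1:
        raise ValueError("requested message count must be at least 1")
    # Assumption: oversized requests are clamped, not rejected.
    return min(requested, limit)
```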

Security Controls

| Variable | Default | Description |
|---|---|---|
| SECURITY_RATE_LIMIT_ENABLED | true | Enable rate limiting |
| SECURITY_RATE_LIMIT_RPM | 60 | Max requests per minute |
| SECURITY_AUDIT_LOG_ENABLED | true | Enable audit logging |
| SECURITY_AUDIT_LOG_FILE | mcp_audit.log | Audit log file path |
| SECURITY_OAUTH_ENABLED | false | Enable OAuth/OIDC authentication |
| SECURITY_OAUTH_ISSUER | - | OAuth issuer URL (e.g., https://auth.example.com) |
| SECURITY_OAUTH_AUDIENCE | - | Expected JWT audience claim |
| SECURITY_OAUTH_JWKS_URL | - | JWKS endpoint URL (auto-derived from issuer if not set) |
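
The README does not describe which rate-limiting algorithm the server uses. As one plausible reading of a "requests per minute" limit, the sketch below implements a sliding-window limiter; the `SlidingWindowRateLimiter` class is hypothetical and only illustrates the idea.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowRateLimiter:
    """Allow at most max_requests calls within any window_seconds span."""

    def __init__(
        self,
        max_requests: int = 60,  # mirrors the SECURITY_RATE_LIMIT_RPM default
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_requests = max_requests
        self._window = window_seconds
        self._clock = clock
        self._timestamps: Deque[float] = deque()

    def allow(self) -> bool:
        """Record a request and return True, or return False if over the limit."""
        now = self._clock()
        # Drop timestamps that have aged out of the window.
        while self._timestamps and now - self._timestamps[0] >= self._window:
            self._timestamps.popleft()
        if len(self._timestamps) >= self._max_requests:
            return False
        self._timestamps.append(now)
        return True
```

The injectable `clock` parameter exists so the limiter can be tested without sleeping.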

Access Control

MCP Kafka uses a simple 2-tier access control system:

READ Access (Default)

  • List topics, consumer groups, and brokers

  • Describe topics and consumer groups

  • Consume messages (read-only peek)

  • Get cluster info and watermarks

READ/WRITE Access

Requires SAFETY_ALLOW_WRITE_OPERATIONS=true:

  • Create topics

  • Produce messages

  • Reset consumer group offsets
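
The two tiers can be pictured as a lookup from tool name to required access level, gated by the write flag. The sketch below uses the tool names and access levels from the Available Tools tables; the `Access` enum, the `TOOL_ACCESS` mapping, and `is_tool_allowed` are hypothetical names, and the actual server may enforce this differently (for example, by not registering write tools at all).

```python
from enum import Enum


class Access(Enum):
    READ = "READ"
    READ_WRITE = "READ/WRITE"


# Access level per tool, as listed in the Available Tools tables.
TOOL_ACCESS = {
    "kafka_list_topics": Access.READ,
    "kafka_describe_topic": Access.READ,
    "kafka_create_topic": Access.READ_WRITE,
    "kafka_consume_messages": Access.READ,
    "kafka_produce_message": Access.READ_WRITE,
    "kafka_list_consumer_groups": Access.READ,
    "kafka_describe_consumer_group": Access.READ,
    "kafka_get_consumer_lag": Access.READ,
    "kafka_reset_offsets": Access.READ_WRITE,
    "kafka_cluster_info": Access.READ,
    "kafka_list_brokers": Access.READ,
    "kafka_get_watermarks": Access.READ,
}


def is_tool_allowed(tool: str, allow_write_operations: bool) -> bool:
    """READ tools are always allowed; READ/WRITE tools need the write flag."""
    access = TOOL_ACCESS.get(tool)
    if access is None:
        return False  # unknown tools are denied in this sketch
    return access is Access.READ or allow_write_operations
```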

Protected Resources

The following resources are always protected:

  • Internal topics: __consumer_offsets, __transaction_state

  • Internal consumer groups: Groups starting with __

  • Topics in blocklist: Configured via SAFETY_TOPIC_BLOCKLIST
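
The rules above could be checked with something like the following sketch. The function names are hypothetical, and interpreting blocklist entries as shell-style glob patterns is an assumption: the README only says "patterns", so the real server might use regular expressions or another syntax.

```python
from fnmatch import fnmatchcase
from typing import List, Optional

INTERNAL_TOPICS = frozenset({"__consumer_offsets", "__transaction_state"})


def parse_blocklist(raw: Optional[str]) -> List[str]:
    """Split a comma-separated SAFETY_TOPIC_BLOCKLIST value into patterns."""
    return [part.strip() for part in (raw or "").split(",") if part.strip()]


def is_topic_protected(topic: str, blocklist: List[str]) -> bool:
    """True for internal topics and for topics matching a blocklist pattern."""
    if topic in INTERNAL_TOPICS:
        return True
    # Assumption: patterns are case-sensitive globs such as "payments.*".
    return any(fnmatchcase(topic, pattern) for pattern in blocklist)


def is_group_protected(group_id: str) -> bool:
    """True for internal consumer groups, identified by a leading '__'."""
    return group_id.startswith("__")
```

With a blocklist value of `payments.*, audit`, the topics `payments.eu` and `audit` would be treated as protected, while `orders` would not.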

Development

Prerequisites

  • Python 3.11+

  • uv package manager

Setup

# Clone repository
git clone https://github.com/williajm/mcp_kafka.git
cd mcp_kafka

# Install dependencies
uv sync --all-extras

# Run tests
uv run pytest

# Run linting
uv run ruff check src/ tests/
uv run ruff format --check src/ tests/

# Run type checking
uv run mypy src/

Local Kafka for Testing

A Docker Compose environment is provided for local development:

# Start Kafka
docker compose -f docker/docker-compose.yml up -d

# Create test topics
docker compose -f docker/docker-compose.yml exec kafka \
  kafka-topics --create --topic test-topic --partitions 3 --replication-factor 1 \
  --bootstrap-server localhost:9092

# Stop Kafka
docker compose -f docker/docker-compose.yml down

License

MIT License - see LICENSE file for details.
