Kafka MCP Server
Click on "Install Server".
Wait a few minutes for the server to deploy. Once ready, it will show a "Started" state.
In the chat, type
@followed by the MCP server name and your instructions, e.g., "@Kafka MCP Servertail the last 10 messages from the orders topic"
That's it! The server will respond to your query, and you can continue using it as needed.
Here is a step-by-step guide with screenshots.
Kafka MCP Server
Overview
FastMCP-based MCP server that connects to Kafka and exposes MCP tools for common broker, topic, consumer, and retention operations.
Features
List/create/delete topics and inspect topic configs.
Tail recent messages or collect a short live stream.
List consumer groups and compute group lag.
Inspect cluster broker metadata.
Store and manage named Kafka users locally.
Requirements
Python 3.10+ (Makefile defaults to 3.13).
uv (recommended) or any Python environment manager.
A Kafka broker (local via Podman or your own cluster).
Quick start (uv)
Create and activate a virtual environment:
uv python install 3.13
uv venv .venv --python 3.13
source .venv/bin/activate
Install dependencies:
uv sync
Run the server:
uv run python -m app.main
Default MCP SSE endpoint: http://localhost:8000/mcp
Makefile shortcuts
Start server: make start
Start Kafka (Podman): make kafka-start
Stop Kafka: make kafka-stop
Kafka logs: make kafka-logs
Run smoke test: make test-smoke
Configuration
Server settings come from FastMCP ServerSettings. The defaults are:
Host: 0.0.0.0
Port: 8000
FastMCP supports environment overrides such as:
FASTMCP_SERVER_HOST
FASTMCP_SERVER_PORT
Smoke tests and clients can use:
MCP_URL (default http://localhost:8000/mcp)
KAFKA_BOOTSTRAP_SERVERS (default localhost:9092)
KAFKA_TEST_TOPIC (default mcp_smoke)
KAFKA_TEST_GROUP (default mcp_smoke_group)
Kafka connection payload
Most tools require a Kafka connection object with the following fields:
bootstrap_servers (required)
security_protocol (default PLAINTEXT)
sasl_mechanism, sasl_username, sasl_password (optional)
ssl_cafile, ssl_certfile, ssl_keyfile (optional)
oauth_token (required when sasl_mechanism is OAUTHBEARER)
MCP tool catalog
Each tool name below is the MCP command. Parameter shapes match the schemas in app/schemas.py.
health- Returns{"status":"ok"}.Params: none
list_topics- Lists topics with partition and replication info.Params:
connection
create_topic- Creates a topic with optional configs.Params:
connection,payload(name,num_partitions,replication_factor,configs)
delete_topic- Deletes a topic by name.Params:
connection,name
topic_configs- Fetches topic configuration values.Params:
connection,name
topic_retention- Returnsretention.msfor a topic.Params:
connection,name
tail_messages- Reads the most recent messages for a topic.Params:
connection,name,payload(limit)
live_messages- Collects a short live stream of messages.Params:
connection,name,payload(max_messages,duration_seconds,poll_interval_ms)
list_consumer_groups- Lists consumer groups with state and member count.Params:
connection
consumer_group_lag- Computes lag per partition for a group.Params:
connection,group_id
cluster_info- Returns broker and controller metadata.Params:
connection
list_kafka_users- Lists locally stored user entries.Params: none
upsert_kafka_user- Creates or updates a local user entry.Params:
user(username,sasl_mechanism,note)
delete_kafka_user- Deletes a local user entry.Params:
username
MCP command examples (Python)
from mcp import ClientSession
from mcp.client.sse import sse_client
MCP_URL = "http://localhost:8000/mcp"
connection = {
"bootstrap_servers": "localhost:9092",
"security_protocol": "PLAINTEXT",
"sasl_mechanism": None,
"sasl_username": None,
"sasl_password": None,
"ssl_cafile": None,
"ssl_certfile": None,
"ssl_keyfile": None,
"oauth_token": None,
}
async with sse_client(MCP_URL) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
await session.call_tool("list_topics", {"connection": connection})
await session.call_tool(
"create_topic",
{
"connection": connection,
"payload": {
"name": "example-topic",
"num_partitions": 1,
"replication_factor": 1,
"configs": {},
},
},
)Local storage
Kafka user entries are stored at app/data/users.json.
Testing
Smoke test all MCP tools end-to-end:
Start Kafka: make kafka-start
Start the server: make start
In a new terminal: make test-smoke
LocalAI (optional)
LocalAI is not required for Kafka MCP usage, but this repo includes helper targets for running models locally:
Install LocalAI and a model: make localai-install && make localai-model
Start LocalAI: make localai-start
This server cannot be installed
Resources
Unclaimed servers have limited discoverability.
Looking for Admin?
If you are the server author, to access and configure the admin panel.
Latest Blog Posts
MCP directory API
We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/jilanisayyad/kafka-mcp-server'
If you have feedback or need assistance with the MCP directory API, please join our Discord server