Skip to main content
Glama

Kafka MCP サーバー

Apache Kafka と統合して、LLM および Agentic アプリケーションの公開および消費機能を提供するメッセージ コンテキスト プロトコル (MCP) サーバー。

概要

このプロジェクトは、AIモデルが標準化されたインターフェースを介してKafkaトピックと対話できるようにするサーバーを実装します。サポート対象は以下のとおりです。

  • Kafkaトピックへのメッセージの公開

  • Kafkaトピックからのメッセージの消費

Related MCP server: Slack MCP Server

前提条件

  • Python 3.8以上

  • Apache Kafka インスタンス

  • Python の依存関係 (インストールのセクションを参照)

インストール

  1. リポジトリをクローンします。

    git clone <repository-url> cd <repository-directory>
  2. 仮想環境を作成してアクティブ化します。

    python -m venv venv source venv/bin/activate # On Windows, use: venv\Scripts\activate
  3. 必要な依存関係をインストールします。

    pip install -r requirements.txt

    requirements.txt が存在しない場合は、次のパッケージをインストールします。

    pip install aiokafka python-dotenv pydantic-settings mcp-server

構成

次の変数を含む.envファイルをプロジェクト ルートに作成します。

# Kafka Configuration KAFKA_BOOTSTRAP_SERVERS=localhost:9092 TOPIC_NAME=your-topic-name IS_TOPIC_READ_FROM_BEGINNING=False DEFAULT_GROUP_ID_FOR_CONSUMER=kafka-mcp-group # Optional: Custom Tool Descriptions # TOOL_PUBLISH_DESCRIPTION="Custom description for the publish tool" # TOOL_CONSUME_DESCRIPTION="Custom description for the consume tool"

使用法

サーバーの実行

提供されているmain.pyスクリプトを使用してサーバーを実行できます。

python main.py --transport stdio

利用可能な交通手段:

  • stdio : 標準入出力(デフォルト)

  • sse : サーバー送信イベント

Claude Desktopとの統合

この Kafka MCP サーバーを Claude Desktop で使用するには、Claude Desktop 構成ファイルに次の構成を追加します。

{ "mcpServers": { "kafka": { "command": "python", "args": [ "<PATH TO PROJECTS>/main.py" ] } } }

<PATH TO PROJECTS>プロジェクト ディレクトリへの絶対パスに置き換えます。

プロジェクト構造

  • main.py : アプリケーションのエントリポイント

  • kafka.py : Kafka コネクタの実装

  • server.py : Kafka とのやり取りのためのツールを備えた MCP サーバーの実装

  • settings.py : Pydantic を使用した構成管理

利用可能なツール

kafka-publish

構成された Kafka トピックに情報を公開します。

kafka-consume

構成された Kafka トピックから情報を消費します。

  • 注意: 一度トピックからメッセージを読み取ると、同じグループIDを使用して再度読み取ることはできません。

トピックの作成

指定されたパラメータを使用して新しい Kafka トピックを作成します。

  • オプション:

    • --topic作成するトピックの名前

    • --partitions割り当てるパーティションの数

    • --replication-factorブローカー間のレプリケーション係数

    • --config (オプション) トピックレベルの構成のオーバーライド (例: retention.ms=604800000 )

トピックを削除

既存の Kafka トピックを削除します。

  • オプション:

    • --topic削除するトピックの名前

    • --timeout (オプション) 削除が完了するまでの待機時間

リストトピック

クラスター内のすべてのトピックを一覧表示します (またはパターンでフィルタリングします)。

  • オプション:

    • --bootstrap-serverブローカーアドレス

    • --pattern (オプション) トピック名をフィルタリングする正規表現

    • --exclude-internal (オプション) 内部トピックを除外する (デフォルト: true)

トピック構成

1 つ以上のトピックの構成を表示または変更します。

  • オプション:

    • --describeトピックの現在の設定を表示する

    • --alter構成を変更します (例: --add-config retention.ms=86400000,--delete-config cleanup.policy )

    • --topicトピックの名前

トピックメタデータ

トピックまたはクラスターに関するメタデータを取得します。

  • オプション:

    • --topic (指定されている場合) このトピックのメタデータのみを取得します

    • --bootstrap-serverブローカーアドレス

    • --include-offline (オプション) オフラインのブローカーまたはパーティションを含める

-
security - not tested
A
license - permissive license
-
quality - not tested

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pavanjava/kafka_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server