Kafka MCP 服务器
与 Apache Kafka 集成的消息上下文协议 (MCP) 服务器为 LLM 和 Agentic 应用程序提供发布和使用功能。
概述
该项目实现了一个服务器,允许 AI 模型通过标准化接口与 Kafka 主题进行交互。它支持:
将消息发布到 Kafka 主题
从 Kafka 主题消费消息
先决条件
Python 3.8+
Apache Kafka 实例
Python 依赖项(参见安装部分)
安装
克隆存储库:
git clone <repository-url> cd <repository-directory>创建虚拟环境并激活它:
python -m venv venv source venv/bin/activate # On Windows, use: venv\Scripts\activate安装所需的依赖项:
pip install -r requirements.txt如果不存在 requirements.txt,请安装以下包:
pip install aiokafka python-dotenv pydantic-settings mcp-server
配置
在项目根目录中创建一个包含以下变量的.env
文件:
用法
运行服务器
您可以使用提供的main.py
脚本运行服务器:
可用的交通方式:
stdio
:标准输入/输出(默认)sse
:服务器发送事件
与 Claude Desktop 集成
要将此 Kafka MCP 服务器与 Claude Desktop 一起使用,请将以下配置添加到您的 Claude Desktop 配置文件中:
将<PATH TO PROJECTS>
替换为项目目录的绝对路径。
项目结构
main.py
:应用程序的入口点kafka.py
:Kafka连接器实现server.py
:MCP 服务器实现,包含与 Kafka 交互的工具settings.py
:使用 Pydantic 进行配置管理
可用工具
kafka 发布
将信息发布到配置的 Kafka 主题。
kafka-consume
从配置的 Kafka 主题中使用信息。
注意:一旦从主题读取了一条消息,就不能使用相同的 groupid 再次读取它
创建主题
使用指定参数创建一个新的 Kafka 主题。
选项:
--topic
要创建的主题的名称--partitions
要分配的分区数--replication-factor
跨代理的复制因子--config
(可选)主题级配置覆盖(例如,retention.ms=604800000
)
删除主题
删除现有的 Kafka 主题。
选项:
--topic
要删除的主题的名称--timeout
(可选)等待删除完成的时间
列表主题
列出集群中的所有主题(或按模式过滤)。
选项:
--bootstrap-server
代理地址--pattern
(可选)用于过滤主题名称的正则表达式--exclude-internal
(可选) 排除内部主题 (默认值:true)
主题配置
显示或更改一个或多个主题的配置。
选项:
--describe
显示主题的当前配置--alter
修改配置(例如,--add-config retention.ms=86400000,--delete-config cleanup.policy
)--topic
主题名称
主题元数据
检索有关主题或集群的元数据。
选项:
--topic
(如果提供)仅获取此主题的元数据--bootstrap-server
代理地址--include-offline
(可选)包括离线的代理或分区
This server cannot be installed
remote-capable server
The server can be hosted and run remotely because it primarily relies on remote services or has no dependency on the local environment.
使 AI 模型能够通过标准化接口发布和使用来自 Apache Kafka 主题的消息,从而可以轻松地将 Kafka 消息传递与 LLM 和代理应用程序集成。
Related Resources
Related MCP Servers
- -securityFlicense-qualityProvides AI agents with comprehensive Twitter functionality through the Model Context Protocol standard, enabling reading tweets, posting content, managing interactions, and accessing timeline data with robust error handling.Last updated -117
- AsecurityAlicenseAqualityA server implementing Model Context Protocol that enables AI assistants to interact with Slack API through a standardized interface, providing tools for messaging, channel management, user information retrieval, and more.Last updated -9103Apache 2.0
- -securityFlicense-qualityAn implementation of the Model Context Protocol that connects AI agents to Kakao Official Accounts, allowing users to send various message templates through the Kakao Developers API.Last updated -10
- -securityAlicense-qualityEnables AI assistants to interact with Slack workspaces through the Model Context Protocol, providing tools for reading/sending messages, managing channels, and accessing Slack API functionality.