项目简介
Kafka MCP Server 是一个基于 Model Context Protocol (MCP) 构建的服务器,它允许LLM(大型语言模型)应用与 Apache Kafka 消息队列进行交互。该服务器提供了一组工具,使LLM能够发布消息到 Kafka topic,以及从 Kafka topic 消费消息。通过标准化的 MCP 协议,LLM应用可以方便地集成 Kafka 的消息处理能力。
主要功能点
- 发布消息到 Kafka Topic: 允许LLM应用将信息作为消息发布到指定的 Kafka topic,用于下游应用或服务消费。
- 消费 Kafka Topic 消息: 允许LLM应用从指定的 Kafka topic 消费消息,获取外部数据或事件信息。
- 可配置的 Kafka 连接: 通过环境变量灵活配置 Kafka 服务器地址、Topic 名称、消费组 ID 等连接参数。
- 支持多种传输协议: 支持 Stdio (标准输入输出) 和 SSE (Server-Sent Events) 两种 MCP 服务器传输协议。
安装步骤
-
克隆仓库
git clone https://github.com/pavanjava/kafka_mcp_server cd kafka_mcp_server -
创建并激活虚拟环境
python -m venv venv source venv/bin/activate # Linux/macOS # venv\Scripts\activate # Windows -
安装依赖
pip install -r requirements.txt或
pip install aiokafka python-dotenv pydantic-settings mcp-server
服务器配置
为了将 Kafka MCP Server 集成到 MCP 客户端(例如 Claude Desktop),您需要配置客户端以启动并连接到此服务器。以下是 Claude Desktop 配置文件中 'mcpServers' 部分的配置示例,请根据您的实际情况修改:
{ "mcpServers": { "kafka": { "command": "python", "args": [ "<仓库绝对路径>/main.py", "--transport", "stdio" // 可选 "sse",默认为 "stdio" ] } } }
配置参数说明:
- '"kafka"': 为该 MCP 服务器自定义的名称,在客户端中用于标识和调用。
- '"command": "python"': 启动服务器的命令,这里使用 Python 解释器。
- '"args"': 传递给启动命令的参数列表。
- '"<仓库绝对路径>/main.py"': 指向 'main.py' 脚本的绝对路径,请替换为 Kafka MCP Server 仓库在您本地文件系统中的实际路径。
- '"--transport", "stdio"': 指定 MCP 服务器使用的传输协议。'stdio' 表示使用标准输入输出进行通信,'sse' 表示使用 Server-Sent Events。根据您的需求选择,默认为 'stdio'。
环境变量配置:
在项目根目录下创建 '.env' 文件,并根据需要配置以下 Kafka 连接参数:
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Kafka 服务器地址,例如:localhost:9092 或 broker1:9092,broker2:9092 TOPIC_NAME=your-topic-name # Kafka Topic 名称,指定 LLM 应用交互的 Topic IS_TOPIC_READ_FROM_BEGINNING=False # 是否从 Topic 的起始位置开始消费消息,默认为 False (从最新位置开始) DEFAULT_GROUP_ID_FOR_CONSUMER=kafka-mcp-group # 消费者组 ID,用于消费者消息,默认为 kafka-mcp-group
基本使用方法
-
启动服务器: 配置完成后,在 Kafka MCP Server 仓库目录下,运行以下命令启动服务器:
python main.py --transport stdio或使用 SSE 协议:
python main.py --transport sse -
在 MCP 客户端中使用: 在您的 MCP 客户端应用中(例如 Claude Desktop),配置并连接到 Kafka MCP Server。客户端可以通过调用 'kafka-publish' 工具向 Kafka Topic 发布消息,或调用 'kafka-consume' 工具从 Kafka Topic 消费消息。具体工具的使用方法请参考客户端应用的相关文档。
注意: 首次从 Topic 消费消息后,如果使用相同的 'group_id',则无法再次消费已读取的消息。Kafka 的消费行为由 'group_id' 和 offset 管理。
信息
分类
数据库与文件