项目简介

Kafka MCP Server 是一个基于 Model Context Protocol (MCP) 构建的服务器,它允许LLM(大型语言模型)应用与 Apache Kafka 消息队列进行交互。该服务器提供了一组工具,使LLM能够发布消息到 Kafka topic,以及从 Kafka topic 消费消息。通过标准化的 MCP 协议,LLM应用可以方便地集成 Kafka 的消息处理能力。

主要功能点

  • 发布消息到 Kafka Topic: 允许LLM应用将信息作为消息发布到指定的 Kafka topic,用于下游应用或服务消费。
  • 消费 Kafka Topic 消息: 允许LLM应用从指定的 Kafka topic 消费消息,获取外部数据或事件信息。
  • 可配置的 Kafka 连接: 通过环境变量灵活配置 Kafka 服务器地址、Topic 名称、消费组 ID 等连接参数。
  • 支持多种传输协议: 支持 Stdio (标准输入输出) 和 SSE (Server-Sent Events) 两种 MCP 服务器传输协议。

安装步骤

  1. 克隆仓库

    git clone https://github.com/pavanjava/kafka_mcp_server
    cd kafka_mcp_server
  2. 创建并激活虚拟环境

    python -m venv venv
    source venv/bin/activate  # Linux/macOS
    # venv\Scripts\activate  # Windows
  3. 安装依赖

    pip install -r requirements.txt

    pip install aiokafka python-dotenv pydantic-settings mcp-server

服务器配置

为了将 Kafka MCP Server 集成到 MCP 客户端(例如 Claude Desktop),您需要配置客户端以启动并连接到此服务器。以下是 Claude Desktop 配置文件中 'mcpServers' 部分的配置示例,请根据您的实际情况修改:

{
    "mcpServers": {
        "kafka": {
            "command": "python",
            "args": [
                "<仓库绝对路径>/main.py",
                "--transport", "stdio"  // 可选 "sse",默认为 "stdio"
            ]
        }
    }
}

配置参数说明

  • '"kafka"': 为该 MCP 服务器自定义的名称,在客户端中用于标识和调用。
  • '"command": "python"': 启动服务器的命令,这里使用 Python 解释器。
  • '"args"': 传递给启动命令的参数列表。
    • '"<仓库绝对路径>/main.py"': 指向 'main.py' 脚本的绝对路径,请替换为 Kafka MCP Server 仓库在您本地文件系统中的实际路径。
    • '"--transport", "stdio"': 指定 MCP 服务器使用的传输协议。'stdio' 表示使用标准输入输出进行通信,'sse' 表示使用 Server-Sent Events。根据您的需求选择,默认为 'stdio'。

环境变量配置

在项目根目录下创建 '.env' 文件,并根据需要配置以下 Kafka 连接参数:

KAFKA_BOOTSTRAP_SERVERS=localhost:9092  # Kafka 服务器地址,例如:localhost:9092 或 broker1:9092,broker2:9092
TOPIC_NAME=your-topic-name             # Kafka Topic 名称,指定 LLM 应用交互的 Topic
IS_TOPIC_READ_FROM_BEGINNING=False      # 是否从 Topic 的起始位置开始消费消息,默认为 False (从最新位置开始)
DEFAULT_GROUP_ID_FOR_CONSUMER=kafka-mcp-group # 消费者组 ID,用于消费者消息,默认为 kafka-mcp-group

基本使用方法

  1. 启动服务器: 配置完成后,在 Kafka MCP Server 仓库目录下,运行以下命令启动服务器:

    python main.py --transport stdio

    或使用 SSE 协议:

    python main.py --transport sse
  2. 在 MCP 客户端中使用: 在您的 MCP 客户端应用中(例如 Claude Desktop),配置并连接到 Kafka MCP Server。客户端可以通过调用 'kafka-publish' 工具向 Kafka Topic 发布消息,或调用 'kafka-consume' 工具从 Kafka Topic 消费消息。具体工具的使用方法请参考客户端应用的相关文档。

注意: 首次从 Topic 消费消息后,如果使用相同的 'group_id',则无法再次消费已读取的消息。Kafka 的消费行为由 'group_id' 和 offset 管理。

信息

分类

数据库与文件