项目简介

Kafka MCP 服务器是一个实现了 Model Context Protocol (MCP) 的后端应用。它充当 LLM 客户端(如 AI 助手、支持工具的聊天应用)与 Apache Kafka 集群之间的桥梁,使得 LLM 能够以结构化和安全的方式执行常见的 Kafka 操作和获取集群信息。

主要功能点

  • 连接 Kafka: 支持连接到 Kafka 集群,并支持多种安全认证机制(如 SASL, TLS)。
  • 消息操作: 允许 LLM 发送消息到指定 Topic (Produce) 和从指定 Topic 消费消息 (Consume)。
  • Topic 管理: 提供查看 Topic 列表、获取 Topic 详细信息(分区、副本、ISR、配置)的能力。
  • 消费者组管理: 能够列出所有消费者组,查看特定消费者组的状态、成员及分区的消费偏移量和 Lag。
  • 集群监控: 提供集群概览、健康检查、以及详细的 Under-Replicated 分区和消费者 Lag 报告。
  • 配置查询: 支持查询 Topic 或 Broker 的配置信息。
  • 标准化接口: 通过 MCP 协议提供统一的 JSON-RPC 接口,方便各种 LLM 客户端集成。

安装步骤

该服务器需要 Go 1.21 或更高版本。

使用 Homebrew (macOS 或 Linux):

打开终端并运行以下命令:

  1. 添加仓库 Tap:
    brew tap tuannvm/mcp
  2. 安装服务器:
    brew install kafka-mcp-server
    更新到最新版本:
    brew update && brew upgrade kafka-mcp-server

从源代码构建:

  1. 克隆仓库:
    git clone https://github.com/tuannvm/kafka-mcp-server.git
    cd kafka-mcp-server
  2. 构建可执行文件:
    go build -o kafka-mcp-server ./cmd/server
    构建完成后,会在当前目录生成一个名为 'kafka-mcp-server' 的可执行文件。

服务器配置 (为 MCP 客户端提供)

MCP 客户端需要知道如何启动或连接到 Kafka MCP 服务器。常见的配置方式是通过在客户端的配置文件(通常是 JSON 格式)中指定服务器的启动命令和必要的环境变量。

以下是配置的关键信息,您需要将其添加到您的 MCP 客户端的配置中。具体的配置格式取决于您的客户端应用,但通常会包含以下要素:

  • 服务器名称 (server name): 您为这个 MCP 服务器起的一个唯一标识符,例如 '"kafka"'。
  • 命令 (command): 客户端用来启动 MCP 服务器进程的命令。如果您使用 Homebrew 安装,通常是 '"kafka-mcp-server"'。如果您从源代码构建,这里需要填写您构建出的可执行文件的完整路径,例如 '"/path/to/your/kafka-mcp-server"'。
  • 参数 (args): 启动命令的额外参数。对于 Kafka MCP 服务器,通常不需要额外的命令行参数,所以可以留空。
  • 环境变量 (env): 这是配置 Kafka 连接细节的关键。您需要在这里设置连接 Kafka 集群所需的环境变量。重要的环境变量包括:
    • 'KAFKA_BROKERS': Kafka 集群的 Broker 地址列表,用逗号分隔 (例如 '"localhost:9092"')。
    • 'KAFKA_CLIENT_ID': 用于连接 Kafka 的客户端 ID (例如 '"my-llm-app"')。
    • 'MCP_TRANSPORT': MCP 协议的传输方式,通常设置为 '"stdio"'(表示通过标准输入/输出通信)。
    • 'KAFKA_SASL_MECHANISM': SASL 认证机制 (例如 '"plain"', '"scram-sha-512"' 或 '""' 表示禁用)。
    • 'KAFKA_SASL_USER': SASL 用户名。
    • 'KAFKA_SASL_PASSWORD': SASL 密码。
    • 'KAFKA_TLS_ENABLE': 是否启用 TLS (设置为 '"true"' 或 '"false"')。
    • 'KAFKA_TLS_INSECURE_SKIP_VERIFY': 是否跳过 TLS 证书验证 (设置为 '"true"' 或 '"false"')。注意: 跳过证书验证不安全,只用于开发/测试环境。

示例配置 (请参考您客户端的具体格式进行调整,这不是直接的代码):

设定 MCP 服务器名称为 'kafka',启动命令为 'kafka-mcp-server',并配置连接本地 Kafka 集群的环境变量。

要连接远程或安全的 Kafka 集群,请相应地修改 'KAFKA_BROKERS', 'KAFKA_SASL_*', 'KAFKA_TLS_ENABLE' 等环境变量。

注意: 如果您的 MCP 客户端支持通过 URL 连接(例如 HTTP+SSE),您可以配置 'url' 参数,并单独在后台启动 Kafka MCP 服务器,设置 'MCP_TRANSPORT=http' 并指定 HTTP 监听地址。但目前主要的客户端集成示例使用的是 'stdio' 传输。

基本使用方法

  1. 安装并启动 Kafka 集群: 确保您有一个正在运行的 Apache Kafka 集群可供连接。
  2. 安装 Kafka MCP 服务器: 按照上述安装步骤安装服务器。
  3. 配置您的 MCP 客户端: 打开您支持 MCP 的 LLM 客户端应用(如 Cursor, Claude Desktop, Windsurf, ChatWise 等),找到其 MCP 或工具配置选项。按照上文“服务器配置”部分说明,添加 Kafka MCP 服务器的配置信息,指定服务器名称、启动命令和必要的 Kafka 连接环境变量。
  4. 重启客户端: 根据客户端的要求,可能需要重启应用才能加载新的 MCP 服务器配置。
  5. 与 LLM 交互: 配置成功后,您的 LLM 客户端应该能发现 Kafka MCP 服务器提供的工具、资源和 Prompt。您现在可以通过向 LLM 发送自然语言请求来与 Kafka 进行交互,例如:
    • "列出集群中的所有 Topics。"
    • "查看一下 'orders' Topic 的详细信息。"
    • "发送一条消息 'hello' 到 'test-topic',Key 是 'greeting'。"
    • "获取消费者组 'order-processor' 的消费 Lag 报告。"
    • "检查一下 Kafka 集群的整体健康状况。"

LLM 客户端会解析您的请求,确定需要调用 Kafka MCP 服务器的哪个工具或资源,构造相应的 MCP 请求发送给服务器。服务器执行相应的 Kafka 操作后,将结果返回给客户端,客户端再将结果呈现给您或用于辅助生成回复。

信息

分类

AI与计算