使用说明

项目简介

Beam MCP Server 是一个基于 Model Context Protocol (MCP) 标准构建的应用后端,它专注于提供统一的API接口,用于管理跨多种 Apache Beam Runner (Flink, Spark, Dataflow, Direct) 的数据管道。该服务器旨在简化数据工程师和AI/LLM开发人员的数据管道操作,并为LLM应用提供安全、可扩展的上下文服务框架。

主要功能点

  • 多 Runner 支持: 使用统一的API管理 Flink, Spark, Dataflow 和 Direct Runner 上的数据管道。
  • MCP 协议兼容: 遵循 Model Context Protocol 标准,易于与 AI/LLM 应用集成。
  • 数据管道管理: 提供创建、监控和控制数据管道的功能。
  • 易于扩展: 方便添加新的 Runner 或自定义功能。
  • 生产就绪: 包含 Docker/Kubernetes 部署配置、监控和扩展能力。

安装步骤

  1. 克隆仓库
    git clone https://github.com/souravch/beam-mcp-server.git
    cd beam-mcp-server
  2. 创建虚拟环境 (推荐)
    python -m venv beam-mcp-venv
    source beam-mcp-venv/bin/activate  # 或 beam-mcp-venv\Scripts\activate (Windows)
  3. 安装依赖
    pip install -r requirements.txt

服务器配置

MCP客户端需要配置MCP服务器的启动命令及其参数,以便建立连接。以下是基于仓库信息生成的配置示例 (JSON 格式):

{
  "server name": "beam-mcp-server",
  "command": "python",
  "args": [
    "main.py",
    "--port", "8888",
    "--debug"
  ],
  "description": "启动 Beam MCP Server (Debug模式,默认端口 8888)",
  "notes": "可以根据需要调整端口 (port) 和配置文件路径 (--config)。对于生产环境,请移除 --debug 参数。"
}

基本使用方法

  1. 启动服务器

    使用 Direct Runner 启动 (无外部依赖):

    python main.py --debug --port 8888

    使用 Flink Runner 启动 (需要预先安装 Flink):

    CONFIG_PATH=config/flink_config.yaml python main.py --debug --port 8888
  2. 运行示例任务 (WordCount)

    创建测试输入文件:

    echo "This is a test file for Apache Beam WordCount example" > /tmp/input.txt

    使用 curl 提交任务:

    curl -X POST http://localhost:8888/api/v1/jobs \
      -H "Content-Type: application/json" \
      -d '{
        "job_name": "test-wordcount",
        "runner_type": "direct",
        "job_type": "BATCH",
        "code_path": "examples/pipelines/wordcount.py",
        "pipeline_options": {
          "input_file": "/tmp/input.txt",
          "output_path": "/tmp/output"
        }
      }'
  3. 访问 MCP 标准端点

    • '/tools' 端点: 管理 AI Agent 和模型,例如注册情感分析工具:

      curl -X POST "http://localhost:8888/api/v1/tools/" \
        -H "Content-Type: application/json" \
        -d '{
          "name": "sentiment-analyzer",
          "description": "Analyzes sentiment in text data",
          "type": "transformation",
          "parameters": {
            "text_column": {
              "type": "string",
              "description": "Column containing text to analyze"
            }
          }
        }'
    • '/resources' 端点: 管理数据集等资源,例如注册数据集:

      curl -X POST "http://localhost:8888/api/v1/resources/" \
        -H "Content-Type: application/json" \
        -d '{
          "name": "Customer Transactions",
          "description": "Daily customer transaction data",
          "resource_type": "dataset",
          "location": "gs://analytics-data/transactions/*.csv"
        }'
    • '/contexts' 端点: 定义执行环境,例如创建 Dataflow 执行环境:

      curl -X POST "http://localhost:8888/api/v1/contexts/" \
        -H "Content-Type: application/json" \
        -d '{
          "name": "Dataflow Prod",
          "description": "Production Dataflow environment",
          "context_type": "dataflow",
          "parameters": {
            "region": "us-central1",
            "project": "beam-analytics-prod"
          }
        }'

信息

分类

AI与计算