使用说明
项目简介
Beam MCP Server 是一个基于 Model Context Protocol (MCP) 标准构建的应用后端,它专注于提供统一的API接口,用于管理跨多种 Apache Beam Runner (Flink, Spark, Dataflow, Direct) 的数据管道。该服务器旨在简化数据工程师和AI/LLM开发人员的数据管道操作,并为LLM应用提供安全、可扩展的上下文服务框架。
主要功能点
- 多 Runner 支持: 使用统一的API管理 Flink, Spark, Dataflow 和 Direct Runner 上的数据管道。
- MCP 协议兼容: 遵循 Model Context Protocol 标准,易于与 AI/LLM 应用集成。
- 数据管道管理: 提供创建、监控和控制数据管道的功能。
- 易于扩展: 方便添加新的 Runner 或自定义功能。
- 生产就绪: 包含 Docker/Kubernetes 部署配置、监控和扩展能力。
安装步骤
- 克隆仓库
git clone https://github.com/souravch/beam-mcp-server.git cd beam-mcp-server - 创建虚拟环境 (推荐)
python -m venv beam-mcp-venv source beam-mcp-venv/bin/activate # 或 beam-mcp-venv\Scripts\activate (Windows) - 安装依赖
pip install -r requirements.txt
服务器配置
MCP客户端需要配置MCP服务器的启动命令及其参数,以便建立连接。以下是基于仓库信息生成的配置示例 (JSON 格式):
{ "server name": "beam-mcp-server", "command": "python", "args": [ "main.py", "--port", "8888", "--debug" ], "description": "启动 Beam MCP Server (Debug模式,默认端口 8888)", "notes": "可以根据需要调整端口 (port) 和配置文件路径 (--config)。对于生产环境,请移除 --debug 参数。" }
基本使用方法
-
启动服务器
使用 Direct Runner 启动 (无外部依赖):
python main.py --debug --port 8888使用 Flink Runner 启动 (需要预先安装 Flink):
CONFIG_PATH=config/flink_config.yaml python main.py --debug --port 8888 -
运行示例任务 (WordCount)
创建测试输入文件:
echo "This is a test file for Apache Beam WordCount example" > /tmp/input.txt使用 curl 提交任务:
curl -X POST http://localhost:8888/api/v1/jobs \ -H "Content-Type: application/json" \ -d '{ "job_name": "test-wordcount", "runner_type": "direct", "job_type": "BATCH", "code_path": "examples/pipelines/wordcount.py", "pipeline_options": { "input_file": "/tmp/input.txt", "output_path": "/tmp/output" } }' -
访问 MCP 标准端点
-
'/tools' 端点: 管理 AI Agent 和模型,例如注册情感分析工具:
curl -X POST "http://localhost:8888/api/v1/tools/" \ -H "Content-Type: application/json" \ -d '{ "name": "sentiment-analyzer", "description": "Analyzes sentiment in text data", "type": "transformation", "parameters": { "text_column": { "type": "string", "description": "Column containing text to analyze" } } }' -
'/resources' 端点: 管理数据集等资源,例如注册数据集:
curl -X POST "http://localhost:8888/api/v1/resources/" \ -H "Content-Type: application/json" \ -d '{ "name": "Customer Transactions", "description": "Daily customer transaction data", "resource_type": "dataset", "location": "gs://analytics-data/transactions/*.csv" }' -
'/contexts' 端点: 定义执行环境,例如创建 Dataflow 执行环境:
curl -X POST "http://localhost:8888/api/v1/contexts/" \ -H "Content-Type: application/json" \ -d '{ "name": "Dataflow Prod", "description": "Production Dataflow environment", "context_type": "dataflow", "parameters": { "region": "us-central1", "project": "beam-analytics-prod" } }'
-
信息
分类
AI与计算