项目简介
MCP数据工程代理服务是一个基于Model Context Protocol (MCP) 构建的后端应用,旨在为大型语言模型(LLM)代理提供与数据基础设施(如数据库、文件存储、消息队列和工作流管理系统)交互的能力。它通过标准化的JSON-RPC协议,允许LLM客户端调用预定义的工具,执行数据连接测试、DDL获取、数据采样、文件操作、Kafka消息获取以及Airflow任务管理等功能,从而实现数据集成和ETL流程的自动化。
主要功能点
- 数据库交互: 支持连接测试、表存在性检查、获取表DDL(数据定义语言)和数据采样。兼容所有'SQLAlchemy'支持的数据库。
- 文件存储交互: 支持S3兼容存储和SMB协议的文件存储,可进行连接测试、目录/文件存在性检查、文件列表获取和文件内容采样。支持CSV、TSV、JSON、XML、Parquet等数据格式。
- Kafka消息队列: 提供Kafka连接测试和获取消息负载(payload)的功能,用于实时数据流的初步探索。
- Web资源访问: 支持URL可达性测试和获取网页内容样本,便于LLM代理探索外部网络资源。
- GitLab集成: 提供获取Airflow DAG模板的功能,并支持通过GitLab创建ETL任务(issues),实现数据管道的自动化部署。
- Airflow管理: 能够获取Airflow连接、DAG、任务和变量列表,辅助LLM代理管理数据工作流。
- 数据工程表单: 定义了从数据库、文件存储和互联网源到数据湖的ETL流程配置表单,指导LLM代理收集必要信息。
- 内部资源查询: 提供了一个工具,用于查询预配置的内部开发和数据基础设施服务的详细信息(如URL、凭证)。
安装步骤
本项目已容器化,并提供了Dockerfile用于构建Docker镜像。
-
克隆仓库:
git clone https://github.com/AnatoliyAksenov/chat-app-mcp.git cd chat-app-mcp -
配置环境: 在构建Docker镜像前,需要根据您的环境设置以下环境变量(或在Dockerfile中作为'--build-arg'传入):
- 'HTTP_PROXY', 'HTTPS_PROXY': 如果您的环境需要通过代理访问互联网。
- 'GITLAB_URL', 'GITLAB_TOKEN', 'PROJECT_PATH': 用于GitLab API访问。
- 'AIRFLOW_URL', 'AIRFLOW_USER', 'AIRFLOW_PASSWORD': 用于Apache Airflow API访问。
- 'KROKI_URL': 用于Ditaa图表渲染服务(如'http://localhost:8008')。
- 'STORAGE_URL', 'INTERNAL_STORAGE_URL': 用于图片存储服务(如MinIO,'http://localhost:9001')。
-
构建Docker镜像:
docker build -t chat-app-mcp:0.0.1 .您也可以在构建时传递'build-arg',例如:
docker build -t chat-app-mcp:0.0.1 \ --build-arg HTTPS_PROXY=http://10.0.0.7:3128 \ --build-arg KROKI_URL=http://localhost:8008 \ --build-arg STORAGE_URL=http://localhost:9001 \ --build-arg INTERNAL_STORAGE_URL=http://localhost:9001 \ --build-arg GITLAB_URL=https://gitlab.it-brew-lct2025.ru \ --build-arg GITLAB_TOKEN=<your_gitlab_token> \ --build-arg PROJECT_PATH=data-engineering/airflow \ --build-arg AIRFLOW_URL=https://airflow.it-brew-lct2025.ru \ --build-arg AIRFLOW_USER=mentor \ --build-arg AIRFLOW_PASSWORD=Lct2025 . -
运行Docker容器:
docker run -p 8001:8001 chat-app-mcp:0.0.1服务器将在'http://127.0.0.1:8001'上运行。
服务器配置(供MCP客户端使用)
MCP服务器启动后,将通过'http://127.0.0.1:8001/mcp'提供JSON-RPC服务,并通过'http://127.0.0.1:8001/sse'提供SSE服务。MCP客户端可以按照以下JSON格式进行配置以连接此服务器:
{ "server_name": "MCP数据工程服务", "command": ["python", "app.py"], "args": [], "host": "127.0.0.1", "port": 8001, "path": "/mcp", "transport": "streamable-http", "description": "连接到MCP数据工程代理服务器,提供与数据库、文件存储、Kafka和Airflow交互的工具。", "endpoints": { "json_rpc": { "url": "http://127.0.0.1:8001/mcp", "protocol": "json-rpc", "description": "标准的JSON-RPC接口,用于调用服务器工具和获取能力声明。" }, "sse_events": { "url": "http://127.0.0.1:8001/sse", "protocol": "sse", "description": "Server-Sent Events接口,用于接收服务器实时通知。" } } }
基本使用方法
一旦MCP服务器运行并客户端连接成功,LLM代理即可通过JSON-RPC调用服务器上注册的工具。例如,LLM代理可以通过发送一个MCP请求来调用'test_db_connection'工具来检查数据库连接:
{ "jsonrpc": "2.0", "method": "test_db_connection", "params": { "connection_url": "postgresql://user:password@host:port/database" }, "id": "123" }
或者,调用'get_form_properties_etl_database_to_datalake'工具获取ETL表单结构,然后根据用户输入填充数据后,调用'create_task'工具在GitLab中创建ETL任务:
{ "jsonrpc": "2.0", "method": "get_form_properties_etl_database_to_datalake", "params": {}, "id": "124" }
收到表单结构后,LLM代理将收集用户输入,并构建'create_task'的请求:
{ "jsonrpc": "2.0", "method": "create_task", "params": { "task_title": "新建订单数据到数据湖ETL", "task_description": "将PostgreSQL中的订单数据同步到数据湖。", "form_data": [ {"field": "DAG ID", "value": "orders_to_datalake_daily"}, {"field": "source_database", "value": "postgresql://devuser:[email protected]:5432/orders"}, {"field": "database_available", "value": "Success"}, {"field": "source_table", "value": "public.orders"}, {"field": "interval", "value": "0 0 * * *"}, {"field": "target_table", "value": "raw_orders.orders"}, {"field": "target_storing_type", "value": "partition"}, {"field": "capture_changes_column", "value": "ins_date"} ] }, "id": "125" }
信息
分类
数据库与文件