项目简介

MCP数据工程代理服务是一个基于Model Context Protocol (MCP) 构建的后端应用,旨在为大型语言模型(LLM)代理提供与数据基础设施(如数据库、文件存储、消息队列和工作流管理系统)交互的能力。它通过标准化的JSON-RPC协议,允许LLM客户端调用预定义的工具,执行数据连接测试、DDL获取、数据采样、文件操作、Kafka消息获取以及Airflow任务管理等功能,从而实现数据集成和ETL流程的自动化。

主要功能点

  • 数据库交互: 支持连接测试、表存在性检查、获取表DDL(数据定义语言)和数据采样。兼容所有'SQLAlchemy'支持的数据库。
  • 文件存储交互: 支持S3兼容存储和SMB协议的文件存储,可进行连接测试、目录/文件存在性检查、文件列表获取和文件内容采样。支持CSV、TSV、JSON、XML、Parquet等数据格式。
  • Kafka消息队列: 提供Kafka连接测试和获取消息负载(payload)的功能,用于实时数据流的初步探索。
  • Web资源访问: 支持URL可达性测试和获取网页内容样本,便于LLM代理探索外部网络资源。
  • GitLab集成: 提供获取Airflow DAG模板的功能,并支持通过GitLab创建ETL任务(issues),实现数据管道的自动化部署。
  • Airflow管理: 能够获取Airflow连接、DAG、任务和变量列表,辅助LLM代理管理数据工作流。
  • 数据工程表单: 定义了从数据库、文件存储和互联网源到数据湖的ETL流程配置表单,指导LLM代理收集必要信息。
  • 内部资源查询: 提供了一个工具,用于查询预配置的内部开发和数据基础设施服务的详细信息(如URL、凭证)。

安装步骤

本项目已容器化,并提供了Dockerfile用于构建Docker镜像。

  1. 克隆仓库:

    git clone https://github.com/AnatoliyAksenov/chat-app-mcp.git
    cd chat-app-mcp
  2. 配置环境: 在构建Docker镜像前,需要根据您的环境设置以下环境变量(或在Dockerfile中作为'--build-arg'传入):

    • 'HTTP_PROXY', 'HTTPS_PROXY': 如果您的环境需要通过代理访问互联网。
    • 'GITLAB_URL', 'GITLAB_TOKEN', 'PROJECT_PATH': 用于GitLab API访问。
    • 'AIRFLOW_URL', 'AIRFLOW_USER', 'AIRFLOW_PASSWORD': 用于Apache Airflow API访问。
    • 'KROKI_URL': 用于Ditaa图表渲染服务(如'http://localhost:8008')。
    • 'STORAGE_URL', 'INTERNAL_STORAGE_URL': 用于图片存储服务(如MinIO,'http://localhost:9001')。
  3. 构建Docker镜像:

    docker build -t chat-app-mcp:0.0.1 .

    您也可以在构建时传递'build-arg',例如:

    docker build -t chat-app-mcp:0.0.1 \
    --build-arg HTTPS_PROXY=http://10.0.0.7:3128 \
    --build-arg KROKI_URL=http://localhost:8008 \
    --build-arg STORAGE_URL=http://localhost:9001 \
    --build-arg INTERNAL_STORAGE_URL=http://localhost:9001 \
    --build-arg GITLAB_URL=https://gitlab.it-brew-lct2025.ru \
    --build-arg GITLAB_TOKEN=<your_gitlab_token> \
    --build-arg PROJECT_PATH=data-engineering/airflow \
    --build-arg AIRFLOW_URL=https://airflow.it-brew-lct2025.ru \
    --build-arg AIRFLOW_USER=mentor \
    --build-arg AIRFLOW_PASSWORD=Lct2025 .
  4. 运行Docker容器:

    docker run -p 8001:8001 chat-app-mcp:0.0.1

    服务器将在'http://127.0.0.1:8001'上运行。

服务器配置(供MCP客户端使用)

MCP服务器启动后,将通过'http://127.0.0.1:8001/mcp'提供JSON-RPC服务,并通过'http://127.0.0.1:8001/sse'提供SSE服务。MCP客户端可以按照以下JSON格式进行配置以连接此服务器:

{
  "server_name": "MCP数据工程服务",
  "command": ["python", "app.py"],
  "args": [],
  "host": "127.0.0.1",
  "port": 8001,
  "path": "/mcp",
  "transport": "streamable-http",
  "description": "连接到MCP数据工程代理服务器,提供与数据库、文件存储、Kafka和Airflow交互的工具。",
  "endpoints": {
    "json_rpc": {
      "url": "http://127.0.0.1:8001/mcp",
      "protocol": "json-rpc",
      "description": "标准的JSON-RPC接口,用于调用服务器工具和获取能力声明。"
    },
    "sse_events": {
      "url": "http://127.0.0.1:8001/sse",
      "protocol": "sse",
      "description": "Server-Sent Events接口,用于接收服务器实时通知。"
    }
  }
}

基本使用方法

一旦MCP服务器运行并客户端连接成功,LLM代理即可通过JSON-RPC调用服务器上注册的工具。例如,LLM代理可以通过发送一个MCP请求来调用'test_db_connection'工具来检查数据库连接:

{
  "jsonrpc": "2.0",
  "method": "test_db_connection",
  "params": {
    "connection_url": "postgresql://user:password@host:port/database"
  },
  "id": "123"
}

或者,调用'get_form_properties_etl_database_to_datalake'工具获取ETL表单结构,然后根据用户输入填充数据后,调用'create_task'工具在GitLab中创建ETL任务:

{
  "jsonrpc": "2.0",
  "method": "get_form_properties_etl_database_to_datalake",
  "params": {},
  "id": "124"
}

收到表单结构后,LLM代理将收集用户输入,并构建'create_task'的请求:

{
  "jsonrpc": "2.0",
  "method": "create_task",
  "params": {
    "task_title": "新建订单数据到数据湖ETL",
    "task_description": "将PostgreSQL中的订单数据同步到数据湖。",
    "form_data": [
      {"field": "DAG ID", "value": "orders_to_datalake_daily"},
      {"field": "source_database", "value": "postgresql://devuser:[email protected]:5432/orders"},
      {"field": "database_available", "value": "Success"},
      {"field": "source_table", "value": "public.orders"},
      {"field": "interval", "value": "0 0 * * *"},
      {"field": "target_table", "value": "raw_orders.orders"},
      {"field": "target_storing_type", "value": "partition"},
      {"field": "capture_changes_column", "value": "ins_date"}
    ]
  },
  "id": "125"
}

信息

分类

数据库与文件