项目简介

Spark History Server MCP是一个基于Model Context Protocol (MCP) 构建的后端服务,旨在将AI代理(如LLM)与Apache Spark History Server的数据连接起来。它通过提供一套标准化的工具,让AI能够理解、查询和分析Spark的历史作业数据,从而实现智能化的性能洞察、瓶颈识别和故障分析。

主要功能点

  • 应用信息查询: 获取Spark应用程序的基本元数据和概览。
  • 作业与阶段分析: 深入分析Spark作业和阶段的性能,包括识别最慢的作业和阶段。
  • 执行器与资源分析: 监控资源利用率、执行器性能和资源分配模式。
  • 配置与环境: 查询Spark运行时配置、环境变量和设置。
  • SQL与查询分析: 分析SQL查询性能和执行计划。
  • 性能与瓶颈识别: 智能识别性能瓶颈并提供优化建议。
  • 对比分析: 对比不同Spark作业的环境和性能,检测回归问题。

安装步骤

  1. 克隆仓库
    git clone https://github.com/kubeflow/mcp-apache-spark-history-server.git
    cd mcp-apache-spark-history-server
  2. 安装依赖:建议使用 'uv' 包管理器:
    • 安装 'uv' (如果未安装):'brew install go-task' (macOS, 其他系统请参考uv官方文档)
    • 安装项目依赖:
      uv pip install -r requirements.txt
  3. 启动测试环境 (可选)
    • 启动模拟Spark History Server (带示例数据):
      task start-spark-bg
    • 启动MCP服务器:
      task start-mcp-bg
  4. 直接运行MCP服务器:
    • 在虚拟环境中:
      python3 -m venv spark-mcp
      source spark-mcp/bin/activate
      pip install mcp-apache-spark-history-server
      python3 -m spark_history_mcp.core.main
      # 完成后可执行 deactivate 退出虚拟环境
    • 或使用 'uvx' (无需预安装模块):
      uvx --from mcp-apache-spark-history-server spark-mcp

服务器配置

MCP客户端需要配置与MCP服务器建立连接。以下是一个MCP客户端可能使用的配置示例,其中包含连接到此Spark History Server MCP服务器所需的信息:

{
    "server_name": "spark_history_server_mcp",
    "command": "python3",
    "args": [
        "-m",
        "spark_history_mcp.core.main"
    ],
    "config": {
        "mcp": {
            "transports": ["streamable-http"],
            "port": 18888,
            "debug": true
        },
        "servers": {
            "local": {
                "url": "http://your-spark-history-server:18080",
                "default": true,
                "auth": {
                    "username": "user",
                    "password": "pass"
                }
            },
            "production": {
                "url": "http://prod-spark-history:18080",
                "auth": {
                    "username": "prod_user",
                    "password": "prod_pass"
                }
            }
        }
    },
    "environment": {
        "SHS_MCP_PORT": "18888",
        "SHS_MCP_DEBUG": "true",
        "SHS_MCP_TRANSPORT": "streamable-http",
        "SHS_SERVERS_LOCAL_URL": "http://localhost:18080",
        "SHS_SERVERS_LOCAL_AUTH_USERNAME": "local_user",
        "SHS_SERVERS_LOCAL_AUTH_PASSWORD": "local_password",
        "SHS_SERVERS_LOCAL_DEFAULT": "true",
        "SHS_SERVERS_PRODUCTION_URL": "http://prod-spark-history:18080",
        "SHS_SERVERS_PRODUCTION_AUTH_USERNAME": "prod_user",
        "SHS_SERVERS_PRODUCTION_AUTH_PASSWORD": "prod_password"
    }
}

参数注释:

  • 'server_name': 客户端为该MCP服务器实例定义的名称。
  • 'command': 启动MCP服务器的命令,通常是'python3'。
  • 'args': 传递给'command'的参数,用于启动 'spark_history_mcp' 模块。
  • 'config': MCP服务器的内部配置,包括:
    • 'mcp': MCP协议相关的配置,如'transports'(支持的传输协议,如'streamable-http'或'stdio')、'port'(MCP服务器监听端口)和'debug'模式。
    • 'servers': 配置连接的Apache Spark History Server实例列表。每个实例可以有自己的'url'、'auth'(用户名/密码或Token)、'default'标志(标记默认使用的Spark服务器)以及'emr_cluster_arn'(用于AWS EMR集成)。
  • 'environment': 通过环境变量覆盖或补充'config'中的配置,遵循'SHS_'前缀和嵌套结构(例如'SHS_MCP_PORT'对应'mcp.port')。

基本使用方法

一旦MCP服务器运行起来,任何兼容MCP协议的客户端(如AI代理、LLM集成框架)都可以连接到它。客户端将通过JSON-RPC协议发送请求,调用服务器暴露的工具来查询Spark数据。例如,LLM客户端可以提出自然语言查询,MCP服务器会将其转换为对Spark History Server REST API的调用,并将结果返回给LLM。

  • 示例LLM查询:
    • "显示2025年6月27日凌晨1点到2点之间的所有Spark应用程序。"
    • "我的ETL作业为什么比平时慢?"
    • "比较今天和昨天的批处理作业表现。"
    • "查找Spark应用程序'spark-cc4d115f011443d787f03a71a476a745'中最慢的SQL查询。"

信息

分类

AI与计算