项目简介

MonsterMQ是一个功能丰富的MQTT消息代理(broker),支持MQTT 3.1.1协议,具备无限消息存储、QoS支持、持久会话和WebSocket连接。它通过Hazelcast实现集群化和高可用性,并支持多种数据库后端进行数据存储。其核心亮点之一是内置的MCP服务器,专为AI模型提供MQTT数据的上下文访问和集成。

主要功能点

  • 企业级MQTT代理: 完整支持MQTT 3.1.1协议,具备QoS 0, 1, 2,持久会话,WebSocket和TLS/SSL安全连接。
  • 水平扩展与高可用: 基于Hazelcast的集群功能,实现多节点负载均衡和自动故障转移。
  • 多数据库后端支持: 可配置PostgreSQL, SQLite, CrateDB, MongoDB, Memory, Hazelcast, Kafka等作为会话、保留消息、消息归档和消息存储的后端。
  • AI集成(MCP服务器): 内置Model Context Protocol (MCP) 服务器,允许AI模型查询实时和历史MQTT数据,进行主题分析和执行自定义查询。
  • SparkplugB扩展: 自动处理和扩展SparkplugB消息,支持工业物联网应用。

安装步骤

推荐使用Docker Compose进行快速部署,以下是配置示例:

  1. 创建 'docker-compose.yml' 文件 在您的项目目录下创建一个名为 'docker-compose.yml' 的文件,并粘贴以下内容:

    services:
      postgres:
        image: timescale/timescaledb:latest-pg16
        container_name: postgres
        restart: unless-stopped
        ports:
          - 5432:5432
        volumes:
          - ./db:/var/lib/postgresql/data
        environment:
          POSTGRES_USER: system
          POSTGRES_PASSWORD: manager
          POSTGRES_DB: monster
    
      monstermq:
        image: rocworks/monstermq:latest
        container_name: monstermq
        restart: unless-stopped
        ports:
          - 1883:1883    # MQTT TCP端口
          - 8883:8883    # MQTT TLS端口
          - 9000:9000    # WebSocket端口
          - 9001:9001    # WebSocket TLS端口
          - 3000:3000    # MCP服务器端口
        volumes:
          - ./config.yaml:/app/config.yaml
        command: ["-config", "config.yaml", "-log", "INFO"]
        depends_on:
          - postgres
  2. 创建 'config.yaml' 文件 在同一目录下创建一个名为 'config.yaml' 的文件,并粘贴以下内容:

    TCP: 1883
    WS: 9000
    TCPS: 8883
    WSS: 9001
    MaxMessageSizeKb: 512
    
    SessionStoreType: POSTGRES
    RetainedStoreType: POSTGRES
    
    QueuedMessagesEnabled: true
    
    ArchiveGroups:
      - Name: MCP
        Enabled: true
        TopicFilter: [ "#" ]
        RetainedOnly: false
        LastValType: POSTGRES
        ArchiveType: POSTGRES
    
    Postgres:
      Url: jdbc:postgresql://postgres:5432/monster
      User: system
      Pass: manager
    
    MCP:
      Enabled: true
      Port: 3000
  3. 启动服务 在包含 'docker-compose.yml' 和 'config.yaml' 文件的目录下,打开终端并运行以下命令:

    docker-compose up -d

    这将启动一个PostgreSQL数据库和一个MonsterMQ实例,其中包含了启用的MCP服务器。

服务器配置

MCP客户端需要通过网络连接到MonsterMQ的MCP服务器。以下是一个示例配置信息,用于指导MCP客户端如何连接到MonsterMQ的MCP服务器。请根据您的实际部署环境调整'port'值。

{
  "server_name": "MonsterMQ_MCP_Server",
  "command": "docker run -p 3000:3000 --network host -v /path/to/your/config.yaml:/app/config.yaml rocworks/monstermq:latest",
  "args": [
    "-config", "config.yaml",
    "-log", "INFO"
    // 注意:这里的command和args是示例,用于说明如何启动服务。
    // 实际的启动参数应根据您的Docker或本地部署方式进行调整。
    // 最重要的是确保config.yaml中MCP:Enabled为true且端口与MCP客户端配置的port一致。
  ],
  "port": 3000, // MCP服务器监听的端口,通常为3000
  "description": "MonsterMQ提供MQTT数据上下文的MCP服务器。"
}
  • 'server_name': 给MCP服务器实例起一个易于识别的名称,例如“MonsterMQ_MCP_Server”。
  • 'command': 启动MonsterMQ服务的命令行或Docker命令。此示例使用Docker命令,并假设您已准备好包含MCP配置的'config.yaml'文件。'--network host' 和 '-v' 参数根据您的Docker环境和配置文件的实际位置可能需要调整。
  • 'args': 传递给MonsterMQ启动命令的额外参数。例如,指定配置文件路径和日志级别。这些参数会追加到'command'后面。
  • 'port': MCP服务器监听的端口,通常默认为3000。
  • 'description': 对该MCP服务器的简要说明,例如“MonsterMQ提供MQTT数据上下文的MCP服务器。”

基本使用方法

一旦MonsterMQ的MCP服务器运行起来,AI模型或其他MCP客户端可以通过指定其网络地址和端口(例如 'http://localhost:3000')来连接并与之交互。 您可以使用Python等语言的MCP客户端库来连接,并执行以下操作:

  1. 连接MCP服务器:

    import mcp_client
    # 连接到在本地3000端口运行的MCP服务器
    client = mcp_client.connect("http://localhost:3000")
  2. 查询实时MQTT主题数据:

    # 查询所有以"sensors/"开头的当前主题值
    current_data = client.query_topics("sensors/#")
    print("当前传感器数据:", current_data)
  3. 获取历史MQTT数据:

    import datetime
    # 获取"sensors/temperature"主题在过去一小时内的历史数据
    end_time = datetime.datetime.now(datetime.timezone.utc)
    start_time = end_time - datetime.timedelta(hours=1)
    history = client.get_history("sensors/temperature",
                               start_time=start_time.isoformat(),
                               limit=100) # 限制获取100条数据
    print("历史温度数据:", history)
  4. 执行自定义SQL查询(PostgreSQL/SQLite后端特有):

    # 执行一个SQL查询,获取过去一小时内所有传感器主题的平均值
    results = client.execute_query("""
        SELECT topic, AVG(payload::float) as avg_value
        FROM mcparchive
        WHERE topic LIKE 'sensors/%'
        AND time > NOW() - INTERVAL '1 hour'
        GROUP BY topic
    """)
    print("传感器主题平均值:", results)

信息

分类

AI与计算