项目简介
MonsterMQ是一个功能丰富的MQTT消息代理(broker),支持MQTT 3.1.1协议,具备无限消息存储、QoS支持、持久会话和WebSocket连接。它通过Hazelcast实现集群化和高可用性,并支持多种数据库后端进行数据存储。其核心亮点之一是内置的MCP服务器,专为AI模型提供MQTT数据的上下文访问和集成。
主要功能点
- 企业级MQTT代理: 完整支持MQTT 3.1.1协议,具备QoS 0, 1, 2,持久会话,WebSocket和TLS/SSL安全连接。
- 水平扩展与高可用: 基于Hazelcast的集群功能,实现多节点负载均衡和自动故障转移。
- 多数据库后端支持: 可配置PostgreSQL, SQLite, CrateDB, MongoDB, Memory, Hazelcast, Kafka等作为会话、保留消息、消息归档和消息存储的后端。
- AI集成(MCP服务器): 内置Model Context Protocol (MCP) 服务器,允许AI模型查询实时和历史MQTT数据,进行主题分析和执行自定义查询。
- SparkplugB扩展: 自动处理和扩展SparkplugB消息,支持工业物联网应用。
安装步骤
推荐使用Docker Compose进行快速部署,以下是配置示例:
-
创建 'docker-compose.yml' 文件 在您的项目目录下创建一个名为 'docker-compose.yml' 的文件,并粘贴以下内容:
services: postgres: image: timescale/timescaledb:latest-pg16 container_name: postgres restart: unless-stopped ports: - 5432:5432 volumes: - ./db:/var/lib/postgresql/data environment: POSTGRES_USER: system POSTGRES_PASSWORD: manager POSTGRES_DB: monster monstermq: image: rocworks/monstermq:latest container_name: monstermq restart: unless-stopped ports: - 1883:1883 # MQTT TCP端口 - 8883:8883 # MQTT TLS端口 - 9000:9000 # WebSocket端口 - 9001:9001 # WebSocket TLS端口 - 3000:3000 # MCP服务器端口 volumes: - ./config.yaml:/app/config.yaml command: ["-config", "config.yaml", "-log", "INFO"] depends_on: - postgres -
创建 'config.yaml' 文件 在同一目录下创建一个名为 'config.yaml' 的文件,并粘贴以下内容:
TCP: 1883 WS: 9000 TCPS: 8883 WSS: 9001 MaxMessageSizeKb: 512 SessionStoreType: POSTGRES RetainedStoreType: POSTGRES QueuedMessagesEnabled: true ArchiveGroups: - Name: MCP Enabled: true TopicFilter: [ "#" ] RetainedOnly: false LastValType: POSTGRES ArchiveType: POSTGRES Postgres: Url: jdbc:postgresql://postgres:5432/monster User: system Pass: manager MCP: Enabled: true Port: 3000 -
启动服务 在包含 'docker-compose.yml' 和 'config.yaml' 文件的目录下,打开终端并运行以下命令:
docker-compose up -d这将启动一个PostgreSQL数据库和一个MonsterMQ实例,其中包含了启用的MCP服务器。
服务器配置
MCP客户端需要通过网络连接到MonsterMQ的MCP服务器。以下是一个示例配置信息,用于指导MCP客户端如何连接到MonsterMQ的MCP服务器。请根据您的实际部署环境调整'port'值。
{ "server_name": "MonsterMQ_MCP_Server", "command": "docker run -p 3000:3000 --network host -v /path/to/your/config.yaml:/app/config.yaml rocworks/monstermq:latest", "args": [ "-config", "config.yaml", "-log", "INFO" // 注意:这里的command和args是示例,用于说明如何启动服务。 // 实际的启动参数应根据您的Docker或本地部署方式进行调整。 // 最重要的是确保config.yaml中MCP:Enabled为true且端口与MCP客户端配置的port一致。 ], "port": 3000, // MCP服务器监听的端口,通常为3000 "description": "MonsterMQ提供MQTT数据上下文的MCP服务器。" }
- 'server_name': 给MCP服务器实例起一个易于识别的名称,例如“MonsterMQ_MCP_Server”。
- 'command': 启动MonsterMQ服务的命令行或Docker命令。此示例使用Docker命令,并假设您已准备好包含MCP配置的'config.yaml'文件。'--network host' 和 '-v' 参数根据您的Docker环境和配置文件的实际位置可能需要调整。
- 'args': 传递给MonsterMQ启动命令的额外参数。例如,指定配置文件路径和日志级别。这些参数会追加到'command'后面。
- 'port': MCP服务器监听的端口,通常默认为3000。
- 'description': 对该MCP服务器的简要说明,例如“MonsterMQ提供MQTT数据上下文的MCP服务器。”
基本使用方法
一旦MonsterMQ的MCP服务器运行起来,AI模型或其他MCP客户端可以通过指定其网络地址和端口(例如 'http://localhost:3000')来连接并与之交互。 您可以使用Python等语言的MCP客户端库来连接,并执行以下操作:
-
连接MCP服务器:
import mcp_client # 连接到在本地3000端口运行的MCP服务器 client = mcp_client.connect("http://localhost:3000") -
查询实时MQTT主题数据:
# 查询所有以"sensors/"开头的当前主题值 current_data = client.query_topics("sensors/#") print("当前传感器数据:", current_data) -
获取历史MQTT数据:
import datetime # 获取"sensors/temperature"主题在过去一小时内的历史数据 end_time = datetime.datetime.now(datetime.timezone.utc) start_time = end_time - datetime.timedelta(hours=1) history = client.get_history("sensors/temperature", start_time=start_time.isoformat(), limit=100) # 限制获取100条数据 print("历史温度数据:", history) -
执行自定义SQL查询(PostgreSQL/SQLite后端特有):
# 执行一个SQL查询,获取过去一小时内所有传感器主题的平均值 results = client.execute_query(""" SELECT topic, AVG(payload::float) as avg_value FROM mcparchive WHERE topic LIKE 'sensors/%' AND time > NOW() - INTERVAL '1 hour' GROUP BY topic """) print("传感器主题平均值:", results)
信息
分类
AI与计算