编辑
2026-06-14
项目实战经验
00

目录

一个开关切换三种消息中间件:聊聊 IoT 消息总线的设计哲学
一、先讲个故事:快递站的烦恼
二、IoT 系统的核心难题
方案一:直接方法调用?
方案二:各自对接中间件?
方案三(我们的选择):抽象一层"消息总线"接口
三、核心设计:三个角色
角色 1:消息总线(IotMessageBus)—— 快递中转站
角色 2:订阅者(IotMessageSubscriber)—— 收件人
角色 3:配置开关(IotMessageBusProperties)—— 总开关
四、三种实现:一个比一个能打
实现 1:Local —— 本地开发的"小推车"
实现 2:Redis Stream —— 中小集群的"快递柜"
实现 3:RocketMQ —— 生产级"全自动流水线"
三种实现对比一览
五、自动装配:引入依赖就生效
Spring Boot 3.x 自动装配机制
互斥装配:三个实现只有一个生效
六、真实业务场景走读
场景一:设备上报属性 → 触发场景联动
场景二:后台下发控制命令
场景三:部署形态平滑升级
七、设计精华总结
1. 策略模式
2. 发布-订阅模式
3. 依赖倒置原则
4. 开闭原则
5. 控制反转
八、写在最后

一个开关切换三种消息中间件:聊聊 IoT 消息总线的设计哲学

想象一下:你的代码一行都不用改,只改一行配置,就能在"本地开发""Redis 集群""RocketMQ 集群"之间自由切换消息中间件。这不是梦想,这是真实存在的工程实践。今天就用大白话,把这个设计拆给你看。

一、先讲个故事:快递站的烦恼

假设你经营一个快递中转站,每天有成千上万个包裹需要分发。你的中转站需要把包裹送到不同的目的地:

  • 有的包裹要送去仓库(存储)
  • 有的包裹要触发警报(比如易燃品报警)
  • 有的包裹要转发给客户指定的下游

一开始,中转站只有一个小推车(本地内存),包裹不多,完全够用。

后来包裹多了,你换上了快递柜Redis),多个快递员可以同时取件。

再后来,包裹暴涨到每天百万级,你直接上了全自动分拣流水线RocketMQ),能扛住上万件的洪峰。

关键来了——不管用什么工具,快递站的"收件 → 分拣 → 派送"这个流程从来没变过。

这就是 IoT 消息总线要做的事。


二、IoT 系统的核心难题

一个真实的 IoT 平台,通常长这样:

设备(传感器/空调/门锁/...) ↓ MQTT/HTTP/TCP 上报数据 网关层(Gateway)—— 负责接收设备消息 ↓ ??? 怎么把消息交给业务层 ??? 业务层(Biz)—— 负责存储、规则引擎、场景联动 ↓ ??? 怎么把控制命令交给网关 ??? 网关层(Gateway)—— 负责下发指令到设备

中间那两个 ??? 就是核心问题:网关层和业务层之间,到底怎么通信?

方案一:直接方法调用?

不行。网关和业务可能是不同的进程、不同的服务器、甚至不同的机房。直接调方法意味着强耦合,一个挂了另一个也挂。

方案二:各自对接中间件?

也行,但是——如果本地开发时没有 RocketMQ 怎么办?如果临时换个 MQ 怎么办?每换一次就要改一堆代码?

方案三(我们的选择):抽象一层"消息总线"接口

所有代码都只跟接口打交道,具体用什么中间件,由配置决定。这就是整个设计的核心思想。


三、核心设计:三个角色

整个消息总线体系只有三个角色,简单到不能再简单:

角色 1:消息总线(IotMessageBus)—— 快递中转站

java
public interface IotMessageBus { // 发布消息(寄件) void post(String topic, Object message); // 注册订阅者(告诉快递站"我要收哪个频道的包裹") void register(IotMessageSubscriber<?> subscriber); }

就两个方法:发消息注册接收者。不管底层是 Spring 事件、Redis 还是 RocketMQ,对外暴露的就这两个。

角色 2:订阅者(IotMessageSubscriber)—— 收件人

java
public interface IotMessageSubscriber<T> { // 我要收哪个频道的消息 String getTopic(); // 我属于哪个消费组 String getGroup(); // 收到消息后干嘛 void onMessage(T message); }

每个想接收消息的模块,只需要实现这个接口,告诉总线"我要听哪个 topic、我属于哪个组、收到后怎么处理"。

角色 3:配置开关(IotMessageBusProperties)—— 总开关

yaml
# application.yaml 中只需一行 aiot.iot.message-bus.type: local # 可选:local / redis / rocketmq

这就是那个"一行配置切换三种中间件"的魔法开关。


四、三种实现:一个比一个能打

实现 1:Local —— 本地开发的"小推车"

原理:利用 Spring 自带的事件机制(ApplicationEvent),在同一个 JVM 进程内传递消息。

发消息:post(topic, message) ↓ 包装成 Spring 事件 ApplicationContext.publishEvent() ↓ Spring 同步回调 @EventListener onMessage() ↓ 按 topic 找到订阅者 subscriber.onMessage()

适合场景

  • 本地开发调试(不用装 Redis/RocketMQ)
  • 单元测试(秒启动,无外部依赖)
  • 几十台设备的小型 Demo 部署

优点:零依赖、可断点调试、启动飞快。

局限:只在单个 JVM 内有效,跨服务器不行。

实现 2:Redis Stream —— 中小集群的"快递柜"

原理:利用 Redis 5.0+ 的 Stream 数据结构,实现跨进程的消息发布/订阅。

发消息:post(topic, message) ↓ JSON 序列化 redis XADD topic * message ← 写入 Redis Stream ↓ Stream Consumer Group 拉取 订阅者收到消息 ↓ 处理完成后 redis XACK group message ← 手动确认消费

亮点设计

  1. 手动 ACK(确认消费):消息处理成功后才告诉 Redis"我收到了",如果处理过程中崩溃,消息不会丢失
  2. 异常不中断cancelOnError(false) —— 一条消息处理出错,不会导致整个消费者停摆
  3. Pending 重推:配合定时任务 RedisPendingMessageResendJob,把没确认的消息重新分配给存活的消费者
  4. 消息清理:定期用 RedisStreamMessageCleanupJob 清理老旧消息,防止内存爆炸

适合场景

  • SIT/UAT 测试环境(多节点联调)
  • 中小型生产环境(设备万级、QPS 千级)
  • 已经部署了 Redis 但不想额外维护 MQ 的团队

实现 3:RocketMQ —— 生产级"全自动流水线"

原理:直接对接 Apache RocketMQ,利用其原生的集群、重试、死信机制。

发消息:post(topic, message) ↓ JSON 序列化 + 同步发送 RocketMQTemplate.syncSend(topic, message) ↓ Broker 推送给 Consumer Group DefaultMQPushConsumer 收到消息 ↓ 反序列化 + 调用 onMessage 处理成功 → CONSUME_SUCCESS 处理失败 → RECONSUME_LATER(自动重试 16 次)

亮点设计

  1. 自动重试:消费失败后,RocketMQ 会按阶梯退避策略自动重推(1s → 5s → 10s → 30s → ...),最多 16 次
  2. 死信队列(DLQ):重试 16 次还失败的消息进入死信队列,不会堵塞正常消费
  3. 负载均衡:同一个 Consumer Group 下多个实例自动分担消息
  4. 广播消费:不同 Group 可以各自独立消费同一条消息(设备落库、场景联动、数据转发并行处理)

适合场景

  • 大规模生产环境(设备十万级以上)
  • 需要高可用、主备容灾的核心业务
  • 高峰期 QPS 上万的 IoT 平台

三种实现对比一览

维度LocalRedis StreamRocketMQ
依赖Redis ≥ 5.0RocketMQ 集群
跨节点不支持支持支持
消息可靠性不保证至少一次(ACK + 重推)至少一次(重试 + 死信)
吞吐量内存级千级 QPS万级 TPS
消息回溯不支持支持(pending list)支持(死信队列)
适合场景开发/测试中小生产大规模生产
运维成本低(复用 Redis)高(独立集群)

五、自动装配:引入依赖就生效

这部分是"零侵入"的关键。

Spring Boot 3.x 自动装配机制

META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件中写上配置类的全限定名:

com.kymasf.aiot.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration

Spring Boot 启动时会自动扫描这个文件,加载里面的配置类。不需要任何手动 import 或包扫描配置。

互斥装配:三个实现只有一个生效

配置类内部用条件注解实现互斥选择:

java
@AutoConfiguration @EnableConfigurationProperties(IotMessageBusProperties.class) public class IotMessageBusAutoConfiguration { // 1. 无论选哪种,都装配这个生产者 @Bean public IotDeviceMessageProducer deviceMessageProducer(IotMessageBus messageBus) { return new IotDeviceMessageProducer(messageBus); } // 2. type=local 时装配(不配置也默认走这个) @Configuration @ConditionalOnProperty(prefix = "aiot.iot.message-bus", name = "type", havingValue = "local", matchIfMissing = true) public static class IotLocalMessageBusConfiguration { ... } // 3. type=rocketmq 且 classpath 有 RocketMQ 时才装配 @Configuration @ConditionalOnProperty(prefix = "aiot.iot.message-bus", name = "type", havingValue = "rocketmq") @ConditionalOnClass(RocketMQTemplate.class) public static class IotRocketMQMessageBusConfiguration { ... } // 4. type=redis 且 classpath 有 Redis 时才装配 @Configuration @ConditionalOnProperty(prefix = "aiot.iot.message-bus", name = "type", havingValue = "redis") @ConditionalOnClass(RedisTemplate.class) public static class IotRedisMessageBusConfiguration { ... } }

三个精妙之处

  1. matchIfMissing = true:不配置时默认用 Local,开发友好
  2. @ConditionalOnClass:classpath 里没有对应依赖时,相关 Bean 根本不会被创建,不会报错
  3. @ConditionalOnProperty 互斥:三种实现永远不会冲突

六、真实业务场景走读

场景一:设备上报属性 → 触发场景联动

用户把空调设为 26°C,整个消息流转过程:

空调设备 ↓ MQTT 上报 "温度=26°C" Gateway 收到 ↓ producer.sendDeviceMessage(message) ↓ messageBus.post("device.message.upstream", msg) 消息总线(RocketMQ Topic) ↓ 广播给多个 Consumer Group ├── Biz Consumer Group → 落库存储(持久化设备属性) ├── 场景联动 Group → 判断"温度≥26°C"→ 推送安全提醒 └── 数据转发 Group → 命中规则 → 转发到客户下游 HTTP

同一条消息,三个业务各自独立消费,互不干扰。

场景二:后台下发控制命令

运营在管理后台点击"开启设备":

Admin 前端 ↓ 调用 API Biz 层 ↓ 查设备表,找到设备挂在哪个 Gateway ↓ producer.sendDeviceMessageToGateway(serverId, cmd) ↓ messageBus.post("device.message.downstream.{serverId}", cmd) 消息总线 ↓ 只有目标 Gateway 节点消费 Gateway 收到命令 ↓ 通过 MQTT/HTTP/TCP 推送给设备 设备执行"开启"

Biz 不需要知道设备用什么协议连接,Gateway 也不需要知道命令从哪来,中间靠消息总线解耦。

场景三:部署形态平滑升级

开发阶段: type=local → 不需要任何中间件,IDE 里直接跑 测试阶段: type=redis → 共用测试环境 Redis,多节点联调 上线初期: type=redis → 设备量不大,Redis 顶住,省运维成本 业务爆发: type=rocketmq → 上独立 MQ 集群,支撑万级设备并发

整个升级过程中,业务代码一行都没改。


七、设计精华总结

回过头来看,这套设计用到了几个经典的工程思想:

1. 策略模式

IotMessageBus 是策略接口,三种实现是具体策略,配置项决定用哪个策略。业务代码只依赖接口,不依赖具体实现。

2. 发布-订阅模式

消息的发送方和接收方互不感知,通过 topic 这个"频道"间接通信。一对多广播、多对多消费都支持。

3. 依赖倒置原则

高层模块(Biz、Gateway)不依赖低层模块(Redis、RocketMQ),两者都依赖抽象(IotMessageBus 接口)。

4. 开闭原则

要加新的消息中间件(比如 Kafka、RabbitMQ)?只需:

  1. 写一个新的实现类
  2. 在 AutoConfiguration 里加一个内部类

对现有代码零修改,完美符合"对扩展开放、对修改封闭"。

5. 控制反转

由 Spring 容器根据配置自动决定装配哪个实现,业务代码不参与决策过程。


八、写在最后

好的架构设计,不是用了多少高深的技术,而是用最简单的抽象,解决最复杂的问题

这个 IoT 消息总线的设计,核心就一个接口、两个方法、三个实现、一行配置。但它解决的是 IoT 领域最核心的难题:设备消息的可靠传输与多环境适配

如果你正在做 IoT 项目,或者任何需要"多消息中间件可切换"的系统,这个设计模式可以直接拿来用。

好的代码,就像好的文章——删一个字嫌多,加一个字嫌少。

本文作者:Jarvis

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!