想象一下:你的代码一行都不用改,只改一行配置,就能在"本地开发""Redis 集群""RocketMQ 集群"之间自由切换消息中间件。这不是梦想,这是真实存在的工程实践。今天就用大白话,把这个设计拆给你看。
假设你经营一个快递中转站,每天有成千上万个包裹需要分发。你的中转站需要把包裹送到不同的目的地:
一开始,中转站只有一个小推车(本地内存),包裹不多,完全够用。
后来包裹多了,你换上了快递柜(Redis),多个快递员可以同时取件。
再后来,包裹暴涨到每天百万级,你直接上了全自动分拣流水线(RocketMQ),能扛住上万件的洪峰。
关键来了——不管用什么工具,快递站的"收件 → 分拣 → 派送"这个流程从来没变过。
这就是 IoT 消息总线要做的事。
一个真实的 IoT 平台,通常长这样:
设备(传感器/空调/门锁/...) ↓ MQTT/HTTP/TCP 上报数据 网关层(Gateway)—— 负责接收设备消息 ↓ ??? 怎么把消息交给业务层 ??? 业务层(Biz)—— 负责存储、规则引擎、场景联动 ↓ ??? 怎么把控制命令交给网关 ??? 网关层(Gateway)—— 负责下发指令到设备
中间那两个 ??? 就是核心问题:网关层和业务层之间,到底怎么通信?
不行。网关和业务可能是不同的进程、不同的服务器、甚至不同的机房。直接调方法意味着强耦合,一个挂了另一个也挂。
也行,但是——如果本地开发时没有 RocketMQ 怎么办?如果临时换个 MQ 怎么办?每换一次就要改一堆代码?
所有代码都只跟接口打交道,具体用什么中间件,由配置决定。这就是整个设计的核心思想。
整个消息总线体系只有三个角色,简单到不能再简单:
javapublic interface IotMessageBus {
// 发布消息(寄件)
void post(String topic, Object message);
// 注册订阅者(告诉快递站"我要收哪个频道的包裹")
void register(IotMessageSubscriber<?> subscriber);
}
就两个方法:发消息和注册接收者。不管底层是 Spring 事件、Redis 还是 RocketMQ,对外暴露的就这两个。
javapublic interface IotMessageSubscriber<T> {
// 我要收哪个频道的消息
String getTopic();
// 我属于哪个消费组
String getGroup();
// 收到消息后干嘛
void onMessage(T message);
}
每个想接收消息的模块,只需要实现这个接口,告诉总线"我要听哪个 topic、我属于哪个组、收到后怎么处理"。
yaml# application.yaml 中只需一行
aiot.iot.message-bus.type: local # 可选:local / redis / rocketmq
这就是那个"一行配置切换三种中间件"的魔法开关。
原理:利用 Spring 自带的事件机制(ApplicationEvent),在同一个 JVM 进程内传递消息。
发消息:post(topic, message) ↓ 包装成 Spring 事件 ApplicationContext.publishEvent() ↓ Spring 同步回调 @EventListener onMessage() ↓ 按 topic 找到订阅者 subscriber.onMessage()
适合场景:
优点:零依赖、可断点调试、启动飞快。
局限:只在单个 JVM 内有效,跨服务器不行。
原理:利用 Redis 5.0+ 的 Stream 数据结构,实现跨进程的消息发布/订阅。
发消息:post(topic, message) ↓ JSON 序列化 redis XADD topic * message ← 写入 Redis Stream ↓ Stream Consumer Group 拉取 订阅者收到消息 ↓ 处理完成后 redis XACK group message ← 手动确认消费
亮点设计:
cancelOnError(false) —— 一条消息处理出错,不会导致整个消费者停摆RedisPendingMessageResendJob,把没确认的消息重新分配给存活的消费者RedisStreamMessageCleanupJob 清理老旧消息,防止内存爆炸适合场景:
原理:直接对接 Apache RocketMQ,利用其原生的集群、重试、死信机制。
发消息:post(topic, message) ↓ JSON 序列化 + 同步发送 RocketMQTemplate.syncSend(topic, message) ↓ Broker 推送给 Consumer Group DefaultMQPushConsumer 收到消息 ↓ 反序列化 + 调用 onMessage 处理成功 → CONSUME_SUCCESS 处理失败 → RECONSUME_LATER(自动重试 16 次)
亮点设计:
适合场景:
| 维度 | Local | Redis Stream | RocketMQ |
|---|---|---|---|
| 依赖 | 无 | Redis ≥ 5.0 | RocketMQ 集群 |
| 跨节点 | 不支持 | 支持 | 支持 |
| 消息可靠性 | 不保证 | 至少一次(ACK + 重推) | 至少一次(重试 + 死信) |
| 吞吐量 | 内存级 | 千级 QPS | 万级 TPS |
| 消息回溯 | 不支持 | 支持(pending list) | 支持(死信队列) |
| 适合场景 | 开发/测试 | 中小生产 | 大规模生产 |
| 运维成本 | 零 | 低(复用 Redis) | 高(独立集群) |
这部分是"零侵入"的关键。
在 META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件中写上配置类的全限定名:
com.kymasf.aiot.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration
Spring Boot 启动时会自动扫描这个文件,加载里面的配置类。不需要任何手动 import 或包扫描配置。
配置类内部用条件注解实现互斥选择:
java@AutoConfiguration
@EnableConfigurationProperties(IotMessageBusProperties.class)
public class IotMessageBusAutoConfiguration {
// 1. 无论选哪种,都装配这个生产者
@Bean
public IotDeviceMessageProducer deviceMessageProducer(IotMessageBus messageBus) {
return new IotDeviceMessageProducer(messageBus);
}
// 2. type=local 时装配(不配置也默认走这个)
@Configuration
@ConditionalOnProperty(prefix = "aiot.iot.message-bus",
name = "type", havingValue = "local", matchIfMissing = true)
public static class IotLocalMessageBusConfiguration { ... }
// 3. type=rocketmq 且 classpath 有 RocketMQ 时才装配
@Configuration
@ConditionalOnProperty(prefix = "aiot.iot.message-bus",
name = "type", havingValue = "rocketmq")
@ConditionalOnClass(RocketMQTemplate.class)
public static class IotRocketMQMessageBusConfiguration { ... }
// 4. type=redis 且 classpath 有 Redis 时才装配
@Configuration
@ConditionalOnProperty(prefix = "aiot.iot.message-bus",
name = "type", havingValue = "redis")
@ConditionalOnClass(RedisTemplate.class)
public static class IotRedisMessageBusConfiguration { ... }
}
三个精妙之处:
matchIfMissing = true:不配置时默认用 Local,开发友好@ConditionalOnClass:classpath 里没有对应依赖时,相关 Bean 根本不会被创建,不会报错@ConditionalOnProperty 互斥:三种实现永远不会冲突用户把空调设为 26°C,整个消息流转过程:
空调设备 ↓ MQTT 上报 "温度=26°C" Gateway 收到 ↓ producer.sendDeviceMessage(message) ↓ messageBus.post("device.message.upstream", msg) 消息总线(RocketMQ Topic) ↓ 广播给多个 Consumer Group ├── Biz Consumer Group → 落库存储(持久化设备属性) ├── 场景联动 Group → 判断"温度≥26°C"→ 推送安全提醒 └── 数据转发 Group → 命中规则 → 转发到客户下游 HTTP
同一条消息,三个业务各自独立消费,互不干扰。
运营在管理后台点击"开启设备":
Admin 前端 ↓ 调用 API Biz 层 ↓ 查设备表,找到设备挂在哪个 Gateway ↓ producer.sendDeviceMessageToGateway(serverId, cmd) ↓ messageBus.post("device.message.downstream.{serverId}", cmd) 消息总线 ↓ 只有目标 Gateway 节点消费 Gateway 收到命令 ↓ 通过 MQTT/HTTP/TCP 推送给设备 设备执行"开启"
Biz 不需要知道设备用什么协议连接,Gateway 也不需要知道命令从哪来,中间靠消息总线解耦。
开发阶段: type=local → 不需要任何中间件,IDE 里直接跑 测试阶段: type=redis → 共用测试环境 Redis,多节点联调 上线初期: type=redis → 设备量不大,Redis 顶住,省运维成本 业务爆发: type=rocketmq → 上独立 MQ 集群,支撑万级设备并发
整个升级过程中,业务代码一行都没改。
回过头来看,这套设计用到了几个经典的工程思想:
IotMessageBus 是策略接口,三种实现是具体策略,配置项决定用哪个策略。业务代码只依赖接口,不依赖具体实现。
消息的发送方和接收方互不感知,通过 topic 这个"频道"间接通信。一对多广播、多对多消费都支持。
高层模块(Biz、Gateway)不依赖低层模块(Redis、RocketMQ),两者都依赖抽象(IotMessageBus 接口)。
要加新的消息中间件(比如 Kafka、RabbitMQ)?只需:
对现有代码零修改,完美符合"对扩展开放、对修改封闭"。
由 Spring 容器根据配置自动决定装配哪个实现,业务代码不参与决策过程。
好的架构设计,不是用了多少高深的技术,而是用最简单的抽象,解决最复杂的问题。
这个 IoT 消息总线的设计,核心就一个接口、两个方法、三个实现、一行配置。但它解决的是 IoT 领域最核心的难题:设备消息的可靠传输与多环境适配。
如果你正在做 IoT 项目,或者任何需要"多消息中间件可切换"的系统,这个设计模式可以直接拿来用。
好的代码,就像好的文章——删一个字嫌多,加一个字嫌少。
本文作者:Jarvis
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!