消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。

我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢。
即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一。
也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。
一个消息 M 发送到了消息中间件,消息投递到了消费程序 A。A 接受到了消息,然后进行消费。
但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。
程序 A 接受到这个消息 M 并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以它还会继续投递。
这时候对于应用程序 A 来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
这在 RockectMQ 的场景来看,就是同一个 messageId 的消息重复投递下来了。
基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么 RocketMQ 的文档里强调的,消费逻辑需要自我实现幂等。
背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。
关于 RocketMQ 消息重复的场景,官方文档上给出了这三种情况:
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
3.负载均衡时消息重复(包括但不限于网络抖动、Broker重启以及消费者应用重启)
当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。
简单的消息去重解决方案
假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。
insert
into t_order
values .....
update t_inv
set
count =
count
-1
where good_id =
'good123';
select * from t_order where order_no = 'order123'
if(order != null) {
return ;//消息重复,直接返回
}
这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。
并发重复消息
假设这个消费的所有代码加起来需要 1 秒,有重复的消息在这 1 秒内(假设 100 毫秒)内到达。
那么很可能,上面去重代码里面会发现,数据依然是空的,因为上一条消息还没消费完,还没成功更新订单状态。
具体一点就是两个线程在间隔非常短甚至是同时执行这个逻辑:
select * from t_order where order_no = 'order123'
if(order != null) {
return ;//消息重复,直接返回
}
那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题,如主键冲突抛出异常、库存被重复扣减而没释放等。
要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把 select 改成 select for update 语句,把记录进行锁定:
select *
from t_order
where order_no =
'THIS_ORDER_NO'
for update
//开启事务
if(order.status !=
null) {
return ;
//消息重复,直接返回
}