面试官:给我一个避免消息重复消费的解决方案?

作者:微信小助手

发布时间:2021-09-17T12:09:54

消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。

我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢。
即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一。
也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。
一个消息 M 发送到了消息中间件,消息投递到了消费程序 A。A 接受到了消息,然后进行消费。
但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。
然而这种可靠的特性会导致消息可能被多次地投递。
还是刚刚这个例子。
程序 A 接受到这个消息 M 并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以它还会继续投递。
这时候对于应用程序 A 来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
这在 RockectMQ 的场景来看,就是同一个 messageId 的消息重复投递下来了。
基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么 RocketMQ 的文档里强调的,消费逻辑需要自我实现幂等。
背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。
关于 RocketMQ 消息重复的场景,官方文档上给出了这三种情况:
1.发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
2.投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
3.负载均衡时消息重复(包括但不限于网络抖动、Broker重启以及消费者应用重启)
当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。
那么,有什么解决方案呢?

 简单的消息去重解决方案

假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。
  
insert  into t_order  values .....
update t_inv  set  count =  count -1  where good_id =  'good123';
要实现消息的幂等,我们可能会采取这样的方案:
  
select * from t_order where order_no = 'order123'
if(order != null) {
    return ;//消息重复,直接返回
}
这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

 并发重复消息

假设这个消费的所有代码加起来需要 1 秒,有重复的消息在这 1 秒内(假设 100 毫秒)内到达。
例如生产者快速重发,Broker 重启等。
那么很可能,上面去重代码里面会发现,数据依然是空的,因为上一条消息还没消费完,还没成功更新订单状态。
具体一点就是两个线程在间隔非常短甚至是同时执行这个逻辑:
  
select * from t_order where order_no = 'order123'
然后发现都没有查到数据,于是走入到这个逻辑中:
  
if(order != null) {
    return ;//消息重复,直接返回
}
那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题,如主键冲突抛出异常、库存被重复扣减而没释放等。
要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把 select 改成 select for update 语句,把记录进行锁定:
  
select *  from t_order  where order_no =  'THIS_ORDER_NO'  for update  //开启事务
if(order.status !=  null) {
     return ; //消息重复,直接返回
}