作者:微信小助手
发布时间:2021-04-22T21:19:28
作者:Magic Kaito 本文已收录至我的GitHub 阅读本文大约需要 15 分钟。 我经常听到很多人讨论,关于「把 Redis 当作队列来用是否合适」的问题。 有些人表示赞成,他们认为 Redis 很轻量,用作队列很方便。 也些人则反对,认为 Redis 会「丢」数据,最好还是用「专业」的队列中间件更稳妥。 究竟哪种方案更好呢? 这篇文章,我就和你聊一聊把 Redis 当作队列,究竟是否合适这个问题。 我会从简单到复杂,一步步带你梳理其中的细节,把这个问题真正的讲清楚。 看完这篇文章后,我希望你对这个问题你会有全新的认识。 在文章的最后,我还会告诉你关于「技术选型」的思路,文章有点长,希望你可以耐心读完。 首先,我们先从最简单的场景开始讲起。 如果你的业务需求足够简单,想把 Redis 当作队列来使用,肯定最先想到的就是使用 List 这个数据类型。 因为 List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。 如果把 List 当作队列,你可以这么来用。 生产者使用 LPUSH 发布消息: 消费者这一侧,使用 RPOP 拉取消息: 这个模型非常简单,也很容易理解。 但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL。 而我们在编写消费者逻辑时,一般是一个「死循环」,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写: 如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。 怎么解决这个问题呢? 也很简单,当队列为空时,我们可以「休眠」一会,再去尝试拉取消息。代码可以修改成这样: 这就解决了 CPU 空转问题。 这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。 假设设置的休眠时间是 2s,那新消息最多存在 2s 的延迟。 要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发 CPU 空转问题。 鱼和熊掌不可兼得。 那如何做,既能及时处理新消息,还能避免 CPU 空转呢? Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知我的消费者立即处理新消息呢? 幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。 现在,你可以这样来拉取消息了: 使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。 这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得。 注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。 解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点? 我们一起来分析一下: 第一个问题是功能上的,使用 List 做消息队列,它仅仅支持最简单的,一组生产者对应一组消费者,不能满足多组生产者和消费者的业务场景。 第二个问题就比较棘手了,因为从 List 中 POP 一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了。 这也意味着,如果消费者在处理消息时异常宕机,那这条消息就相当于丢失了。 针对这 2 个问题怎么解决呢?我们一个个来看。 从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。 它正好可以解决前面提到的第一个问题:重复消费。 即多组生产者、消费者的场景,我们来看它是如何做的。 Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。 假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。 首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。 此时,2 个消费者都会被阻塞住,等待新消息的到来。 之后,再启动一个生产者,发布一条消息。 这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。 看到了么,使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。 除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。
本文公众号来源:水滴与银弹
从最简单的开始:List 队列
127.0.0.1:6379> LPUSH queue msg1
(integer) 1
127.0.0.1:6379> LPUSH queue msg2
(integer) 2127.0.0.1:6379> RPOP queue
"msg1"
127.0.0.1:6379> RPOP queue
"msg2"
127.0.0.1:6379> RPOP queue
(nil) // 没消息了while true:
msg = redis.rpop("queue")
// 没有消息,继续循环
if msg == null:
continue
// 处理消息
handle(msg)while true:
msg = redis.rpop("queue")
// 没有消息,休眠2s
if msg == null:
sleep(2)
continue
// 处理消息
handle(msg)
while true:
// 没消息阻塞等待,0表示不设置超时时间
msg = redis.brpop("queue", 0)
if msg == null:
continue
// 处理消息
handle(msg)
发布/订阅模型:Pub/Sub
// 2个消费者 都订阅一个队列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1127.0.0.1:6379> PUBLISH queue msg1
(integer) 1127.0.0.1:6379> SUBSCRIBE queue
// 收到新消息
1) "message"
2) "queue"
3) "msg1"