blog-articles/Programmer/全能的 redis - Message Queue.md
2021-04-11 12:22:25 +08:00

3.3 KiB
Raw Blame History

全能的 Redis - Message Queue

前言

众所周知专业的消息队列有很多但是Redis 又不是不行XD 如果可以,在生产环境中还是选择 RabbitMQ、Kafka 之类的服务,它们生态好,符合大部分项目的需求。而我选择 Redis 作为消息队列,主要还是因为不希望我的个人项目引入太多依赖,毕竟我自己没有多少服务器资源,还是得向苹果公司学习,以环保为重 /doge。

我经历的 Redis 消息队列

这不是第一次,也不是最后一次。之前开发过的项目,有些对消息队列的需求还是很少且很弱的,只是为了解决一个问题才借助 redis 的部分特性实现一些 MQ 的特性,或多或少会有一些瑕疵,但是现在看来还是十分合理的选择。

阻塞列表 (Blocking List)

阻塞列表其实是还是普通列表,只是在出栈的时候使用了阻塞的方式来等待值的到来。

相关的方法有以下几个:

  • BLPOPBRPOP:阻塞式弹出,用于消费时取出队列中的值
  • BRPOPLPUSH:在同一个队列中,把最右边的消息重新插入到左边,用来重新加入队列
  • LPUSHRPUSH: 向队列添加消息

拥有上面几个命令,我们就可以借助 Redis 实现一个简单的消息队列。 我们定义消息队列队头在左(Left),队尾在右(Right),消息遵循 FIFO 原则进行消费。

// TODO: 添加示例代码

  pStart=>start: Start
  pEnd=>end: End
  lpush=>operation: LPUSH
  
  pStart->lpush->pEnd

上图是生产者发布消息的流程图。流程非常简单,只要往 redis 存放数据就成了。

  cStart=>start: Start
  cEnd=>end: End
  brpop=>operation: BRPOP
  consume=>operation: 执行任务
  consumeSucceeded=>condition: 任务执行是否成功?
  repush=>operation: LPUSH
  isExit=>condition: 是否退出?
  
  cStart->brpop->cEnd
  brpop->consume->consumeSucceeded
  consumeSucceeded(no)->repush->brpop
  consumeSucceeded(yes)->isExit(yes)->cEnd
  consumeSucceeded(yes)->isExit(no)->brpop

上图是消费者消费消息的流程图。阻塞地读取消息,然后拿着消息执行任务。如果执行成功,那么再去读取消息;否则,把消息重新插入到消息队列中。

这个方案很简单轻量,但有一个缺点,生产者可以和其他 redis 操作共用一个 redis 连接,但是消费者必须独立使用一个连接。主要是因为消费者在阻塞等待消息的这段时间,其他 redis 将会一同阻塞。同样的,如果要使用这个方案,最好给 BRPOP 一个超时时间,避免意外情况导致程序卡死在这。

阻塞列表 + BRPOPLPUSH

上面的方案有一定的问题,就是消费者如果在读取消息和完成任务之间出现了 Crash 之类的情况导致失败的任务没有重新回到消息队列进而导致消息丢失。如果消息丢了就丢了没啥事那问题还不算大但是对消息可靠性有一定要求的话可以增加一个“working queue”消费者读取任务时将任务转移到 working queue 中,待任务完成后再将任务移除。

生产者的工作流程和之前一样主要是消费者这边增加了一个 working queue。

顺带一提Redis 6.2 开始还加了一个 BLMOVE,提供了不同列表之间的头、尾元素移动到另一个列表的头、尾。