3.3 KiB
全能的 Redis - Message Queue
前言
众所周知,专业的消息队列有很多,但是,Redis 又不是不行?XD 如果可以,在生产环境中还是选择 RabbitMQ、Kafka 之类的服务,它们生态好,符合大部分项目的需求。而我选择 Redis 作为消息队列,主要还是因为不希望我的个人项目引入太多依赖,毕竟我自己没有多少服务器资源,还是得向苹果公司学习,以环保为重 /doge。
我经历的 Redis 消息队列
这不是第一次,也不是最后一次。之前开发过的项目,有些对消息队列的需求还是很少且很弱的,只是为了解决一个问题才借助 redis 的部分特性实现一些 MQ 的特性,或多或少会有一些瑕疵,但是现在看来还是十分合理的选择。
阻塞列表 (Blocking List)
阻塞列表其实是还是普通列表,只是在出栈的时候使用了阻塞的方式来等待值的到来。
相关的方法有以下几个:
BLPOP
、BRPOP
:阻塞式弹出,用于消费时取出队列中的值BRPOPLPUSH
:在同一个队列中,把最右边的消息重新插入到左边,用来重新加入队列LPUSH
、RPUSH
: 向队列添加消息
拥有上面几个命令,我们就可以借助 Redis 实现一个简单的消息队列。 我们定义消息队列队头在左(Left),队尾在右(Right),消息遵循 FIFO 原则进行消费。
// TODO: 添加示例代码
pStart=>start: Start
pEnd=>end: End
lpush=>operation: LPUSH
pStart->lpush->pEnd
上图是生产者发布消息的流程图。流程非常简单,只要往 redis 存放数据就成了。
cStart=>start: Start
cEnd=>end: End
brpop=>operation: BRPOP
consume=>operation: 执行任务
consumeSucceeded=>condition: 任务执行是否成功?
repush=>operation: LPUSH
isExit=>condition: 是否退出?
cStart->brpop->cEnd
brpop->consume->consumeSucceeded
consumeSucceeded(no)->repush->brpop
consumeSucceeded(yes)->isExit(yes)->cEnd
consumeSucceeded(yes)->isExit(no)->brpop
上图是消费者消费消息的流程图。阻塞地读取消息,然后拿着消息执行任务。如果执行成功,那么再去读取消息;否则,把消息重新插入到消息队列中。
这个方案很简单轻量,但有一个缺点,生产者可以和其他 redis 操作共用一个 redis 连接,但是消费者必须独立使用一个连接。主要是因为消费者在阻塞等待消息的这段时间,其他 redis 将会一同阻塞。同样的,如果要使用这个方案,最好给 BRPOP
一个超时时间,避免意外情况导致程序卡死在这。
阻塞列表 + BRPOPLPUSH
上面的方案有一定的问题,就是消费者如果在读取消息和完成任务之间出现了 Crash 之类的情况,导致失败的任务没有重新回到消息队列,进而导致消息丢失。如果消息丢了就丢了没啥事,那问题还不算大,但是对消息可靠性有一定要求的话,可以增加一个“working queue”,消费者读取任务时,将任务转移到 working queue 中,待任务完成后再将任务移除。
生产者的工作流程和之前一样,主要是消费者这边增加了一个 working queue。
顺带一提,Redis 6.2 开始还加了一个 BLMOVE
,提供了不同列表之间的头、尾元素移动到另一个列表的头、尾。