基于 redis 的有序集合结构实现可靠的延迟消息队列中间件。提供 ACK 和重试机制;只需要 Redis 和消费者即可运行,无需其它组件;开箱即用无需复杂配置;
我们先看看以下业务场景:
上述场景最简单直接的解决方案是定时扫表。我们假设 10 分钟未支付则关闭订单、定时任务设置为 5 分钟一次,那么一个订单最晚会在 15 分钟关闭。高达 5 分钟的误差是业务难以接受的。另一方面频繁的扫表可能消耗过多数据库资源,影响线上交易吞吐量。
此外还有朋友使用 Redis 的过期通知、时间轮、Java 的 DelayQueue 等方式实现延时任务。我们在之前的文章中讨论过他们的缺陷:比如使用 Redis 过期通知不保证准时、发送即忘不保证送达,时间轮缺乏持久化机制容易丢失等。
总结一下,我们对于延时队列的要求有下列几条(从重要到不重要排列):
最合适的解决方案是使用 Pulsa、RocketMQ 等专业消息队列的延时投递功能。不过引入新的中间件通常存在各种非技术方面的麻烦。Redis 作为广泛使用的中间件,何不用 Redis 来制作延时队列呢?
使用有序集合结构实现延时队列的方法已经广为人知,无非是将消息作为有序集合的 member 投递时间戳作为 score,使用 zrangebyscore 命令搜索已到投递时间的消息然后将其发给消费者。
除了基本的延时投递之外我们的消息队列具有下列优势:
本文的完整代码实现在hdt3213/delayqueue,可以直接 go get github.com/hdt3213/delayqueue
完成安装。
具体使用也非常简单,只需要注册处理消息的回调函数并调用 start() 即可:
package main
import (
"github.com/go-redis/redis/v8"
"github.com/hdt3213/delayqueue"
"strconv"
"time"
)
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
// 注册处理消息的回调函数
// 返回 true 表示已成功消费,返回 false 消息队列会重新投递次消息
return true
})
// 发送延时消息
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}
// 启动消费协程
done := queue.StartConsume()
// 阻塞等待消费协程退出
<-done
}
由于数据存储在 redis 中所以我们最多能保证在 redis 无故障且消息队列相关 key 未被外部篡改的情况下不会丢失消息。
消息队列涉及几个关键的 redis 数据结构:
流程如下图所示:
由于我们允许分布式地部署多个消费者,每个消费者都在定时执行 lua 脚本,所以多个消费者可能处于上述流程中不同状态,我们无法预知(或控制)上图中五个操作发生的先后顺序,也无法控制有多少实例正在执行同一个操作。
因此我们需要保证上图中五个操作满足三个条件:
只要满足这三个条件,我们就可以部署多个实例且不需要使用分布式锁等技术来进行状态同步。
是不是听起来有点吓人?😂 其实简单的很,让我们一起来详细看看吧~
pending2ReadyScript 使用 zrangebyscore 扫描已到投递时间的消息ID并把它们移动到 ready 中:
-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中找出已到投递时间的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 将他们放入 ready key 中
for _,v in ipairs(msgs) do
table.insert(args2, v)
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中删除已投递的消息
ready2UnackScript 从 ready 或者 retry 中取出一条消息发送给消费者并放入 unack 中,类似于 RPopLPush:
-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
unack2RetryScript 从 retry 中找出所有已到重试时间的消息并把它们移动到 unack 中:
-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重试时间的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查询剩余重试次数
for i,v in ipairs(retryCounts) do
local k = msgs[i]
if tonumber(v) > 0 then -- 剩余次数大于 0
redis.call("HIncrBy", KEYS[2], k, -1) -- 减少剩余重试次数
redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
else -- 剩余重试次数为 0
redis.call("HDel", KEYS[2], k) -- 删除重试次数记录
redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后续删除
end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 将已处理的消息从 unack key 中删除
因为 redis 要求 lua 脚本必须在执行前在 KEYS 参数中声明自己要访问的 key, 而我们将每个 msg 有一个独立的 key,我们在执行 unack2RetryScript 之前是不知道哪些 msg key 需要被删除。所以 lua 脚本只将需要删除的消息记在 garbage key 中,脚本执行完后再通过 del 命令将他们删除:
func (q *DelayQueue) garbageCollect() error {
ctx := context.Background()
msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
if err != nil {
return fmt.Errorf("smembers failed: %v", err)
}
if len(msgIds) == 0 {
return nil
}
// allow concurrent clean
msgKeys := make([]string, 0, len(msgIds))
for _, idStr := range msgIds {
msgKeys = append(msgKeys, q.genMsgKey(idStr))
}
err = q.redisCli.Del(ctx, msgKeys...).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("del msgs failed: %v", err)
}
err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("remove from garbage key failed: %v", err)
}
return nil
}
之前提到的 lua 脚本都是原子性执行的,不会有其它命令插入其中。 gc 函数由 3 条 redis 命令组成,在执行过程中可能会有其它命令插入执行过程中,不过考虑到一条消息进入垃圾回收流程之后不会复活所以不需要保证 3 条命令原子性。
ack 只需要将消息彻底删除即可:
func (q *DelayQueue) ack(idStr string) error {
ctx := context.Background()
err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
if err != nil {
return fmt.Errorf("remove from unack failed: %v", err)
}
// msg key has ttl, ignore result of delete
_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
q.redisCli.HDel(ctx, q.retryCountKey, idStr)
return nil
}
否定确认只需要将 unack key 中消息的重试时间改为现在,随后执行的 unack2RetryScript 会立即将它移动到 retry key
func (q *DelayQueue) nack(idStr string) error {
ctx := context.Background()
// update retry time as now, unack2Retry will move it to retry immediately
err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
Member: idStr,
Score: float64(time.Now().Unix()),
}).Err()
if err != nil {
return fmt.Errorf("negative ack failed: %v", err)
}
return nil
}
消息队列的核心逻辑是每秒执行一次的 consume 函数,它负责调用上述脚本将消息转移到正确的集合中并回调 consumer 来消费消息:
func (q *DelayQueue) consume() error {
// 执行 pending2ready,将已到时间的消息转移到 ready
err := q.pending2Ready()
if err != nil {
return err
}
// 循环调用 ready2Unack 拉取消息进行消费
var fetchCount uint
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
// 将 nack 或超时的消息放入重试队列
err = q.unack2Retry()
if err != nil {
return err
}
// 清理已达到最大重试次数的消息
err = q.garbageCollect()
if err != nil {
return err
}
// 消费重试队列
fetchCount = 0
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
return nil
}
至此一个简单可靠的延时队列就做好了,何不赶紧开始试用呢😘😋?
作者:finley
出处:https://www.cnblogs.com/Finley/p/16400287.html
版权:本作品采用「署名-非商业性使用-相同方式共享 4.0 国际」许可协议进行许可。
好文要顶;) 关注我;) 收藏该文;) ; "分享至新浪微博") ; "分享至微信")
+加关注;)
3
0
« 上一篇: 领导:谁再用redis过期监听实现关闭订单,立马滚蛋!
原网址: 访问
创建于: 2022-07-20 11:39:54
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论