A hundred lines of code to realize reliable delay queue based on redis
2022-06-22 21:56:00 【Finley】
In the previous article on delay queues, we mentioned that redisson's delayqueue implements a delay queue on top of Redis's sorted set structure. Unfortunately, the Go community has no such library. That is not a big problem, though: we can build our own wheel.
The complete code for this article lives in hdt3213/delayqueue and can be installed directly with go get.
The approach of implementing a delay queue with a sorted set is well known: store each message as a member of the sorted set with its delivery timestamp as the score, then use the zrangebyscore command to find the messages whose delivery time has arrived and send them to consumers.
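To make the idea concrete, here is a minimal in-memory sketch of it. It does not talk to Redis: the `delaySet` type and its `add` and `popDue` methods are hypothetical names invented for this illustration, standing in for a sorted set, ZADD, and a ZRANGEBYSCORE followed by a removal.

```go
package main

import (
	"fmt"
	"sort"
)

// delaySet is an in-memory stand-in for a Redis sorted set:
// member = message, score = delivery time as a Unix timestamp.
type delaySet map[string]int64

// add schedules msg for delivery at deliverAt (the role ZADD plays).
func (s delaySet) add(msg string, deliverAt int64) {
	s[msg] = deliverAt
}

// popDue returns every message whose score is <= now, ordered by score,
// and removes those messages from the set.
func (s delaySet) popDue(now int64) []string {
	due := make([]string, 0)
	for msg, at := range s {
		if at <= now {
			due = append(due, msg)
		}
	}
	// Order by delivery time, breaking ties by member name.
	sort.Slice(due, func(i, j int) bool {
		if s[due[i]] != s[due[j]] {
			return s[due[i]] < s[due[j]]
		}
		return due[i] < due[j]
	})
	for _, msg := range due {
		delete(s, msg)
	}
	return due
}

func main() {
	s := delaySet{}
	s.add("a", 100)
	s.add("b", 200)
	s.add("c", 300)
	fmt.Println(s.popDue(250)) // a and b are due, c is not
	fmt.Println(len(s))        // c is still waiting
}
```

A real implementation would keep the scores in Redis so that several processes can share the same queue.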
However, a message queue does more than hand messages to consumers; it is also responsible for making sure they are delivered and consumed. The usual approach is for the consumer to return an acknowledgement (ack) to the queue after it receives a message. If the consumer returns a negative acknowledgement (nack) or does not respond before a timeout, the queue resends the message according to predetermined rules until the maximum number of retries is reached. How to implement the ack and retry mechanisms is the key question we need to consider.
Our message queue allows multiple producers and consumers to be deployed in a distributed fashion. Consumer instances periodically execute Lua scripts that drive messages through the queue, so no additional components need to be deployed. Because Redis guarantees that a Lua script executes atomically, no locking is needed anywhere in the process.
Consumers fetch messages in pull mode, and each message is guaranteed to be delivered at least once. The queue retries messages that have timed out or been negatively acknowledged (nack) until the maximum number of retries is reached. At most one consumer processes a given message at a time, which reduces the concurrency issues you have to think about.
Please note: if consumption takes longer than MaxConsumeDuration, the queue considers it timed out and redelivers the message, so several consumers may then be processing the same message at the same time.
Usage is simple: register a callback function that processes messages and call StartConsume():
```go
package main

import (
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
		// Callback that processes a message.
		// Return true if the message was consumed successfully;
		// return false and the queue will redeliver it.
		return true
	})
	// Send delayed messages
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}
	// Start consuming
	done := queue.StartConsume()
	<-done
}
```
Because the data is stored in Redis, the best we can guarantee is that messages are not lost as long as Redis does not fail and the keys used by the message queue are not tampered with from outside.
How it works
The message queue involves several key Redis data structures:
- msgKey: To avoid unexpected effects when two messages have exactly the same content, we put each message into its own string key and assign it a UUID as its unique identifier. The other data structures store only the UUID, not the full message content. Each msg gets its own key, instead of all messages going into one hash table, so that we can take advantage of the TTL mechanism.
- pendingKey: Sorted set. The member is the message ID and the score is the Unix timestamp of the delivery time.
- readyKey: List. IDs of messages that are ready to be delivered.
- unAckKey: Sorted set. The member is the message ID and the score is the Unix timestamp of the retry time.
- retryKey: List. IDs of messages whose retry time has arrived.
- garbageKey: Set. Temporarily stores the IDs of messages that have reached the retry limit.
- retryCountKey: Hash table. The key is the message ID and the value is the number of retries remaining.
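As a reading aid, the list above can be mirrored with plain Go types. This is only an in-memory model, not how the library stores anything. The `queueState` type, `newQueueState`, and `send` are hypothetical names, and the sketch assumes that sending a message writes the payload, the retry count, and the pending entry; the article does not show that step. The real queue generates a UUID, whereas this sketch takes the ID from the caller.

```go
package main

import "fmt"

// queueState mirrors the Redis keys listed above with Go types.
// It is an illustration only; the real data lives in Redis.
type queueState struct {
	msgs       map[string]string   // msgKey: one entry per message ID -> payload
	pending    map[string]int64    // pendingKey: sorted set, score = delivery time
	ready      []string            // readyKey: list of IDs ready for delivery
	unAck      map[string]int64    // unAckKey: sorted set, score = retry time
	retry      []string            // retryKey: list of IDs whose retry time has arrived
	garbage    map[string]struct{} // garbageKey: set of IDs that used up their retries
	retryCount map[string]int      // retryCountKey: hash, ID -> retries remaining
}

func newQueueState() *queueState {
	return &queueState{
		msgs:       map[string]string{},
		pending:    map[string]int64{},
		unAck:      map[string]int64{},
		garbage:    map[string]struct{}{},
		retryCount: map[string]int{},
	}
}

// send records a new delayed message (assumed behaviour, see the lead-in):
// the payload under its ID, the retry budget, and the delivery time.
func (q *queueState) send(id, payload string, deliverAt int64, retries int) {
	q.msgs[id] = payload
	q.retryCount[id] = retries
	q.pending[id] = deliverAt
}

func main() {
	q := newQueueState()
	q.send("id-1", "hello", 1700000000, 3)
	// A freshly sent message is only in pending; ready is still empty.
	fmt.Println(q.pending["id-1"], q.retryCount["id-1"], len(q.ready))
}
```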
The flow is shown in the following figure :
Because we allow multiple consumers to be deployed in a distributed fashion, and every consumer runs the Lua scripts periodically, different consumers may be at different stages of the flow above. We can neither predict nor control the order in which the five operations in the figure run, nor how many instances are performing the same operation at once.
Therefore, the five operations in the figure above must satisfy three conditions:
- Each operation is atomic
- The same message is never processed twice
- The message queue is in a correct state before and after each operation
As long as these three conditions hold, we can deploy multiple instances without distributed locks or other techniques for state synchronization.
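The effect of these conditions can be tried out with a small in-memory simulation. In the sketch below a mutex plays the role of Redis's guarantee that a script runs atomically, and several goroutines stand in for consumer instances that all run the same operation at once. The `store` type and `runInstances` function are hypothetical names made up for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// store is an in-memory stand-in for Redis. The mutex plays the role of
// Redis's guarantee that a Lua script runs atomically.
type store struct {
	mu      sync.Mutex
	pending map[string]int64 // message ID -> delivery time
	ready   []string
}

// pending2Ready moves every due ID from pending to ready in one atomic step
// and reports how many IDs it moved.
func (s *store) pending2Ready(now int64) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	moved := 0
	for id, at := range s.pending {
		if at <= now {
			s.ready = append(s.ready, id)
			delete(s.pending, id)
			moved++
		}
	}
	return moved
}

// runInstances lets n simulated consumer instances run pending2Ready
// concurrently and returns the total number of moves they performed.
func runInstances(s *store, n int, now int64) int {
	var wg sync.WaitGroup
	var totalMu sync.Mutex
	total := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			moved := s.pending2Ready(now)
			totalMu.Lock()
			total += moved
			totalMu.Unlock()
		}()
	}
	wg.Wait()
	return total
}

func main() {
	s := &store{pending: map[string]int64{"a": 1, "b": 2, "c": 99}}
	// Eight instances race, yet each due message is moved exactly once.
	fmt.Println(runInstances(s, 8, 10), len(s.ready), len(s.pending))
}
```

Whichever goroutine wins the race moves the due messages; the others find nothing left to move, so no message ends up in ready twice.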
Does that sound a little scary? It is actually quite simple. Let's take a closer look.
pending2ReadyScript
pending2ReadyScript uses zrangebyscore to scan for the IDs of messages whose delivery time has arrived and moves them to ready:
```lua
-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- find messages in the pending key whose delivery time has arrived
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- push them onto the ready key
for _,v in ipairs(msgs) do
    table.insert(args2, v)
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove the moved messages from the pending key
```
ready2UnackScript
ready2UnackScript takes one message from ready or retry to hand to the consumer and puts it into unack, similar to RPopLPush:
```lua
-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
```
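The same move can be sketched on in-memory data to see what ends up where. The `ready2Unack` function below is a hypothetical stand-in for the script, not the library's Go wrapper, and the choice of retry time (now plus the maximum consume duration) is an assumption based on the timeout behaviour described earlier.

```go
package main

import "fmt"

// ready2Unack mimics the script on in-memory data: it pops one ID from the
// tail of ready (the role of RPOP) and records it in unack with the given
// retry time (the role of ZADD). ok is false when ready is empty.
func ready2Unack(ready *[]string, unack map[string]int64, retryTime int64) (id string, ok bool) {
	if len(*ready) == 0 {
		return "", false
	}
	last := len(*ready) - 1
	id = (*ready)[last]
	*ready = (*ready)[:last]
	unack[id] = retryTime
	return id, true
}

func main() {
	// LPUSH adds to the head of a list, so the oldest ID sits at the tail.
	ready := []string{"id-3", "id-2", "id-1"}
	unack := map[string]int64{}
	now, maxConsume := int64(1000), int64(5) // hypothetical values, in seconds
	id, ok := ready2Unack(&ready, unack, now+maxConsume)
	// id-1 is now awaiting an ack, and two IDs are still ready.
	fmt.Println(id, ok, unack[id], len(ready))
}
```

If the consumer neither acks nor nacks before the retry time stored in unack, the message becomes eligible for redelivery.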
unack2RetryScript
unack2RetryScript finds all messages in unack whose retry time has arrived and moves them to retry:
```lua
-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- find messages whose retry time has arrived
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- look up the remaining retry counts
for i,v in ipairs(retryCounts) do
    local k = msgs[i]
    if tonumber(v) > 0 then -- retries remain
        redis.call("HIncrBy", KEYS[2], k, -1) -- decrement the remaining retry count
        redis.call("LPush", KEYS[3], k) -- push onto the retry key
    else -- no retries remain
        redis.call("HDel", KEYS[2], k) -- delete the retry count record
        redis.call("SAdd", KEYS[4], k) -- add to the garbage key for later deletion
    end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove the processed messages from the unack key
```
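To see the branching in isolation, the rules of the script can be replayed on in-memory data. The `state` type and its `unack2Retry` method are hypothetical names for this sketch; the order of IDs inside retry is not modelled.

```go
package main

import "fmt"

// state holds in-memory versions of the four structures the script touches.
type state struct {
	unack      map[string]int64    // ID -> retry time
	retryCount map[string]int      // ID -> retries remaining
	retry      []string            // IDs waiting to be redelivered
	garbage    map[string]struct{} // IDs whose retries are used up
}

// unack2Retry applies the same rules as the script to in-memory data.
func (s *state) unack2Retry(now int64) {
	for id, retryAt := range s.unack {
		if retryAt > now {
			continue // retry time has not arrived yet
		}
		if s.retryCount[id] > 0 {
			s.retryCount[id]-- // spend one retry
			s.retry = append(s.retry, id)
		} else {
			delete(s.retryCount, id) // drop the retry count record
			s.garbage[id] = struct{}{}
		}
		delete(s.unack, id) // processed messages leave unack
	}
}

func main() {
	s := &state{
		unack:      map[string]int64{"a": 10, "b": 10, "c": 50},
		retryCount: map[string]int{"a": 2, "b": 0, "c": 1},
		garbage:    map[string]struct{}{},
	}
	s.unack2Retry(20)
	_, dead := s.garbage["b"]
	// a is retried with one retry left, b is garbage, c is still in unack.
	fmt.Println(s.retry, s.retryCount["a"], dead, len(s.unack))
}
```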
Because Redis requires a Lua script to declare every key it accesses in the KEYS parameter before it runs, and each msg has its own key, we do not know which msg keys need to be deleted before executing unack2RetryScript. The script therefore only records the messages to delete in the garbage key, and after it finishes they are deleted with the del command:
```go
func (q *DelayQueue) garbageCollect() error {
	ctx := context.Background()
	msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}
	err = q.redisCli.Del(ctx, msgKeys...).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("del msgs failed: %v", err)
	}
	err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("remove from garbage key failed: %v", err)
	}
	return nil
}
```
As mentioned earlier, Lua scripts execute atomically, and no other command can be interleaved with them. The gc function consists of three Redis commands, and other commands may run in between them. However, since a message is never revived once it enters garbage collection, the three commands do not need to be atomic.
ack
ack simply deletes the message completely:
```go
func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// msg key has ttl, ignore result of delete
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	q.redisCli.HDel(ctx, q.retryCountKey, idStr)
	return nil
}
```
A negative acknowledgement only needs to set the retry time of the message in the unack key to now; the next run of unack2RetryScript will then immediately move it to the retry key:
```go
func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
		Member: idStr,
		Score:  float64(time.Now().Unix()),
	}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	return nil
}
```
consume
The core logic of the message queue is the consume function, which runs once per second. It calls the scripts above to move messages into the correct collections and invokes the consumer callback to consume them:
```go
func (q *DelayQueue) consume() error {
	// Run pending2Ready to move messages whose delivery time has arrived into ready
	err := q.pending2Ready()
	if err != nil {
		return err
	}
	// Call ready2Unack in a loop to pull messages for consumption
	var fetchCount uint
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	// Move nacked or timed-out messages into the retry queue
	err = q.unack2Retry()
	if err != nil {
		return err
	}
	// Clean up messages that have reached the maximum number of retries
	err = q.garbageCollect()
	if err != nil {
		return err
	}
	// Consume the retry queue
	fetchCount = 0
	for {
		idStr, err := q.retry2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	return nil
}
```
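The two fetch loops in consume have the same shape, so one possible refactoring is to pull that shape into a helper. The sketch below is not part of the library: `drain` and its function parameters are hypothetical, it works on plain Go values instead of Redis, and it leaves out error handling to keep the loop visible. Unlike the loops above, a limit of 0 fetches nothing.

```go
package main

import "fmt"

// drain pulls up to limit message IDs from fetch and hands each one to handle.
// fetch returns ok=false when nothing is left. drain reports how many messages
// were acknowledged and how many were negatively acknowledged.
func drain(fetch func() (id string, ok bool), handle func(id string) bool, limit int) (acked, nacked int) {
	for fetched := 0; fetched < limit; fetched++ {
		id, ok := fetch()
		if !ok {
			break // queue is empty
		}
		if handle(id) {
			acked++ // the real queue would call ack here
		} else {
			nacked++ // the real queue would call nack here
		}
	}
	return acked, nacked
}

func main() {
	queue := []string{"m1", "m2", "m3", "m4"}
	fetch := func() (string, bool) {
		if len(queue) == 0 {
			return "", false
		}
		id := queue[0]
		queue = queue[1:]
		return id, true
	}
	handle := func(id string) bool { return id != "m2" } // pretend m2 fails
	acked, nacked := drain(fetch, handle, 3)
	// Three messages were fetched; m4 stays queued because of the limit.
	fmt.Println(acked, nacked, len(queue))
}
```

With such a helper, consuming ready and consuming retry would differ only in the fetch function passed in.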
With that, a simple and reliable delay queue is ready. Why not give it a try?