当前位置:网站首页>Make a reliable delay queue with redis
Make a reliable delay queue with redis
2022-07-25 11:40:00 【Java collection】
Let's take a look at the following business scenarios :When the order has been unpaid , How to close the order in time , And return the inventory ?
New stores ,N No product uploaded in days , How does the system know that information , And send an activation message ?
The simplest and direct solution for the above scenario is to scan the table regularly . We assume that 10 If you don't pay for minutes, close the order 、 The scheduled task is set to 5 Minutes at a time , Then an order will be placed in 15 Minutes off . the height is 5 The error of minutes is unacceptable to business . On the other hand, frequent table scanning may consume too many database resources , Affect online transaction throughput .
In addition, there are friends to use Redis Expired notification for 、 Time wheel 、Java Of DelayQueue And so on . We discussed their shortcomings in previous articles : For example, use Redis Overdue notice does not guarantee punctuality 、 Forgetting when sending does not guarantee delivery , The time wheel lacks persistence mechanism and is easy to be lost .
To sum up , We have the following requirements for delay queues ( From important to unimportant ):
Persistence : The task cannot be lost when the service restarts or crashes
Confirm the retry mechanism : If the task processing fails or times out, there should be a retry
Timing should be as accurate as possible
The most suitable solution is to use Pulsa、RocketMQ And other professional message queue delay delivery function . However, the introduction of new middleware usually has various non-technical problems .Redis As a widely used middleware , Why not Redis To make a delay queue ?
The method of implementing delay queue using ordered set structure is well known , It is nothing more than an ordered collection of messages member Post timestamp as score, Use zrangebyscore The command searches for messages that have reached the delivery time and sends them to the consumer .
In addition to basic delayed delivery, our message queue has the following advantages :
Provide ACK And retry mechanism
It only needs Redis And consumers can run , No other components required
Provide At-Least-One Delivery semantics 、 And ensure that messages will not be consumed concurrently
The complete code of this article is implemented in hdt3213/delayqueue, Can directly go get github.com/hdt3213/delayqueue Complete the installation .
The specific use is also very simple , Just register the callback function that processes the message and call start() that will do :
package main
import (
"github.com/go-redis/redis/v8"
"github.com/hdt3213/delayqueue"
"strconv"
"time"
)
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
// Register the callback function for processing messages
// return true Indicates that you have successfully consumed , return false The message queue will redeliver the message
return true
})
// Send a delay message
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}
// Start the consumption process
done := queue.StartConsume()
// Block waiting for the consumption process to exit
<-done
}Because the data is stored in redis So the best we can guarantee is redis No fault and message queue related key Messages will not be lost without external tampering .
The principle,
Message queuing involves several key redis data structure :
msgKey: In order to avoid unexpected effects caused by two messages with exactly the same content , We put each message into a key of type string , And assign one UUID As its unique logo . Only... Is stored in other data structures UUID Instead of storing the complete message content . Every msg Have an independent key Instead of putting all the messages into a hash table, it is to take advantage of TTL Mechanism to avoid
pendingKey: Ordered set type ,member For message ID, score For the delivery time unix Time stamp .
readyKey: List the type , Messages to be delivered ID.
unAckKey: Ordered set type ,member For message ID, score Of the retry time unix Time stamp .
retryKey: List the type , Message that the retry time has expired ID
garbageKey: Collection types , It is used to temporarily store messages that have reached the time limit to retry going online ID
retryCountKey: Hash table type , Key is message ID, The value is the number of retries remaining
The flow is shown in the following figure :
Because we allow distributed deployment of multiple consumers , Every consumer is doing it regularly lua Script , Therefore, multiple consumers may be in different states in the above process , We can't predict ( Or control ) The sequence of the five operations in the above figure , There is no control over how many instances are performing the same operation .
Therefore, we need to ensure that the five operations in the figure above meet three conditions :
It's all atomic
The same message will not be processed repeatedly
The message queue is always in the correct state before and after the operation
As long as these three conditions are met , We can deploy multiple instances without using distributed locking and other technologies for state synchronization .
Does it sound a little scary ? In fact, it's very simple , Let's take a closer look ~
pending2ReadyScript
pending2ReadyScript Use zrangebyscore Scan messages that have reached the delivery time ID And move them to ready in :
-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- from pending key Find out the messages that have reached the delivery time
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- Put them in ready key in
for _,v in ipairs(msgs) do
table.insert(args2, v)
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- from pending key Delete posted messages in ready2UnackScript
ready2UnackScript from ready perhaps retry To send a message to the consumer and put it in unack in , Be similar to RPopLPush:
-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msgunack2RetryScript
unack2RetryScript from retry Find all messages that have reached the retry time and move them to unack in , Interview treasure :https://www.yoodb.com Free questions , Soon to go online :
-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- A message was found that the retry time has expired
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- Query the number of retries remaining
for i,v in ipairs(retryCounts) do
local k = msgs[i]
if tonumber(v) > 0 then -- The remaining times are greater than 0
redis.call("HIncrBy", KEYS[2], k, -1) -- Reduce the number of retries remaining
redis.call("LPush", KEYS[3], k) -- Add to retry key in
else -- The number of retries remaining is 0
redis.call("HDel", KEYS[2], k) -- Delete retry count record
redis.call("SAdd", KEYS[4], k) -- Add to trash , Waiting for subsequent deletion
end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- Transfer processed messages from unack key Delete in because redis requirement lua The script must be executed before KEYS Parameter to declare what you want to access key, And we will each msg There's a separate one key, We're executing unack2RetryScript I didn't know before msg key Need to be deleted . therefore lua The script only records the messages that need to be deleted in garbage key in , After the script is executed, you can pass del Order them to be deleted :
func (q *DelayQueue) garbageCollect() error {
ctx := context.Background()
msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
if err != nil {
return fmt.Errorf("smembers failed: %v", err)
}
if len(msgIds) == 0 {
return nil
}
// allow concurrent clean
msgKeys := make([]string, 0, len(msgIds))
for _, idStr := range msgIds {
msgKeys = append(msgKeys, q.genMsgKey(idStr))
}
err = q.redisCli.Del(ctx, msgKeys...).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("del msgs failed: %v", err)
}
err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("remove from garbage key failed: %v", err)
}
return nil
} Previously mentioned lua Scripts are executed atomically , No other commands will be inserted into it .gc The function is defined by 3 strip redis Command composition , Other commands may be inserted during execution , However, considering that a message will not be revived after it enters the garbage collection process, there is no need to guarantee 3 Command atomicity .ack
ack Just delete the message completely :
func (q *DelayQueue) ack(idStr string) error {
ctx := context.Background()
err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
if err != nil {
return fmt.Errorf("remove from unack failed: %v", err)
}
// msg key has ttl, ignore result of delete
_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
q.redisCli.HDel(ctx, q.retryCountKey, idStr)
return nil
}A negative confirmation only requires that unack key The retry time of the message in is changed to now , Subsequent execution unack2RetryScript Will immediately move it to retry key
func (q *DelayQueue) nack(idStr string) error {
ctx := context.Background()
// update retry time as now, unack2Retry will move it to retry immediately
err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
Member: idStr,
Score: float64(time.Now().Unix()),
}).Err()
if err != nil {
return fmt.Errorf("negative ack failed: %v", err)
}
return nil
}consume
The core logic of message queuing is executed once per second consume function , It is responsible for calling the above script to transfer the message to the correct collection and callback consumer To consume news :
func (q *DelayQueue) consume() error {
// perform pending2ready, Transfer the time expired message to ready
err := q.pending2Ready()
if err != nil {
return err
}
// Cycle call ready2Unack Pull messages for consumption , public Number Java selected
var fetchCount uint
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
// take nack Or put the timeout message into the retry queue
err = q.unack2Retry()
if err != nil {
return err
}
// Clean up messages that have reached the maximum number of retries
err = q.garbageCollect()
if err != nil {
return err
}
// Consumption retry queue
fetchCount = 0
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
return nil
} Fold So far, a simple and reliable delay queue is ready , Why don't you start trying it out ?
author :Finley
https://www.cnblogs.com/Finley/p/16400287.html
official account “Java selected ” The published content indicates the source of , All rights reserved ( Those whose copyright cannot be verified or whose source is not indicated all come from the Internet , Reprinted , The purpose of reprinting is to convey more information , The copyright belongs to the original author . If there is any infringement , Please contact the , The author will delete the first time ! Many people have asked recently , Is there a reader exchange group ! The way to join is simple , official account Java selected , reply “ Add group ”, You can join the group !
Java Interview questions ( Wechat applet ):3000+ The road test questions , contain Java Basics 、 Concurrent 、JVM、 Threads 、MQ series 、Redis、Spring series 、Elasticsearch、Docker、K8s、Flink、Spark、 Architecture design, etc , Brush questions online at any time !
------ Special recommendation ------ Special recommendation : Focus on the most cutting-edge information and technology sharing , Official account for preparing for overtaking on curves and various open source projects and efficient software ,「 Big coffee notes 」, Focus on finding good things , It's worth your attention . Click the official account card below to follow .
If the article helps , Click to see , Forward! !边栏推荐
- Convert string to number
- 一篇看懂:IDEA 使用scala 编写wordcount程序 并生成jar包 实测
- SQL language (II)
- 用 Redis 做一个可靠的延迟队列
- Redis 入门
- 相似矩阵,可对角化条件
- [tree] 100. Same tree
- Dynamic planning question 05_ Missile interception
- MySQL | GROUP_ The concat function concatenates the values of a column with commas
- My colleague looked at my code and exclaimed: how can I use a singleton in unity
猜你喜欢

活动报名 | 玩转 Kubernetes 容器服务提高班正式开营!

SQL language (II)

There is a newline problem when passing shell script parameters \r

同事看了我的代码惊呼:居然是这么在Unity中用单例的

Small program of vegetable distribution in community

新能源销冠宏光MINIEV,有着怎样的产品力?

Job interviews are always a second kill? After reading the seckill system notes secretly stored by JD T8, I have given my knees

SQL language (III)

Talking about Devops monitoring, how does the team choose monitoring tools?

小微企业智能名片管理小程序
随机推荐
SQL language (V)
Shell - Chapter 6 exercise
Compressed list ziplist of redis
Introduction to shortcut keys in debug chapter
The most complete detailed tutorial on importing ad into lichuanyuan device packaging Library in history (always white and always cool)
Talking about Devops monitoring, how does the team choose monitoring tools?
leetcode 剑指 Offer 28. 对称的二叉树
Redis 入门
Reflection reflection
Detailed explanation of the implementation method of DNS separation and resolution
【电子器件笔记5】二极管参数和选型
There is a newline problem when passing shell script parameters \r
基于cornerstone.js的dicom医学影像查看浏览功能
SQL注入 Less17(报错注入+子查询)
基于MATLAB的常见线性调制方法
Small and micro enterprise smart business card management applet
Getting started with redis
绘图==PYQT5
Why should the hashcode () method be rewritten when rewriting the equals () method
Learn NLP with Transformer (Chapter 1)