Make a reliable delay queue with redis
2022-07-25 11:40:00 【Java collection】
Consider the following business scenarios: an order has gone unpaid; how do we close it promptly and return its inventory?
A newly opened store has uploaded no products for N days; how does the system notice this and send an activation reminder?
The simplest, most direct solution for these scenarios is to scan the table periodically. Suppose unpaid orders must be closed after 10 minutes and the scheduled task runs every 5 minutes: an order may then be closed as late as 15 minutes after creation. An error of up to 5 minutes is unacceptable to the business. Moreover, frequent table scans may consume excessive database resources and hurt online transaction throughput.
Others use Redis expiration notifications, timing wheels, Java's DelayQueue, and so on. We discussed their shortcomings in previous articles: Redis expiration notifications are not punctual and, being fire-and-forget, do not guarantee delivery; a timing wheel has no persistence mechanism, so tasks are easily lost.
To sum up, our requirements for a delay queue are (from most to least important):
Persistence: tasks must not be lost when the service restarts or crashes
Acknowledgement and retry: if processing a task fails or times out, it should be retried
Timing should be as accurate as possible
The most suitable solution is the delayed-delivery feature of a professional message queue such as Pulsar or RocketMQ. Introducing new middleware, however, usually raises all sorts of non-technical problems. Since Redis is already widely deployed, why not build a delay queue on top of Redis?
Implementing a delay queue with the sorted set structure is a well-known technique: store the message as the member and its delivery timestamp as the score, then use the zrangebyscore command to find messages whose delivery time has arrived and send them to consumers.
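The core idea can be illustrated without Redis at all. The following stdlib-only Go sketch (every name here is made up for illustration, not taken from the library) models the sorted set as a slice kept ordered by score; `due` plays the role of `ZRANGEBYSCORE key 0 <now>` followed by `ZREMRANGEBYSCORE`:

```go
package main

import (
	"fmt"
	"sort"
)

// entry models a sorted-set member: message payload plus delivery timestamp (the score).
type entry struct {
	member string
	score  int64 // unix timestamp of the delivery time
}

type pendingSet struct{ entries []entry }

// add inserts a message with its delivery time, keeping the set ordered by score.
func (p *pendingSet) add(member string, deliverAt int64) {
	p.entries = append(p.entries, entry{member, deliverAt})
	sort.Slice(p.entries, func(i, j int) bool { return p.entries[i].score < p.entries[j].score })
}

// due returns and removes every member whose score is <= now,
// mimicking ZRANGEBYSCORE key 0 now followed by ZREMRANGEBYSCORE.
func (p *pendingSet) due(now int64) []string {
	i := sort.Search(len(p.entries), func(i int) bool { return p.entries[i].score > now })
	var members []string
	for _, e := range p.entries[:i] {
		members = append(members, e.member)
	}
	p.entries = p.entries[i:]
	return members
}

func main() {
	var p pendingSet
	p.add("order-1", 100)
	p.add("order-2", 300)
	p.add("order-3", 200)
	fmt.Println(p.due(250)) // order-1 and order-3 are due, order-2 is not
	fmt.Println(p.due(400))
}
```

In Redis the scan and the removal are two commands, which is exactly why the real implementation wraps them in a lua script, as shown later.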
Beyond basic delayed delivery, our message queue has the following advantages:
It provides an ACK and retry mechanism
It needs only Redis and the consumer to run; no other components are required
It provides At-Least-Once delivery semantics and guarantees that a message is never consumed concurrently
The complete code for this article is at hdt3213/delayqueue; install it with go get github.com/hdt3213/delayqueue.
Usage is simple: register a callback function to process messages and call StartConsume():
```go
package main

import (
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
		// callback for processing messages
		// return true if the message was consumed successfully;
		// return false to make the queue redeliver it
		return true
	})
	// send delayed messages
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}
	// start the consumer
	done := queue.StartConsume()
	// block until the consumer exits
	<-done
}
```

Since all data is stored in Redis, the strongest guarantee we can offer is this: as long as Redis itself does not fail and the queue's keys are not tampered with externally, messages will not be lost.
The Principle
The message queue involves several key Redis data structures:
msgKey: to avoid surprises when two messages happen to have identical content, each message is stored in its own string-typed key and is assigned a UUID as its unique identifier. The other data structures store only the UUID, never the full message body. Giving every msg its own key instead of packing all messages into one hash lets us use the TTL mechanism, so that unreferenced messages eventually expire on their own
pendingKey: sorted set; the member is the message ID and the score is the delivery time as a unix timestamp
readyKey: list; the IDs of messages ready to be delivered
unAckKey: sorted set; the member is the message ID and the score is the retry time as a unix timestamp
retryKey: list; the IDs of messages whose retry time has arrived
garbageKey: set; temporarily holds the IDs of messages that have exhausted their retries and await deletion
retryCountKey: hash; the field is the message ID and the value is the number of retries remaining
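These structures are easiest to understand with the happy path in front of you. The following stdlib-only Go sketch (in-memory stand-ins with made-up names, not the library's actual code) walks one message through pending -> ready -> unack -> deletion on ack:

```go
package main

import "fmt"

// queueState models the Redis structures with in-memory stand-ins.
type queueState struct {
	msgs       map[string]string // msgKey: id -> payload (string keys with TTL in Redis)
	pending    map[string]int64  // pendingKey: id -> delivery time
	ready      []string          // readyKey: ids ready for delivery
	unAck      map[string]int64  // unAckKey: id -> retry time
	retryCount map[string]int    // retryCountKey: id -> remaining retries
}

func newQueueState() *queueState {
	return &queueState{
		msgs:       map[string]string{},
		pending:    map[string]int64{},
		unAck:      map[string]int64{},
		retryCount: map[string]int{},
	}
}

// send stores the payload under its own msg key and schedules it in pending.
func (q *queueState) send(id, payload string, deliverAt int64, retries int) {
	q.msgs[id] = payload
	q.pending[id] = deliverAt
	q.retryCount[id] = retries
}

// step moves due messages pending -> ready, then delivers each ready message
// to the callback and parks it in unAck until it is acked.
func (q *queueState) step(now, retryAt int64, callback func(string) bool) {
	for id, t := range q.pending {
		if t <= now {
			delete(q.pending, id)
			q.ready = append(q.ready, id)
		}
	}
	for len(q.ready) > 0 {
		id := q.ready[0]
		q.ready = q.ready[1:]
		q.unAck[id] = retryAt
		if callback(q.msgs[id]) { // ack: delete every trace of the message
			delete(q.unAck, id)
			delete(q.msgs, id)
			delete(q.retryCount, id)
		}
	}
}

func main() {
	q := newQueueState()
	q.send("id-1", "close order 42", 100, 3)
	q.step(99, 160, func(p string) bool { return true }) // not due yet
	fmt.Println(len(q.pending))
	q.step(100, 160, func(p string) bool { return true })
	fmt.Println(len(q.pending), len(q.unAck), len(q.msgs))
}
```

The failure paths (nack, consumer timeout, retry exhaustion) are what the remaining structures and scripts below handle.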
The flow is shown in the figure below:
Because we allow multiple consumers to be deployed across machines, each periodically running these lua scripts, different consumers may be at different stages of the flow at any moment. We can neither predict nor control the order in which the five operations in the figure execute, nor how many instances are performing the same operation at once.
We therefore need the five operations in the figure to satisfy three conditions:
Each operation is atomic
The same message is never processed by two consumers at once
The queue is in a consistent state before and after each operation
As long as these three conditions hold, we can deploy multiple instances without distributed locks or any other state-synchronization machinery.
Does that sound intimidating? It is actually quite simple; let's walk through it.
pending2ReadyScript
pending2ReadyScript uses zrangebyscore to find the IDs of messages whose delivery time has arrived and moves them into ready:
```lua
-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- find messages whose delivery time has arrived
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- push them onto the ready key
for _,v in ipairs(msgs) do
    table.insert(args2, v)
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove the delivered messages from the pending key
```
ready2UnackScript

ready2UnackScript pops a message from ready (or from retry), delivers it to the consumer, and records it in unack, much like RPopLPush:
```lua
-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
```
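The retryTime score written into unack is what gives the queue its at-least-once guarantee: if a consumer crashes after this script runs but before acking, the message simply sits in unack until its retry time passes and is then recovered. A stdlib-only Go sketch of that recovery (illustrative names and timestamps, not the library's code):

```go
package main

import "fmt"

// unack2Retry mimics the recovery step: every id in unAck whose
// retry time has passed is moved into the retry list.
func unack2Retry(unAck map[string]int64, retry []string, now int64) []string {
	for id, t := range unAck {
		if t <= now {
			delete(unAck, id)
			retry = append(retry, id)
		}
	}
	return retry
}

func main() {
	// "id-1" was handed to a consumer that crashed before acking;
	// its retry time is now(100) + maxConsumeDuration(30) = 130.
	unAck := map[string]int64{"id-1": 130}
	var retry []string

	retry = unack2Retry(unAck, retry, 120) // too early: nothing recovered
	fmt.Println(len(retry))
	retry = unack2Retry(unAck, retry, 130) // timeout passed: recovered
	fmt.Println(retry)
}
```

A crashed consumer therefore costs at most one maxConsumeDuration of extra latency for the affected message.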
unack2RetryScript

unack2RetryScript finds every message in unack whose retry time has arrived and moves it to retry, or to garbage once its retries are exhausted:
```lua
-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- find messages whose retry time has arrived
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- look up the remaining retry counts
for i,v in ipairs(retryCounts) do
    local k = msgs[i]
    if tonumber(v) > 0 then -- retries remain
        redis.call("HIncrBy", KEYS[2], k, -1) -- decrement the remaining retry count
        redis.call("LPush", KEYS[3], k) -- push onto the retry key
    else -- no retries remain
        redis.call("HDel", KEYS[2], k) -- delete the retry-count record
        redis.call("SAdd", KEYS[4], k) -- add to garbage, to be deleted later
    end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove the processed messages from the unack key
```

Redis requires a lua script to declare in advance, via the KEYS parameter, every key it will access, and each msg has its own key, so we cannot know which msg keys need deleting before unack2RetryScript runs. The script therefore only records the IDs of the messages to delete in the garbage key; after it finishes, they can be removed with the del command:
```go
func (q *DelayQueue) garbageCollect() error {
	ctx := context.Background()
	msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}
	err = q.redisCli.Del(ctx, msgKeys...).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("del msgs failed: %v", err)
	}
	err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("remove from garbage key failed: %v", err)
	}
	return nil
}
```

As mentioned earlier, lua scripts execute atomically and nothing can be interleaved with them. The garbageCollect function, in contrast, consists of 3 Redis commands, and other commands may run between them. But since a message can never come back to life once it enters garbage collection, these 3 commands do not need to be atomic.

ack
ack simply deletes the message completely:
```go
func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// msg key has ttl, ignore result of delete
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	q.redisCli.HDel(ctx, q.retryCountKey, idStr)
	return nil
}
```

A negative acknowledgement only needs to set the message's retry time in the unack key to now; the next run of unack2RetryScript will immediately move it into the retry key:
```go
func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
		Member: idStr,
		Score:  float64(time.Now().Unix()),
	}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	return nil
}
```

consume
The core logic of the queue is the consume function, executed once per second. It calls the scripts above to move messages into the right collections and invokes the callback to consume them:
```go
func (q *DelayQueue) consume() error {
	// perform pending2Ready, moving due messages into ready
	err := q.pending2Ready()
	if err != nil {
		return err
	}
	// call ready2Unack in a loop to pull messages for consumption
	var fetchCount uint
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	// move nacked or timed-out messages into the retry queue
	err = q.unack2Retry()
	if err != nil {
		return err
	}
	// clean up messages that have reached the maximum number of retries
	err = q.garbageCollect()
	if err != nil {
		return err
	}
	// consume the retry queue
	fetchCount = 0
	for {
		idStr, err := q.retry2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	return nil
}
```

With that, a simple and reliable delay queue is complete. Why not give it a try?
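To close, the retry bookkeeping can be traced end to end. This stdlib-only sketch (my reading of the scripts above, with illustrative names, not the library's code) follows a message whose handler always fails: each pass of unack2Retry either grants another delivery, decrementing the retry count, or, once the count hits zero, moves the id into garbage:

```go
package main

import "fmt"

// redeliveries simulates a message whose handler always nacks: each pass of
// unack2Retry either grants another delivery (decrementing the retry count)
// or, once the count reaches zero, moves the id into garbage.
func redeliveries(retryCount int) (deliveries int, garbage []string) {
	for {
		if retryCount > 0 {
			retryCount-- // HIncrBy retryCountKey id -1
			deliveries++ // LPush retryKey; retry2Unack delivers it again, handler nacks
		} else {
			garbage = append(garbage, "id-1") // HDel retryCountKey + SAdd garbageKey
			return
		}
	}
}

func main() {
	// With a retry count of 2, an always-failing handler gets exactly
	// 2 redeliveries after the first attempt before garbage collection.
	d, g := redeliveries(2)
	fmt.Println(d, g)
}
```

In other words, under this reading a message sent with WithRetryCount(n) is attempted at most n+1 times in total before garbageCollect deletes it.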
Author: Finley
https://www.cnblogs.com/Finley/p/16400287.html