
Implementing a reliable delay queue on Redis in a hundred lines of code

2022-06-22 21:56:00 Finley


In the previous article on delay queues, we mentioned that redisson's delayqueue implements a delay queue on top of the Redis sorted set structure. Unfortunately, the Go community has no comparable library. That is not a big problem, though: we can build the wheel ourselves.

The complete code for this article lives in hdt3213/delayqueue and can be installed directly with go get.

The way to implement a delay queue with a sorted set is well known: store each message as a member of the sorted set with its delivery timestamp as the score, then use the zrangebyscore command to find the messages whose delivery time has arrived and send them to consumers.
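
To make the idea concrete, here is a minimal in-memory sketch in Go that involves no Redis at all. The `delaySet` type and its `add` and `popDue` methods are names invented for this illustration; they only mimic what ZADD and a ZRANGEBYSCORE plus ZREMRANGEBYSCORE pair would do, and assume non-negative timestamps.

```go
package main

import (
	"fmt"
	"sort"
)

// entry mirrors one sorted-set element: a member and its score.
type entry struct {
	member string
	score  int64 // delivery time as a Unix timestamp
}

// delaySet is a hypothetical in-memory stand-in for a Redis sorted set.
type delaySet struct {
	entries []entry // kept ordered by score
}

// add inserts a member at the position given by its score.
// Unlike ZADD, this sketch does not deduplicate members.
func (s *delaySet) add(member string, score int64) {
	i := sort.Search(len(s.entries), func(j int) bool { return s.entries[j].score > score })
	s.entries = append(s.entries, entry{})
	copy(s.entries[i+1:], s.entries[i:])
	s.entries[i] = entry{member: member, score: score}
}

// popDue returns and removes every member whose score is <= now,
// in the spirit of ZRANGEBYSCORE 0 now followed by ZREMRANGEBYSCORE 0 now.
func (s *delaySet) popDue(now int64) []string {
	n := sort.Search(len(s.entries), func(j int) bool { return s.entries[j].score > now })
	due := make([]string, 0, n)
	for _, e := range s.entries[:n] {
		due = append(due, e.member)
	}
	s.entries = s.entries[n:]
	return due
}

func main() {
	s := &delaySet{}
	s.add("b", 200)
	s.add("a", 100)
	s.add("c", 300)
	fmt.Println(s.popDue(250)) // [a b]
	fmt.Println(s.popDue(250)) // []
}
```

Messages come out in delivery-time order regardless of the order in which they were added, which is exactly the property the sorted set gives us.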

However, a message queue does more than send messages to consumers; it is also responsible for guaranteeing delivery and consumption. The usual implementation has the consumer return an acknowledgement (ack) to the message queue after receiving a message. If the consumer returns a negative acknowledgement (nack) or does not respond before a timeout, the message queue redelivers the message according to predetermined rules until the maximum number of retries is reached. How to implement the ack and retry mechanisms is the key question we need to consider.

Our message queue allows distributed deployment of multiple producers and consumers. Consumer instances periodically execute Lua scripts to drive messages through the queue, so no additional components need to be deployed. Because Redis guarantees that Lua scripts execute atomically, no locking is needed anywhere in the process.

Consumers fetch messages in pull mode. Each message is guaranteed to be delivered at least once: the message queue retries messages that have timed out or been negatively acknowledged (nack) until the maximum number of retries is reached. At most one consumer processes a given message at a time, which reduces the concurrency issues we have to consider.

Please note: if consumption takes longer than MaxConsumeDuration, the message queue considers the consumption timed out and redelivers the message. In that case multiple consumers may be consuming the same message at the same time.

Usage is also very simple: just register the callback function that processes messages and call StartConsume():

package main

import (
	"github.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
	"strconv"
	"time"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
		// Register the callback function that processes messages.
		// Return true to indicate successful consumption; return false to have the message queue redeliver the message.
		return true
	})
	// Send delayed messages
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}

	// Start consuming
	done := queue.StartConsume()
	<-done
}

Because the data is stored in Redis, the best we can guarantee is that no messages are lost as long as Redis does not fail and the keys belonging to the message queue are not tampered with externally.

How it works

The message queue involves several key Redis data structures:

  • msgKey: To avoid unexpected effects when two messages have exactly the same content, each message is stored in its own key of type string and assigned a UUID as its unique identifier. The other data structures store only the UUID, not the complete message content. Each msg has its own key, rather than all messages being placed in one hash table, in order to take advantage of the TTL mechanism.
  • pendingKey: Sorted set. The member is the message ID and the score is the Unix timestamp of the delivery time.
  • readyKey: List. IDs of messages that are ready to be delivered.
  • unAckKey: Sorted set. The member is the message ID and the score is the Unix timestamp of the retry time.
  • retryKey: List. IDs of messages whose retry time has arrived.
  • garbageKey: Set. Temporarily stores the IDs of messages that have reached the retry limit.
  • retryCountKey: Hash table. The key is the message ID and the value is the number of retries remaining.

The flow is shown in the following figure :

Because we allow distributed deployment of multiple consumers, and every consumer periodically executes the Lua scripts, different consumers may be at different stages of the flow above. We cannot predict (or control) the order in which the five operations in the figure above occur, nor can we control how many instances are performing the same operation at once.

Therefore, we need to ensure that the five operations in the figure above meet three conditions :

  1. Every operation is atomic
  2. The same message will not be processed repeatedly
  3. The message queue is always in the correct state before and after the operation

As long as these three conditions are met, we can deploy multiple instances without distributed locks or any other state synchronization technique.

Does that sound a little scary? In fact it is very simple. Let's take a closer look.

pending2ReadyScript

pending2ReadyScript uses zrangebyscore to scan for the IDs of messages that have reached their delivery time and moves them to ready:

-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- find messages in the pending key that have reached their delivery time
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- push them into the ready key
for _,v in ipairs(msgs) do
	table.insert(args2, v) 
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- delete the delivered messages from the pending key

ready2UnackScript

ready2UnackScript takes one message from ready or retry, hands it to the consumer, and puts it into unack, similar to RPopLPush:

-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
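
To see why doing the pop and the add as one atomic step matters, the following in-memory sketch lets several goroutines pull from the same queue. It is only an illustration under stated assumptions: `memQueue` and `drainConcurrently` are hypothetical names, and the mutex stands in for the atomicity that Redis gives a Lua script.

```go
package main

import (
	"fmt"
	"sync"
)

// memQueue is a hypothetical in-memory stand-in for the ready list and
// the unack sorted set. The mutex plays the role of Redis running a
// Lua script atomically.
type memQueue struct {
	mu    sync.Mutex
	ready []string         // the tail of the list is the end of the slice
	unack map[string]int64 // message ID -> retry time
}

// ready2Unack pops one ID from the tail of ready (like RPOP) and records
// it in unack with the given retry time (like ZADD) as a single step.
// The boolean is false when ready is empty.
func (q *memQueue) ready2Unack(retryTime int64) (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := len(q.ready)
	if n == 0 {
		return "", false
	}
	id := q.ready[n-1]
	q.ready = q.ready[:n-1]
	q.unack[id] = retryTime
	return id, true
}

// drainConcurrently lets several workers pull until ready is empty and
// returns how many times each ID was handed out.
func drainConcurrently(q *memQueue, workers int, retryTime int64) map[string]int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	delivered := map[string]int{}
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				id, ok := q.ready2Unack(retryTime)
				if !ok {
					return
				}
				mu.Lock()
				delivered[id]++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return delivered
}

func main() {
	q := &memQueue{ready: []string{"m1", "m2", "m3", "m4"}, unack: map[string]int64{}}
	delivered := drainConcurrently(q, 3, 1700000060)
	fmt.Println(len(delivered), len(q.unack), len(q.ready)) // 4 4 0
}
```

Every ID is handed to exactly one worker and ends up in unack, no matter how the workers interleave.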

unack2RetryScript

unack2RetryScript finds all messages in unack that have reached their retry time and moves them to retry, or to garbage if they have no retries left:

-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- find messages whose retry time has arrived
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- look up the number of retries remaining
for i,v in ipairs(retryCounts) do
	local k = msgs[i]
	if v ~= false and tonumber(v) > 0 then -- retries remain (a missing count is treated as exhausted)
		redis.call("HIncrBy", KEYS[2], k, -1) -- decrement the number of retries remaining
		redis.call("LPush", KEYS[3], k) -- add to the retry key
	else -- no retries remaining
		redis.call("HDel", KEYS[2], k) -- delete the retry count record
		redis.call("SAdd", KEYS[4], k) -- add to the garbage key, to be deleted later
	end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- delete the processed messages from the unack key
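
The branching on the remaining retry count is easier to follow in a small in-memory model. The sketch below is not the library's code: `memState` is a hypothetical type, plain Go maps and slices replace the Redis structures, and IDs are appended to the retry slice, whereas the script uses LPush.

```go
package main

import (
	"fmt"
	"sort"
)

// memState is a hypothetical in-memory copy of the four structures the
// script touches; the real data lives in Redis.
type memState struct {
	unack      map[string]int64 // message ID -> retry time
	retryCount map[string]int   // message ID -> retries remaining
	retry      []string         // IDs waiting to be redelivered
	garbage    map[string]bool  // IDs whose retries are exhausted
}

// unack2Retry moves every message whose retry time is <= now out of
// unack: into retry if it still has retries left, otherwise into garbage.
func (s *memState) unack2Retry(now int64) {
	due := make([]string, 0)
	for id, t := range s.unack {
		if t <= now {
			due = append(due, id)
		}
	}
	sort.Strings(due) // map iteration order is random; sort for a repeatable result
	for _, id := range due {
		if s.retryCount[id] > 0 {
			s.retryCount[id]-- // one retry is used up
			s.retry = append(s.retry, id)
		} else {
			delete(s.retryCount, id)
			s.garbage[id] = true // payload is deleted later, outside this step
		}
		delete(s.unack, id)
	}
}

func main() {
	s := &memState{
		unack:      map[string]int64{"a": 100, "b": 100, "c": 500},
		retryCount: map[string]int{"a": 1, "b": 0, "c": 3},
		garbage:    map[string]bool{},
	}
	s.unack2Retry(200)
	fmt.Println(s.retry, s.garbage, s.unack) // [a] map[b:true] map[c:500]
}
```

Message a still had one retry and moves to retry, b had none and goes to garbage, and c is untouched because its retry time has not arrived yet.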

Redis requires a Lua script to declare every key it will access in the KEYS parameter before it runs. Since each msg has its own key, we do not know before executing unack2RetryScript which msg keys will need to be deleted. The Lua script therefore only records the messages to be deleted in the garbage key, and after the script has finished they are deleted with the del command:

func (q *DelayQueue) garbageCollect() error {
	ctx := context.Background()
	msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}
	err = q.redisCli.Del(ctx, msgKeys...).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("del msgs failed: %v", err)
	}
	err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("remove from garbage key failed: %v", err)
	}
	return nil
}

As mentioned earlier, Lua scripts execute atomically, so no other command can be interleaved with them. The gc function, on the other hand, consists of 3 Redis commands, and other commands may run in between them. However, a message is never revived once it has entered the garbage collection process, so there is no need to make the 3 commands atomic.

ack

ack simply deletes the message completely:

func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// msg key has ttl, ignore result of delete
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	q.redisCli.HDel(ctx, q.retryCountKey, idStr)
	return nil
}

A negative acknowledgement only needs to change the retry time of the message in the unack key to now. The next execution of unack2RetryScript will then immediately move it to the retry key:

func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
		Member: idStr,
		Score:  float64(time.Now().Unix()),
	}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	return nil
}
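
The effect of the two operations can be sketched without Redis. In the illustration below, `memState` and `dueForRetry` are hypothetical names, and the maps only approximate the per-message keys, the unack sorted set, and the retry count hash.

```go
package main

import "fmt"

// memState is a hypothetical in-memory stand-in for the Redis structures
// that ack and nack touch.
type memState struct {
	msgs       map[string]string // message ID -> payload (one key per message in Redis)
	unack      map[string]int64  // message ID -> retry time
	retryCount map[string]int    // message ID -> retries remaining
}

// ack forgets the message entirely.
func (s *memState) ack(id string) {
	delete(s.unack, id)
	delete(s.msgs, id)
	delete(s.retryCount, id)
}

// nack keeps the message but makes it due for retry right away.
func (s *memState) nack(id string, now int64) {
	s.unack[id] = now
}

// dueForRetry reports whether a retry scan at time now would pick up id,
// using the same "score <= now" rule as ZRANGEBYSCORE 0 now.
func (s *memState) dueForRetry(id string, now int64) bool {
	t, ok := s.unack[id]
	return ok && t <= now
}

func main() {
	now := int64(1000)
	s := &memState{
		msgs:       map[string]string{"a": "hello", "b": "world"},
		unack:      map[string]int64{"a": now + 60, "b": now + 60},
		retryCount: map[string]int{"a": 3, "b": 3},
	}
	fmt.Println(s.dueForRetry("b", now)) // false
	s.ack("a")
	s.nack("b", now)
	fmt.Println(len(s.msgs), s.dueForRetry("a", now), s.dueForRetry("b", now)) // 1 false true
}
```

After the nack, b no longer has to wait for its original retry time; the next scan picks it up immediately, while the acked message a has disappeared from every structure.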

consume

The core logic of the message queue is the consume function, which is executed once per second. It calls the scripts above to move messages into the correct collections and invokes the consumer's callback to consume them:

func (q *DelayQueue) consume() error {
	// Run pending2ready to move messages whose delivery time has arrived into ready
	err := q.pending2Ready()
	if err != nil {
		return err
	}
	// Call ready2Unack in a loop to pull messages for consumption
	var fetchCount uint
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	// Move nacked or timed-out messages into the retry queue
	err = q.unack2Retry()
	if err != nil {
		return err
	}
	// Clean up messages that have reached the maximum number of retries
	err = q.garbageCollect()
	if err != nil {
		return err
	}
	// Consume the retry queue
	fetchCount = 0
	for {
		idStr, err := q.retry2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	return nil
}
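
The two fetch loops in consume differ only in which script they call, so one possible refactoring is to pull them into a shared helper. The sketch below is a suggestion rather than the library's code: `drain` and `errEmpty` are hypothetical names, with `errEmpty` standing in for redis.Nil so that the example runs without Redis. One behavioral difference to be aware of: with a limit of 0 this helper fetches nothing, whereas the loops above fetch one message before checking the limit.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmpty is a hypothetical sentinel standing in for redis.Nil.
var errEmpty = errors.New("queue empty")

// drain pulls up to limit message IDs with fetch, runs handle on each,
// and then calls ack or nack depending on the result.
// It returns how many messages were fetched.
func drain(limit uint, fetch func() (string, error), handle func(id string) (bool, error), ack, nack func(id string) error) (uint, error) {
	var fetched uint
	for fetched < limit {
		id, err := fetch()
		if errors.Is(err, errEmpty) {
			break // nothing left to consume
		}
		if err != nil {
			return fetched, err
		}
		fetched++
		ok, err := handle(id)
		if err != nil {
			return fetched, err
		}
		if ok {
			err = ack(id)
		} else {
			err = nack(id)
		}
		if err != nil {
			return fetched, err
		}
	}
	return fetched, nil
}

func main() {
	// A fake queue and fake ack/nack recorders, for demonstration only.
	queue := []string{"1", "2", "3"}
	var acked, nacked []string
	fetch := func() (string, error) {
		if len(queue) == 0 {
			return "", errEmpty
		}
		id := queue[0]
		queue = queue[1:]
		return id, nil
	}
	handle := func(id string) (bool, error) { return id != "2", nil }
	ack := func(id string) error { acked = append(acked, id); return nil }
	nack := func(id string) error { nacked = append(nacked, id); return nil }

	n, err := drain(10, fetch, handle, ack, nack)
	fmt.Println(n, err, acked, nacked) // 3 <nil> [1 3] [2]
}
```

With such a helper, consume could call it once with a function wrapping ready2Unack and once with a function wrapping retry2Unack.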

With that, a simple and reliable delay queue is ready. Why not give it a try?

Original site

Copyright notice
This article was written by [Finley]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/173/202206222022071784.html