In-depth analysis and implementation of the Redis Stream advanced message queue [10,000 words]
2022-06-25 13:24:00 【Liu Java】
This article explains in detail how Stream, the new data structure introduced in Redis 5.0, works and how to use it, and how it enables a more reliable message queue.
1 Stream overview
Redis offers several ways to implement a message queue: the PUB/SUB (publish/subscribe) pattern, the PUSH and POP family of List commands, and Sorted Sets. Each has its own strengths: List supports blocking reads, Pub/Sub supports message multicast, and Sorted Set supports delayed messages. But each also has serious shortcomings:
- Redis List has no message multicast, no ACK mechanism, and no way to consume a message more than once.
- Redis Pub/Sub cannot persist messages; it is fire-and-forget. If the network drops or Redis goes down, the message is gone, and there is no ACK mechanism either.
- Redis Sorted Set does not support blocking reads, repeated consumption, or grouping.
Redis Stream is a new data structure introduced in Redis 5.0. It is mainly used to implement message queues (MQ, Message Queue), and is arguably the most complete message queue implementation in Redis as of the latest version at the time of writing (6.2).
Redis Stream provides the following features:
- Blocking and non-blocking message reads, for both standalone consumers and consumer groups.
- Message multicast: the same message can be delivered to multiple standalone consumers and consumer groups.
- Message persistence: any consumer can access historical messages at any time.
- Powerful consumer groups:
  - Multiple consumers in the same group can consume messages in parallel without duplication, which increases consumption capacity.
  - A consumer group remembers its latest consumption position, so messages are consumed continuously.
  - A consumer group remembers how many times each message has been delivered, which enables retrying failed consumption and transferring messages away from permanently failed consumers.
  - The same delivery count also makes a dead-letter mechanism possible (you have to implement it yourself).
  - A consumer group provides the PEL (pending entries list) and the ACK mechanism, so that messages are successfully consumed and not lost.
Redis Stream can meet basically all your message queue needs.
2 Basic structure of a Stream
A Redis Stream is like an append-only linked list of messages that strings together every added message; each message has a unique ID and a content. It also borrows a concept from Kafka, the consumer group (Consumer Group), which makes Redis Stream more complex.
The structure of a Redis Stream is as follows:
Every Stream has a unique name, which is its Redis key; it is created automatically the first time we append a message with the XADD command.
- Consumer Group: a consumer group records the state of the Stream. It is created manually with the XGROUP CREATE command, and a consumer group name is unique within a Stream. A group can have multiple consumers (Consumer) consuming within it at the same time. All consumers share all the messages in the Stream, but any given message is consumed by only one consumer in the group; different consumers consume different messages. This can be used in distributed scenarios to keep a message from being consumed by more than one consumer.
- last_delivered_id: a cursor that records the group's consumption position in the Stream. Each consumer group has one cursor, and any consumer reading a message moves last_delivered_id forward. When creating a consumer group you specify the message ID (position) in the Stream from which to start consuming; data before that position is ignored, and the ID is used to initialize last_delivered_id. In general, last_delivered_id is the ID of the most recently consumed message.
- pending_ids: per-consumer state that tracks the IDs of the consumer's unacknowledged messages. It records messages that have been read by the client but not yet ACKed (acknowledged). Its purpose is to ensure the client consumes each message at least once, so that a message is not lost in transit before being processed. If the client never ACKs, the number of IDs in this variable keeps growing; once a message is ACKed, it shrinks accordingly. Redis officially calls this structure the PEL (Pending Entries List).
3 Storing data
The XADD command appends a message to the end of a Stream. Each message in a Stream is not just a string; it consists of one or more field-value pairs. XADD is the only Redis command that adds data to a Stream, but other commands, such as XDEL and XTRIM, can remove data from one.
The full XADD syntax is: XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
The first required argument is the key name. If the Stream for that key does not exist, it is created automatically; adding the NOMKSTREAM option after the key disables automatic creation.
A message consists of a set of field-value pairs, basically a small dictionary. The pairs are stored in the order given by the user, and commands that read the Stream (such as XRANGE or XREAD) are guaranteed to return the fields and values in exactly the order in which XADD added them.
The second required argument is the ID that uniquely identifies the message within the Stream. Every message in a Stream has a unique ID, and XADD returns the ID of the added message. If the ID argument is the * character, XADD generates a unique ID automatically.
After the key and the ID, the remaining required arguments are the field-value pairs that make up the message.
The following example inserts a message into a Stream named xx:
127.0.0.1:6379> XADD xx * name xiaoming age 22
"1624458068086-0"
Use XLEN to see the number of messages in a Stream:
127.0.0.1:6379> XLEN xx
(integer) 1
3.1 Entry ID
The format of an automatically generated ID is:
<millisecondsTime>-<sequenceNumber>
That is, "millisecond timestamp-sequence number": the message was generated at millisecond timestamp millisecondsTime and was the (sequenceNumber+1)-th message generated within that millisecond. IDs in this format are monotonically increasing and support range lookups.
An ID can also be specified by the client, but it must have the form "integer-integer", the minimum ID is 0-1, and each newly added message's ID must be greater than the previous message's ID; otherwise an error is returned:
127.0.0.1:6379> XADD xx 123-123 name xiaoming age 22
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
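The ordering rule behind this error can be sketched in a few lines of Python. This is a toy model, not Redis's implementation; the helper names parse_id and check_new_id are made up for illustration. The point it shows is that IDs compare as pairs of integers, not as strings.

```python
def parse_id(raw):
    """Split 'ms-seq' into a tuple of two ints (hypothetical helper)."""
    ms, seq = raw.split("-")
    return int(ms), int(seq)


def check_new_id(top_id, new_id):
    """Mimic the XADD rule: a new ID must be strictly greater than the top ID."""
    if parse_id(new_id) <= parse_id(top_id):
        raise ValueError("ID is equal or smaller than the target stream top item")
    return new_id
```

With 1624458068086-0 as the current top ID, check_new_id rejects 123-123 and accepts any larger ID.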
So once an ID has been auto-generated, any ID specified manually afterwards must also be greater than it, for example:
127.0.0.1:6379> XADD xx 1624458068096-0 name xiaoming age 22
"1624458068096-0"
3.2 Limiting the length
If too many messages accumulate, the Stream becomes very long, which is a problem for memory. XDEL does not reclaim the space right away; it only marks the message as deleted.
We can use other commands to really trim a Stream and cap its length. The standalone XTRIM command can limit a Stream: its MAXLEN argument specifies the maximum length, and when the Stream grows beyond MAXLEN the oldest messages are evicted so that the specified length is not exceeded.
Add 3 elements to a Stream:
127.0.0.1:6379> XADD yy * a1 b1
"1624460262356-0"
127.0.0.1:6379> XADD yy * a2 b2
"1624460267913-0"
127.0.0.1:6379> XADD yy * a3 b3
"1624460273296-0"
127.0.0.1:6379> XRANGE yy - +
1) 1) "1624460262356-0"
2) 1) "a1"
2) "b1"
2) 1) "1624460267913-0"
2) 1) "a2"
2) "b2"
3) 1) "1624460273296-0"
2) 1) "a3"
2) "b3"
Use XTRIM to keep at most two entries:
127.0.0.1:6379> XTRIM yy MAXLEN = 2
(integer) 1
127.0.0.1:6379> XRANGE yy - +
1) 1) "1624460267913-0"
2) 1) "a2"
2) "b2"
2) 1) "1624460273296-0"
2) 1) "a3"
2) "b3"
As you can see, the oldest element was evicted. XADD can also do what XTRIM does, limiting the number of elements while adding one: with its optional MAXLEN argument, when adding a message pushes the length beyond MAXLEN, the oldest messages are evicted so that the specified length is not exceeded.
127.0.0.1:6379> XADD yy MAXLEN 2 * a4 b4
"1624460537583-0"
127.0.0.1:6379> XRANGE yy - +
1) 1) "1624460273296-0"
2) 1) "a3"
2) "b3"
2) 1) "1624460537583-0"
2) 1) "a4"
2) "b4"
Exact trimming with MAXLEN is costly: to save memory, a Stream is stored in a special structure, and adjusting that structure has extra overhead. So we can use "~" to request approximate trimming, which guarantees that at least the specified N entries remain, possibly more.
For example, call XADD in the following form:
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
The command above adds a new element and also evicts old ones, so that the Stream contains 1000 elements, or at most a few tens more.
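The difference between exact and approximate trimming can be illustrated with a small Python model. It assumes, as a simplification, that entries are stored in fixed groups ("nodes") and that approximate trimming only ever drops whole nodes; the function names and the list-of-lists layout are illustrative, not Redis internals.

```python
def trim_exact(entries, maxlen):
    """MAXLEN = N: keep exactly the newest maxlen entries."""
    return entries[-maxlen:] if maxlen > 0 else []


def trim_approx(nodes, maxlen):
    """MAXLEN ~ N: drop whole nodes from the old end, never going below maxlen."""
    nodes = list(nodes)
    total = sum(len(node) for node in nodes)
    # Remove the oldest node only while at least maxlen entries would remain.
    while nodes and total - len(nodes[0]) >= maxlen:
        total -= len(nodes[0])
        nodes.pop(0)
    return nodes
```

In this model, trimming three nodes of sizes 3, 3 and 2 to "~ 4" leaves 5 entries: removing a second node would drop below 4, so it stays.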
4 Reading data
There are several ways to get data from a Stream:
- The most basic is a single client reading messages, blocking or non-blocking.
- Redis Stream also supports reading through consumer groups, where each consumer in the group consumes different messages; this borrows from Kafka's consumer groups.
- Thanks to the monotonically increasing IDs, Redis Stream also supports fetching messages by time range, and iterating over messages with a cursor to incrementally inspect the whole pending history.
Redis Stream supports all of these query modes through different commands.
4.1 Range queries
The XRANGE and XREVRANGE commands implement forward and reverse range queries over messages.
To query a Stream by range, we only need to specify two IDs, a start and an end. There are two special IDs, - and +, which stand for the smallest and largest possible ID respectively.
The following example queries all messages in the Stream:
127.0.0.1:6379> XRANGE xx - +
1) 1) "1624458068086-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
2) 1) "1624458068096-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
Each returned entry is an array of two elements: the ID and a list of field-value pairs.
Because the first part of an ID is a Unix timestamp in milliseconds, IDs are well suited to range queries, and it is allowed to omit the sequence part. If the sequence is omitted, the start of the range assumes sequence number 0 and the end assumes the largest possible sequence number.
127.0.0.1:6379> XRANGE xx 1624458068086 1624458068087
1) 1) "1624458068086-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
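How incomplete IDs are completed for a range query can be modeled as follows. This is a sketch that assumes each ID part is an unsigned 64-bit integer; complete_id and xrange_model are hypothetical names, and the special IDs - and + are not handled.

```python
MAX_SEQ = 2**64 - 1  # assumes each ID part is an unsigned 64-bit integer


def complete_id(raw, default_seq):
    """Fill in the sequence part when only the timestamp was given."""
    ms, sep, seq = raw.partition("-")
    return int(ms), (int(seq) if sep else default_seq)


def xrange_model(ids, start, end):
    """Return the IDs inside the inclusive range [start, end]."""
    lo = complete_id(start, 0)        # start defaults to sequence 0
    hi = complete_id(end, MAX_SEQ)    # end defaults to the largest sequence
    return [i for i in ids if lo <= complete_id(i, 0) <= hi]
```

Applied to the two IDs stored in xx, the range 1624458068086 to 1624458068087 selects only 1624458068086-0, matching the output above.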
XRANGE also accepts an optional COUNT option at the end, which limits the result to the first N messages.
127.0.0.1:6379> XRANGE xx - + COUNT 1
1) 1) "1624458068086-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
By default the range includes both endpoints. Prefix the first ID with "(" to exclude the first endpoint.
127.0.0.1:6379> XRANGE xx 1624458068086 1624458068096
1) 1) "1624458068086-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
2) 1) "1624458068096-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
With "(":
127.0.0.1:6379> XRANGE xx (1624458068086 1624458068096
1) 1) "1624458068096-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
XRANGE takes O(log(N)) to seek and then O(M) to return M elements, so with a small count the command has logarithmic time complexity, which means each step of an iteration is fast. XRANGE can therefore serve as a Stream iterator, with no need for an XSCAN command (Redis does not provide one).
XREVRANGE is equivalent to XRANGE but returns the elements in reverse order, so the two ID arguments are also given in reverse order. A practical use of XREVRANGE is to check what the last entry in a Stream is:
127.0.0.1:6379> XREVRANGE xx + - COUNT 1
1) 1) "1624458068096-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
4.2 Consuming messages independently
Usually what we want is to subscribe to new messages arriving in a Stream, rather than fetch messages by range. This is the producer-consumer pattern. The idea may resemble Redis Pub/Sub or blocking lists, but Stream works in a fundamentally different way.
A Stream can have multiple clients (consumers) waiting for data. By default, every new message is delivered to every consumer waiting on the given Stream. This differs from blocking lists, where each consumer gets a different message; the ability to fan out to multiple consumers is similar to Pub/Sub.
In Pub/Sub, a message is discarded as soon as it is published, no matter what. With blocking lists, a message is popped from the list (effectively deleted) when a client receives it. Stream works completely differently: all messages are kept in the Stream indefinitely (unless the user explicitly asks to delete them), and each consumer remembers the ID of the last message it received to know what counts as new.
The XREAD command listens for new messages on one or more Streams, returning only messages whose ID is greater than the last received ID passed by the caller.
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
4.2.1 Non-blocking usage
Without the BLOCK option the command is synchronous and can be thought of as somewhat related to XRANGE: it returns a range of entries from the Stream. But even in synchronous use it differs from XRANGE in two fundamental ways:
- To read from several keys at once, pass multiple Stream keys in one call. This is a key feature of XREAD, because being able to listen on multiple keys over a single connection matters, especially when blocking with BLOCK.
- XRANGE is typically used to return the messages between two IDs, whereas XREAD is better suited to fetching the messages that follow a given ID, which may be greater than the ID of any message we have seen so far; in other words, consuming the latest messages from oldest to newest. XREAD is commonly used to iterate over a Stream, so the ID we pass is usually the ID of the last message we received from that Stream.
A simple use of XREAD: given two Streams xx and yy, to read from both starting with the first element each contains, call XREAD as in the following example:
127.0.0.1:6379> XREAD STREAMS xx yy 0 0
1) 1) "xx"
2) 1) 1) "1624458068086-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
2) 1) "1624458068096-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
2) 1) "yy"
2) 1) 1) "1624460273296-0"
2) 1) "a3"
2) "b3"
2) 1) "1624460537583-0"
2) 1) "a4"
2) "b4"
The STREAMS option is mandatory and must be the last option, because it is followed by the Stream keys and then the corresponding start IDs:
STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N
In the example above, the start IDs are both 0 (incomplete IDs are valid, with the same rules as XRANGE), so we get all messages in Streams xx and yy whose ID is greater than 0-0 (the ID passed is itself excluded).
We can also add a COUNT N option before STREAMS, meaning that at most N messages are returned from each Stream:
127.0.0.1:6379> XREAD COUNT 1 STREAMS xx yy 0 0
1) 1) "xx"
2) 1) 1) "1624458068086-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
2) 1) "yy"
2) 1) 1) "1624460273296-0"
2) 1) "a3"
2) "b3"
In the example above, the last messages received from Streams xx and yy have IDs 1624458068086-0 and 1624460273296-0, so in the next iteration we pass those IDs:
127.0.0.1:6379> XREAD COUNT 1 STREAMS xx yy 1624458068086-0 1624460273296-0
1) 1) "xx"
2) 1) 1) "1624458068096-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "22"
2) 1) "yy"
2) 1) 1) "1624460537583-0"
2) 1) "a4"
2) "b4"
Eventually, once the whole Stream has been iterated (consumed in order), the call returns no messages, only an empty result. To pick up new messages as they arrive we would have to keep retrying, which is why the command also supports a blocking mode.
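The iteration pattern described here (pass the last received ID, stop on an empty result) can be sketched against an in-memory list instead of a real server. xread_model and consume_all are hypothetical names; a real client would issue XREAD calls in place of xread_model.

```python
def xread_model(stream, last_id, count=None):
    """Return entries whose ID is strictly greater than last_id, oldest first."""
    def key(raw):
        ms, _, seq = raw.partition("-")
        return int(ms), int(seq or 0)  # an incomplete ID gets sequence 0

    newer = [(eid, fields) for eid, fields in stream if key(eid) > key(last_id)]
    return newer if count is None else newer[:count]


def consume_all(stream, batch=1):
    """Iterate the stream batch by batch, feeding back the last ID received."""
    last_id, seen = "0-0", []
    while True:
        entries = xread_model(stream, last_id, batch)
        if not entries:          # empty result: the stream has been fully read
            break
        seen.extend(eid for eid, _ in entries)
        last_id = entries[-1][0]
    return seen
```

Each loop iteration corresponds to one XREAD COUNT call in the transcript above; the loop ends at the point where a polling client would otherwise have to retry.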
4.2.2 Blocking usage
The non-blocking usage above does not look very different from XRANGE. The interesting part is that the optional BLOCK argument easily turns XREAD into a blocking command, which blocks on the given Streams and IDs and unblocks automatically as soon as one of the requested keys receives data.
Importantly, if multiple clients use this command to wait on the same ID range of the same Stream, every consumer gets a copy of the data. This differs from the blocking pop operations on lists, where each consumer gets a different message.
The blocking timeout is given in milliseconds; 0 means block indefinitely until any of the Streams has data to return. If the timeout expires before a message arrives, the command returns null.
Even when BLOCK is passed, if at least one of the given Streams has matching data that can be returned immediately, the command runs synchronously, as if the BLOCK option were absent.
In addition, when blocking we sometimes only want messages added to the Stream by XADD from the moment we start blocking, and have no interest in the history. For that we can pass the special ID "$" for a Stream to indicate that we only want new messages; this is also the most common usage. Redis looks up the largest message ID currently in the Stream and uses that as the ID for the XREAD call.
In the following example we block on two Streams, xx and yy, using "$" for both to listen for new data, with a timeout of 10000 milliseconds, i.e. 10 seconds.
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS xx yy $ $
1) 1) "yy"
2) 1) 1) "1624506208518-0"
2) 1) "a5"
2) "b5"
(2.88s)
After using "$" to listen for new data, the next call should pass the largest ID returned by this one.
As with blocking list operations, blocking Stream reads are fair from the point of view of clients waiting for data, because the semantics are FIFO: when new entries become available, the first client that blocked on the given Stream is the first to be unblocked.
XREAD has no options other than COUNT and BLOCK, so it is a very basic command with a specific purpose: letting consumers listen directly on one or more Streams. The consumer group API offers more powerful ways to consume a Stream, but reading through a consumer group is done with a different command, XREADGROUP.
4.3 Consumer groups
4.3.1 Basic concepts
With XREAD, one or more consumers can listen on a Stream, and when data arrives every consumer receives an identical copy of the matching data, which gives message multicast. But sometimes what we need is not the same message stream for many consumers or clients, but different subsets of the messages from the same Stream for many clients.
An obvious case where this is useful is slow message processing: routing different messages from the Stream to different workers that are ready to receive them scales up processing capacity.
What we want is for different consumers to consume different data from the Stream, which looks a bit like blocking lists. To achieve this, Redis Stream uses a concept called consumer groups. The name is clearly borrowed from Kafka, but from an implementation standpoint Redis consumer groups have nothing to do with Kafka consumer groups: consumers in a Kafka consumer group also map to partitions, whereas consumers in a Redis consumer group effectively get messages directly from the Stream.
A consumer group is like a pseudo consumer that gets data from a Stream and in fact serves multiple consumers, distributing the messages it obtains among them, with certain guarantees:
- Each message is delivered to a different consumer, so the same message cannot be delivered to multiple consumers in the same group.
- Within a consumer group, consumers are identified by name, a case-sensitive string that the client implementing the consumer must provide. So even after a disconnection, the Stream's consumer group retains the state of all consumers, and the client can identify itself as the same consumer again.
- Each consumer group tracks the first ID never consumed (via last_delivered_id), so when a consumer asks for new messages it is given only messages that were not delivered before.
- Consuming a message requires an explicit acknowledgement with a specific command, i.e. an ACK. This indicates that the message was processed correctly and can be removed from the consumer group.
- A consumer group tracks all messages that are currently un-ACKed, that is, messages delivered to some consumer of the group but not yet acknowledged as processed. Thanks to this, when accessing the message history of a Stream, each consumer only sees the messages that were delivered to it.
A consumer group can also be understood as a state record for a Stream, or an auxiliary data structure of the Stream. A Stream can obviously have multiple consumer groups, and those groups can have different sets of consumers.
In fact, within the same Stream it is even possible for some clients to read through XREAD without a consumer group while clients in different consumer groups read through XREADGROUP.
Commands related to consumer groups:
- XGROUP: creates, destroys, and manages consumer groups.
- XREADGROUP: reads messages from a Stream through a consumer group.
- XACK: lets a consumer mark pending messages as correctly processed.
4.3.2 Create consumer groups
The XGROUP command is very powerful. Its command template is:
XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]
XGROUP manages the consumer groups associated with a Stream data structure. With it you can:
- Create a new consumer group associated with a Stream.
- Destroy a consumer group.
- Remove a specific consumer from a consumer group.
- Set the group's last_delivered_id to another value.
XGROUP CREATE creates a consumer group for a Stream. A start message ID must be passed to initialize last_delivered_id, so that the group knows which message to serve next when the first consumer connects, that is, what the last message ID was when the group was created.
A simple example:
127.0.0.1:6379> XGROUP CREATE yy mygroup 0
OK
If we pass "$", which stands for the largest message ID currently in the Stream, the group will consume only new messages.
XGROUP CREATE can also create the Stream automatically if it does not exist, using the optional MKSTREAM subcommand as the last argument:
127.0.0.1:6379> XGROUP CREATE yyy mygroup $ MKSTREAM
OK
4.3.3 Consume from the consumer group
We can read messages through a consumer group with the XREADGROUP command.
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
XREADGROUP is very similar to XREAD and offers the same BLOCK option; without it, the command is synchronous. However, one option is mandatory: GROUP, which takes two arguments, the name of the consumer group and the name of the consumer doing the read. The COUNT option is also supported and works as in XREAD.
The consumer name is the string a client uses to identify itself within the group. When Redis sees a consumer with a new name, it creates it automatically in the consumer group; different clients should choose different consumer names.
Stream yy contains 4 entries:
127.0.0.1:6379> XRANGE yy - +
1) 1) "1624516905844-0"
2) 1) "a3"
2) "b3"
2) 1) "1624516910389-0"
2) 1) "a4"
2) "b4"
3) 1) "1624516914709-0"
2) 1) "a5"
2) "b5"
4) 1) "1624516919774-0"
2) 1) "a6"
2) "b6"
We have a consumer c1 consume one message through consumer group mygroup:
127.0.0.1:6379> XREADGROUP GROUP mygroup c1 COUNT 1 STREAMS yy >
1) 1) "yy"
2) 1) 1) "1624516905844-0"
2) 1) "a3"
2) "b3"
In the command above, the ID given after the STREAMS option is the special ID ">". This special ID is valid only in the context of consumer groups and means: messages never delivered to any consumer so far. Reading with it also updates last_delivered_id. This is the ID usually passed.
The ID can also be 0, any other ID, or an incomplete ID (timestamp part only). In that case, the command returns only historical messages that were already delivered to the current consumer and have not been acknowledged with XACK, i.e. the consumer's own pending_ids set. In this case BLOCK and NOACK are both ignored.
Next we have another consumer c2 in consumer group mygroup consume one more message:
127.0.0.1:6379> XREADGROUP GROUP mygroup c2 COUNT 1 STREAMS yy >
1) 1) "yy"
2) 1) 1) "1624516910389-0"
2) 1) "a4"
2) "b4"
It returns a4-b4, because a3-b3 has already been consumed by c1 in the same group.
XREADGROUP has several characteristics:
- A consumer is created automatically the first time it is mentioned; there is no need to create it explicitly.
- Several keys can be read at once, but for this to work a consumer group with the same name must be created on every Stream.
- XREADGROUP is a write command: even though it reads from the Stream, the consumer group updates last_delivered_id as a side effect, so it can only be called on a master instance.
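The bookkeeping described in this section (the last_delivered_id cursor, the pending list, and acknowledgement) can be pictured with a small in-memory model. This is a simplified sketch, not how Redis stores a group: GroupModel and its method names are invented for illustration, and the cursor is kept as a list index rather than a message ID.

```python
class GroupModel:
    """Toy model of one consumer group on one Stream."""

    def __init__(self, stream_ids):
        self.stream_ids = list(stream_ids)  # message IDs in insertion order
        self.next_index = 0                 # stands in for last_delivered_id
        self.pel = {}                       # message ID -> owning consumer

    def read_new(self, consumer, count=1):
        """Like XREADGROUP with '>': hand out never-delivered messages."""
        batch = self.stream_ids[self.next_index:self.next_index + count]
        self.next_index += len(batch)       # the cursor moves forward
        for msg_id in batch:
            self.pel[msg_id] = consumer     # delivered but not yet ACKed
        return batch

    def read_pending(self, consumer):
        """Like XREADGROUP with ID 0: replay this consumer's un-ACKed messages."""
        return [m for m, owner in self.pel.items() if owner == consumer]

    def ack(self, msg_id):
        """Like XACK: remove from the pending list; 1 if it was pending, else 0."""
        return 1 if self.pel.pop(msg_id, None) is not None else 0
```

In this model two consumers calling read_new never receive the same ID, and a consumer that restarts can call read_pending to recover what it had not acknowledged.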
5 Permanent failure recovery
In the consumption flow described above, when a consumer restarts after a failure it can call XREADGROUP with 0 as the final ID to fetch all the un-ACKed messages assigned to it, so no message is lost.
In the real world, however, a consumer may fail permanently and never come back. What happens to the pending messages of a consumer that stopped for whatever reason and never recovers? Redis consumer groups support reassigning messages that were delivered but never processed.
5.1 XPENDING: viewing unprocessed messages
First, use the XPENDING command to get information about unprocessed messages in a consumer group. This command is read-only and always safe to call:
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
The simplest usage is:
127.0.0.1:6379> XPENDING yy mygroup
1) (integer) 3
2) "1624516905844-0"
3) "1624516914709-0"
4) 1) 1) "c1"
2) "1"
2) 1) "c2"
2) "2"
The output has four parts: the first is the total number of pending messages in the consumer group, the second and third are the lowest and highest IDs among the pending messages, and the last is the list of consumers with the number of pending messages each one has. Here c1 and c2 have one and two un-ACKed messages respectively.
That is the simplest form of output. By passing a start and end ID (which can be - and +) plus a count that limits how much the command returns, we can iterate over the pending messages and learn more about them.
127.0.0.1:6379> XPENDING yy mygroup - + 10
1) 1) "1624516905844-0"
2) "c1"
3) (integer) 1422027
4) (integer) 1
2) 1) "1624516910389-0"
2) "c2"
3) (integer) 1354378
4) (integer) 1
3) 1) "1624516914709-0"
2) "c2"
3) (integer) 929737
4) (integer) 1
The result is an array in which each element has four attributes. The first is the ID of the un-ACKed message; the second is the name of the consumer that currently owns the message; the third is the idle time (IDLE), in milliseconds, since the message was last delivered to that consumer; the fourth is the number of times the message has been delivered, which increases when another consumer claims the message with XCLAIM or when a consumer re-reads its un-ACKed history through XREADGROUP.
To limit the output to the pending messages of a given consumer, pass the consumer name as the optional last argument:
127.0.0.1:6379> XPENDING yy mygroup - + 10 c1
1) 1) "1624516905844-0"
2) "c1"
3) (integer) 3060966
4) (integer) 1
Then XRANGE can fetch the details of each message by ID:
127.0.0.1:6379> XRANGE yy 1624516905844-0 1624516905844-0
1) 1) "1624516905844-0"
2) 1) "a3"
2) "b3"
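The four-field rows returned by the extended form of XPENDING are enough to drive the retry and dead-letter logic that, as noted in the overview, you have to implement yourself. The sketch below is one possible policy, not something Redis provides: split_pending and its thresholds are hypothetical, and rows are assumed to be already parsed into tuples.

```python
def split_pending(rows, min_idle_ms, max_deliveries):
    """Sort pending rows into messages to retry and messages to dead-letter.

    rows: (message_id, consumer, idle_ms, delivery_count) tuples.
    """
    retry, dead = [], []
    for msg_id, _consumer, idle_ms, deliveries in rows:
        if idle_ms < min_idle_ms:
            continue  # delivered recently; its owner may still be working on it
        if deliveries >= max_deliveries:
            dead.append(msg_id)   # delivered too often: candidate for dead-lettering
        else:
            retry.append(msg_id)  # idle long enough: candidate for reassignment
    return retry, dead
```

Messages in the retry list are the ones a recovery process would go on to reassign; what to do with the dead list (for example, copying the entries to another Stream and acknowledging them) is an application decision.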
5.2 XCLAIM: transferring messages
Now we know which consumers hold which un-ACKed messages. Suppose consumer c1 will never recover and we need to transfer c1's unprocessed messages to c2. This is the second step, which uses the XCLAIM command. In the context of a Stream consumer group, XCLAIM changes the ownership of a pending message, so that the new owner is the consumer given as the command argument.
XCLAIM is typically used for permanent failure recovery:
- There is a Stream with an associated consumer group.
- Some consumer A reads messages from the Stream via XREADGROUP in the context of that consumer group.
- Every time a message is fetched, a pending entry is created in the consumer's pending entries list (PEL), meaning that the message was delivered to that consumer but has not yet been acknowledged with XACK.
- For some reason, the consumer suddenly goes offline permanently.
- Other consumers can then use XPENDING to inspect the list of pending messages and, to continue processing them, use XCLAIM to take ownership of the messages and carry on. Since Redis 6.2, consumers can use the XAUTOCLAIM command to scan for and claim stale pending messages automatically.
The XCLAIM command format is:
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
consumer is the destination consumer. You also need to specify min-idle-time, the minimum time the message must have gone unprocessed (its idle time); the operation takes effect only for messages whose idle time is greater than the value given.
The purpose of the minimum idle time is to keep two clients from claiming the same message at the same moment:
Client 1: XCLAIM yy mygroup c2 3060966 1624516905844-0
Client 2: XCLAIM yy mygroup c3 3060966 1624516905844-0
Executing XCLAIM resets the idle time of the message and increments its delivery counter, so the second client's claim will not succeed.
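The race described above can be sketched with a small in-memory model. This is only an illustration, not Redis client code: `PendingEntry` and the toy `xclaim` function are hypothetical names, and the model assumes a claim is skipped when the idle time is below the minimum.

```python
from dataclasses import dataclass


@dataclass
class PendingEntry:
    """One PEL record: owner, last delivery time (ms), delivery count."""
    consumer: str
    delivered_at_ms: int
    deliveries: int


def xclaim(pel, msg_id, new_consumer, min_idle_ms, now_ms):
    """Toy XCLAIM: succeed only if the entry has been idle long enough."""
    entry = pel.get(msg_id)
    if entry is None:
        return False
    idle_ms = now_ms - entry.delivered_at_ms
    if idle_ms < min_idle_ms:
        return False  # delivered or claimed too recently
    entry.consumer = new_consumer
    entry.delivered_at_ms = now_ms  # the idle time restarts from zero
    entry.deliveries += 1           # the delivery counter is incremented
    return True


now = 10_000_000  # arbitrary clock value in milliseconds
pel = {"1624516905844-0": PendingEntry("c1", now - 3_100_000, 1)}

# Two clients try to claim the same message a few milliseconds apart.
first = xclaim(pel, "1624516905844-0", "c2", 3_060_966, now)
second = xclaim(pel, "1624516905844-0", "c3", 3_060_966, now + 5)
print(first, second)  # True False
```

The first claim resets the idle time, so the second one sees an idle time of only 5 ms and is rejected. The message ends up owned by c2 with a delivery count of 2.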
Next, transfer c1's message to c2:
127.0.0.1:6379> XCLAIM yy mygroup c2 3060966 1624516905844-0
1) 1) "1624516905844-0"
2) 1) "a3"
2) "b3"
Use XPENDING again to check c2:
127.0.0.1:6379> XPENDING yy mygroup - + 10
1) 1) "1624516905844-0"
2) "c2"
3) (integer) 86472
4) (integer) 2
2) 1) "1624516910389-0"
2) "c2"
3) (integer) 3292893
4) (integer) 1
3) 1) "1624516914709-0"
2) "c2"
3) (integer) 2868252
4) (integer) 1
You can see that message 1624516905844-0 has been transferred to c2. Now c2 can traverse its PEL again with XREADGROUP (passing a concrete ID such as 0) to process the message.
The XCLAIM command returns the details of the claimed messages. If you do not need them, add the JUSTID option to return only the IDs of the successfully claimed messages, which reduces network bandwidth usage.
5.3 XAUTOCLAIM Automatic transfer
XPENDING and XCLAIM provide the basic building blocks for different kinds of recovery mechanisms. The XAUTOCLAIM command, added in Redis 6.2, optimizes the common case by letting Redis manage the process, and offers a simple solution for most recovery needs.
XAUTOCLAIM identifies idle pending messages and transfers their ownership to the specified consumer. It is equivalent to calling XPENDING and then XCLAIM.
XAUTOCLAIM key group consumer min-idle-time start [COUNT count] [JUSTID]
So we can use XAUTOCLAIM to claim a message. First, create a consumer c3 by having it pull a message from mygroup:
127.0.0.1:6379> XREADGROUP GROUP mygroup c3 STREAMS yy >
1) 1) "yy"
2) 1) 1) "1624516919774-0"
2) 1) "a6"
2) "b6"
Then use XAUTOCLAIM to transfer c3's message to c2, as shown below:
127.0.0.1:6379> XAUTOCLAIM yy mygroup c2 100 1624516919773 COUNT 1
1) "0-0"
2) 1) 1) "1624516919774-0"
2) 1) "a6"
2) "b6"
Here the millisecond value (100) is again the minimum idle time. The ID (1624516919773) is the smallest message ID to start scanning from, not an exact ID to match, and COUNT N caps the number of messages transferred at N. This command also returns the details of the messages; with JUSTID it returns only their IDs. The first element of the reply is a cursor: when XAUTOCLAIM returns the Stream ID "0-0" as the cursor, it has reached the end of the consumer group's pending list.
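The cursor behaviour can be illustrated with a simplified in-memory model. This is a sketch under assumptions, not the real command: the toy `xautoclaim`, `claim_all`, and `parse_id` are hypothetical helpers, and the model ignores details such as the internal scan limit of the real server.

```python
def parse_id(stream_id):
    """Turn 'ms-seq' (or just 'ms') into a comparable (ms, seq) tuple."""
    ms, _, seq = stream_id.partition("-")
    return (int(ms), int(seq or 0))


def xautoclaim(pel, consumer, min_idle_ms, start, count, now_ms):
    """Toy XAUTOCLAIM. pel maps id -> [owner, delivered_at_ms, deliveries]."""
    claimed = []
    ids = sorted((i for i in pel if parse_id(i) >= parse_id(start)), key=parse_id)
    for msg_id in ids:
        if len(claimed) == count:
            return msg_id, claimed  # cursor: where the next call should resume
        entry = pel[msg_id]
        if now_ms - entry[1] >= min_idle_ms:
            entry[0] = consumer
            entry[1] = now_ms
            entry[2] += 1
            claimed.append(msg_id)
    return "0-0", claimed  # reached the end of the pending list


def claim_all(pel, consumer, min_idle_ms, now_ms, batch=2):
    """Keep calling the toy xautoclaim until the cursor comes back as 0-0."""
    cursor, total = "0-0", []
    while True:
        cursor, claimed = xautoclaim(pel, consumer, min_idle_ms, cursor, batch, now_ms)
        total.extend(claimed)
        if cursor == "0-0":
            return total


now = 1_000_000
pel = {
    "1-0": ["c1", 0, 1],
    "2-0": ["c1", 0, 1],
    "3-0": ["c3", 999_990, 1],  # idle for only 10 ms, too fresh to claim
    "4-0": ["c3", 0, 1],
    "5-0": ["c1", 0, 1],
}
claimed = claim_all(pel, "c2", 100, now)
print(claimed)  # ['1-0', '2-0', '4-0', '5-0']
```

Feeding the returned cursor back in as the next start value lets a consumer walk the whole pending list in small batches, while messages that were delivered recently stay with their current owner.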
6 Dead letter queue
The fourth field of each entry in the XPENDING output is the number of times that message has been delivered.
127.0.0.1:6379> XPENDING yy mygroup - + 10
1) 1) "1624516905844-0"
2) "c2"
3) (integer) 129041
4) (integer) 5
2) 1) "1624516910389-0"
2) "c2"
3) (integer) 1224177
4) (integer) 2
3) 1) "1624516914709-0"
2) "c2"
3) (integer) 1224177
4) (integer) 2
4) 1) "1624516919774-0"
2) "c2"
3) (integer) 59913
4) (integer) 2
The counter is incremented in two ways: when a message is successfully claimed via XCLAIM, and when XREADGROUP is used to access the history of pending messages.
When something goes wrong, a message may be delivered several times. This is normal, and such messages are usually processed and acknowledged in the end. However, some messages may be impossible to process, for example because their own data is malformed and triggers an exception in the processing code. In that case, consumers will keep failing on that particular message.
We can use this counter to detect messages that cannot be processed for some reason. Such messages are called bad messages (also known as dead letters, or undeliverable messages). Once the delivery counter reaches a chosen threshold, it is probably wiser to move such messages into another Stream and send an alert email.
This is basically how Redis Streams implements the dead letter concept.
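As a rough sketch of the threshold check, the function below splits rows shaped like the XPENDING output above into messages worth retrying and dead letters. The name `split_dead_letters` and the threshold of 5 are assumptions chosen for illustration.

```python
MAX_DELIVERIES = 5  # hypothetical threshold, tune it for your workload


def split_dead_letters(pending_rows, max_deliveries=MAX_DELIVERIES):
    """Split (id, consumer, idle_ms, deliveries) rows into retry and dead lists."""
    retry, dead = [], []
    for msg_id, _consumer, _idle_ms, deliveries in pending_rows:
        if deliveries >= max_deliveries:
            dead.append(msg_id)
        else:
            retry.append(msg_id)
    return retry, dead


# Rows copied from the XPENDING output shown above.
rows = [
    ("1624516905844-0", "c2", 129041, 5),
    ("1624516910389-0", "c2", 1224177, 2),
    ("1624516914709-0", "c2", 1224177, 2),
    ("1624516919774-0", "c2", 59913, 2),
]
retry, dead = split_dead_letters(rows)
print(dead)  # ['1624516905844-0']
```

In a real consumer, the IDs in `dead` would then be copied into a separate dead-letter Stream. Acknowledging them on the original Stream afterwards, so that they stop being redelivered, is one possible design and is an assumption here rather than something the article prescribes.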
7 Stream monitoring
Redis supports several commands for monitoring a Stream. We have already covered the XPENDING command, which lets us inspect the list of messages being processed at a given moment, together with their idle time and delivery count.
To get other information, we can use the XINFO command. XINFO is an observability interface that is used with subcommands to get information about a Stream or its consumer groups.
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
The XINFO subcommands are:
- XINFO STREAM <key>: shows information about the Stream.
- XINFO STREAM <key> FULL [COUNT <count>]: returns details of the entire state of the Stream, including messages, groups, consumers, and pending entries lists (PEL), similar to a combination of several commands.
- XINFO GROUPS <key>: gets information about all consumer groups associated with the Stream.
- XINFO CONSUMERS <key> <group>: gets information about each consumer in a specific consumer group.
For example, XINFO STREAM shows information about the Stream itself:
127.0.0.1:6379> XINFO STREAM yy
1) "length"
2) (integer) 4
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1624516919774-0"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1624516905844-0"
2) 1) "a3"
2) "b3"
13) "last-entry"
14) 1) "1624516919774-0"
2) 1) "a6"
2) "b6"
The output shows how the Stream is encoded internally (a Stream is built on a radix tree data structure), as well as the first and last messages in the Stream. It also shows the number of consumer groups associated with this Stream. We can dig further to get more information about the consumer groups:
127.0.0.1:6379> XINFO GROUPS yy
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 4
7) "last-delivered-id"
8) "1624516919774-0"
This shows information about all groups of Stream yy, including the name, the number of consumers, the number of unacknowledged messages, and last-delivered-id (the cursor).
We can then examine a particular consumer group in more detail by checking the consumers registered in it:
127.0.0.1:6379> XINFO CONSUMERS yy mygroup
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 6414621
2) 1) "name"
2) "c2"
3) "pending"
4) (integer) 4
5) "idle"
6) (integer) 974923
3) 1) "name"
2) "c3"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 1255452
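XINFO replies are flat lists of alternating field names and values, which are awkward to work with directly. As a small sketch, a hypothetical helper `pairs_to_dict` can turn each row into a dictionary. The sample data is copied from the XINFO CONSUMERS output above, and many client libraries already perform this conversion for you.

```python
def pairs_to_dict(flat):
    """Turn ['name', 'c1', 'pending', 0, ...] into {'name': 'c1', 'pending': 0, ...}."""
    if len(flat) % 2:
        raise ValueError("expected an even number of elements")
    return dict(zip(flat[0::2], flat[1::2]))


# Raw reply rows as they appear in the XINFO CONSUMERS output above.
consumers_reply = [
    ["name", "c1", "pending", 0, "idle", 6414621],
    ["name", "c2", "pending", 4, "idle", 974923],
    ["name", "c3", "pending", 0, "idle", 1255452],
]

consumers = [pairs_to_dict(row) for row in consumers_reply]
busiest = max(consumers, key=lambda c: c["pending"])
total_pending = sum(c["pending"] for c in consumers)
print(busiest["name"], total_pending)  # c2 4
```

The total of 4 pending messages matches the "pending" field reported for mygroup by XINFO GROUPS.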
8 Deleting messages
Stream also has a special command for removing messages from a Stream by ID. For an append-only data structure, as for other message queues, this may look like an odd feature, but it is actually useful for applications subject to privacy regulations.
127.0.0.1:6379> XRANGE yy - +
1) 1) "1624516905844-0"
2) 1) "a3"
2) "b3"
2) 1) "1624516910389-0"
2) 1) "a4"
2) "b4"
3) 1) "1624516914709-0"
2) 1) "a5"
2) "b5"
4) 1) "1624516919774-0"
2) 1) "a6"
2) "b6"
Use XDEL, passing the name of the Stream followed by the IDs to delete:
127.0.0.1:6379> XDEL yy 1624516919774-0
(integer) 1
Use XRANGE again:
127.0.0.1:6379> XRANGE yy - +
1) 1) "1624516905844-0"
2) 1) "a3"
2) "b3"
2) 1) "1624516910389-0"
2) 1) "a4"
2) "b4"
3) 1) "1624516914709-0"
2) 1) "a5"
2) "b5"
However, in the current Redis 6.2 implementation, deleting a message from a Stream does not really remove it. The message is only marked as deleted, and XPENDING shows that it still exists in the PEL. Eventually, once all the messages in a macro node are marked as deleted, the whole node is destroyed and its memory is reclaimed.
In a future version of Redis, node garbage collection may be triggered when a macro node reaches a given number of deleted messages. For now, since this matters little for typical Stream usage and would add implementation complexity, the feature is not provided.
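The mark-as-deleted behaviour can be pictured with a toy model. `MacroNode` and `ToyStream` are hypothetical classes invented for this illustration, and the real storage layout inside Redis is considerably more involved.

```python
class MacroNode:
    """A group of entries stored together; maps message id -> deleted flag."""

    def __init__(self, ids):
        self.entries = {msg_id: False for msg_id in ids}

    def all_deleted(self):
        return all(self.entries.values())


class ToyStream:
    def __init__(self, nodes):
        self.nodes = [MacroNode(ids) for ids in nodes]

    def xdel(self, msg_id):
        """Mark one entry as deleted; free its node once every entry is marked."""
        for node in self.nodes:
            if msg_id in node.entries and not node.entries[msg_id]:
                node.entries[msg_id] = True
                if node.all_deleted():
                    self.nodes.remove(node)  # memory is reclaimed only here
                return 1
        return 0

    def visible(self):
        """IDs a range query would still return."""
        return [i for n in self.nodes for i, gone in n.entries.items() if not gone]

    def stored(self):
        """Number of entries still occupying memory, including marked ones."""
        return sum(len(n.entries) for n in self.nodes)


stream = ToyStream([["1-0", "2-0"], ["3-0"]])
stream.xdel("1-0")
after_first = (stream.visible(), stream.stored())
stream.xdel("2-0")
after_second = (stream.visible(), stream.stored())
print(after_first, after_second)
```

After the first deletion the message is no longer visible but still occupies space. Only when the second deletion empties the node does the stored count drop.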
9 Zero-length Streams
A Stream differs from other Redis data structures in one respect: when other data structures no longer contain any elements, the key itself is deleted. For example, when a ZREM call removes the last element of a ZSET, the ZSET is removed entirely.
A Stream, by contrast, is allowed to stay at zero elements, either because the MAXLEN option was used with a count of zero (with the XADD and XTRIM commands) or because XDEL was called.
A Stream is special in this way because it may be associated with consumer groups, and we do not want to lose the state defined by a consumer group just because the Stream no longer contains any messages.
In the current Redis 6.2 version, the Stream is not deleted even when it has no associated consumer group, but this may change in the future.
10 ACK acknowledgement
When a message is delivered to a consumer by an XREADGROUP call, or when a consumer takes ownership of a message by calling XCLAIM, the message becomes pending and is stored in that consumer's pending entries list (PEL). The PEL is the pending_ids attribute mentioned at the beginning.
Although a pending message has been delivered to some consumer, the server cannot be sure that it has been processed correctly at least once. Calling XREADGROUP with a concrete ID (for example, ID 0) returns the consumer's message history, that is, the messages in its PEL. The PEL can also be listed with the XPENDING command.
The XACK command removes one or more messages from the PEL of a Stream consumer group.
XACK key group ID [ID ...]
Once a consumer has successfully processed a message, it should call XACK so that the message is not processed again. The PEL record for the message is then cleared as well, which lets the Redis server free the memory.
127.0.0.1:6379> XPENDING yy mygroup - + 10
1) 1) "1624516905844-0"
2) "c2"
3) (integer) 2858416
4) (integer) 5
2) 1) "1624516910389-0"
2) "c2"
3) (integer) 3953552
4) (integer) 2
3) 1) "1624516914709-0"
2) "c2"
3) (integer) 3953552
4) (integer) 2
4) 1) "1624516919774-0"
2) "c2"
3) (integer) 2789288
4) (integer) 2
127.0.0.1:6379> XACK yy mygroup 1624516905844-0
(integer) 1
127.0.0.1:6379> XPENDING yy mygroup - + 10
1) 1) "1624516910389-0"
2) "c2"
3) (integer) 4751015
4) (integer) 2
2) 1) "1624516914709-0"
2) "c2"
3) (integer) 4751015
4) (integer) 2
3) 1) "1624516919774-0"
2) "c2"
3) (integer) 3586751
4) (integer) 2
This command returns the number of messages successfully acknowledged. Some message IDs may no longer be part of the PEL (for example, because they have already been acknowledged); XACK does not count them as successful acknowledgements.
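The counting rule can be sketched with a toy PEL held in a dictionary. The `xack` function below is a hypothetical stand-in for the real command, written only to show which IDs contribute to the return value.

```python
def xack(pel, *ids):
    """Toy XACK: remove ids from the PEL, return how many were actually pending."""
    acked = 0
    for msg_id in ids:
        if pel.pop(msg_id, None) is not None:
            acked += 1
    return acked


# PEL modelled as message id -> owning consumer.
pel = {"1624516905844-0": "c2", "1624516910389-0": "c2"}

first = xack(pel, "1624516905844-0")
# The first ID was already acknowledged, so only the second one counts.
again = xack(pel, "1624516905844-0", "1624516910389-0")
print(first, again)  # 1 1
```

Acknowledging the same ID twice is harmless: the second attempt simply does not add to the count.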
11 Summary
Redis Stream is stored in memory, so it is generally faster than dedicated message queues such as Kafka and RocketMQ. For the same reason, however, we cannot use Redis Stream to store large amounts of data for a long time, because memory is much more expensive than disk. In addition, Redis Stream does not provide delayed messages.
Redis Stream is already a very capable message queue. But being memory-based, which is Redis's most important advantage, also means that Redis Stream cannot store large volumes of data, since that would require a lot of memory. So far, therefore, Redis Stream is not widely used in production. It is better suited to small, low-cost applications and to scenarios where data can be discarded by capping the Stream length, such as recording unimportant operation logs.
As of the current version (Redis 6.2), Redis Stream still seems to be evolving, and more powerful features can be expected in the future.
If you would like to discuss anything, or if you find a mistake in the article, please leave a comment. I also hope you will like, bookmark, and follow. I will keep updating all kinds of Java learning posts!