当前位置:网站首页>Rocket message storage

Rocket message storage

2022-06-26 00:06:00 Just put a flower in heaven and earth

Rocket Storage file

RocketMQ The stored files mainly include CommitLog file 、ConsumeQueue file 、Index file .RocketMQ Store messages for all topics in the same file , Make sure that files are written in order when messages are sent , Try your best to ensure high performance and throughput of message sending . Because message oriented middleware is generally a subscription mechanism based on message topic , Therefore, it brings great inconvenience to retrieve messages according to message topics . In order to improve the efficiency of message consumption ,RocketMQ Introduced ConsumeQueue Message consumption queue file , Each message subject contains multiple message consumption queues , Each message queue has a message file .Index The design concept of index file is to accelerate the retrieval performance of messages , From the CommitLog Quick index message in file .

RocketMQ The storage path is ${ROCKET_HOME}/store, The main storage folders are as follows :

  • commitlog: Message store directory
  • config: Some configuration information during operation , It mainly includes the following information .
    • consumerFilter.json: Subject message filtering information .
    • consumerOffset.json: Message consumption progress under cluster consumption mode .
    • delayOffset.json: Delay message queue pull progress .
    • subscriptionGroup.json: Configuration information of the message consumption group
    • topics.json:topic Configuration properties
  • consumequeue: Message consumption queue storage directory .
  • index: Message index file storage directory .
  • abort: If there is abort file , explain Broker Abnormal shutdown , By default, this file is started Broker Created on , Delete... Before normal exit .
  • checkpoint: Checkpoint file , Storage CommitLog Time stamp of the last disk swiping of the file 、ConsumeQueue The last time to brush the disc 、index Time stamp of the last disk swiping of the file .

Commitlog file

RocketMQ In the process of message writing, pursue the ultimate disk sequential writing , All messages on all topics are sent to a file , namely CommitLog file . All messages are appended to... In order of arrival CommitLog In file , Once the message is written , Modification not supported .CommitLog The file feature is that the length of each message is different .

CommitLog The default storage directory of the file is ${ROCKET_HOME}/store/commitlog, It can be done by broker Set... In the configuration file storePathRootDir Property to change the default path .CommitLog The default file size is 1GB, May pass through broker Set... In the configuration file mapedFileSizeCommitLog Property to change the default size .

A similar relational database will introduce one for each data ID Field , File based programming also introduces an identity for each message : Message physical offset , That is, the message is stored at the beginning of the file .

It is with the concept of physical offset ,CommitLog The file naming method is also very skillful , Use the first message stored in the file throughout CommitLog Filegroups Named after the offset in . The advantage of this is to give the physical offset of any message , You can find by dichotomy , Quickly locate the location of this file , Then subtract the name of the file with the message physical offset , The difference is the absolute address in the file .

in addition , File name length is 20 position , Left complement 0, Remaining as start offset . Messages are mainly written to log files sequentially , When the file is full , Write next file .

It should be noted that ,CommitLog The design concept of the document is to pursue the ultimate message writing , But we know that the message consumption model is based on the topic subscription mechanism , That is, a consumption group is to consume messages with specific topics . From... According to the theme CommitLog Retrieve message from file , This is definitely not a good idea , In this way, only the first message of the file can be retrieved one by one , Its performance is not optimistic , In order to solve the problem based on topic Message retrieval problem ,RocketMQ Introduced ConsumeQueue file .

ConsumeQueue file

ConsumeQueue The file is the message consumption queue file , yes CommitLog The document is based on topic The index file of , The news arrived CommitLog After the document , Forward asynchronously to ConsumeQueue In file , Mainly used by consumers according to topic News consumption , Its organization is /topic/queue/file, The specific storage path is ¥HOME/store/consumequeue/{topic}/{queueId}/{fileName}.

ConsumeQueue The design of the document is also very characteristic , Each entry has a fixed length 20 byte (8 byte CommitLog Physical offset 、4 Byte message length 、8 byte tag Hash code ), A single file has 30 Ten thousand items , Every ConsumeQueue File size is 5.72MB. This is not storage tag Original string of , Instead, it stores hashcodes , The aim is to ensure that the length of each entry is fixed , You can quickly locate entries by accessing array subscripts , Greatly improved ConsumeQueue File reading performance . Single ConsumeQueue The file can be seen as a ConsumeQueue Array of entries , Its subscript is ConsumeQueue The logical offset of , The offset of message consumption progress storage is the logical offset . Message consumers according to topic、 News consumption progress (ConsumeQueue Logical offset )、 The first few ConsumeQueue entry , Such consumption progress to access messages , By logical offset logicOffset*20, You can find the starting offset of the entry (ConsumeQueue Offset in file ), Then read the offset 20 Bytes to get an entry , No traversal ConsumeQueue file .

ConsumeQueue The file building mechanism is When the message arrives CommitLog After the document , The message forwarding task is generated by a special thread , To build ConsumeQueue The document is the same as that mentioned below Index file .

How to find messages by message logical offset ?
answer : according to startIndex Get message consumption queue entries . adopt startIndex*20 Get in ConsumeQueue Physical offset in file , If the offset is less than minLogicOffset, Then return to null, This indicates that the message has been deleted , If it is greater than minLogicOffset, Then locate the specific physical file according to the offset . The offset in the file is obtained by taking the modulus between the offset and the size of the physical file , Read continuously from the offset 20 Only bytes .

How to find messages by message storage time ?
answer : Tentatively

RocketMQ And Kafka It has a strong advantage over , Namely It supports retrieving messages by message attributes , introduce ConsumeQueue The file solves the problem based on topic Problem finding messages , But if you want to search based on a property of the message ,ConsumeQueue There's nothing I can do with the documents . so RocketMQ And the introduction of Index Index file , Implement file based hash index .

Index file

ConsumeQueue yes RocketMQ Index files built specifically for message subscriptions , The purpose is to improve the speed of retrieving messages based on topics and message queues . and Index File implements hash index based on physical disk file .Index Document has 40 Byte file header 、500 Ten thousand Hashi slots 、2000 m Index Item composition , Each Hashi trough 4 byte 、 Every index The entry contains 20 Bytes , Respectively 4 Byte index key The hash code of 、8 Byte message physical offset 、4 Byte time stamp 、4 The previous byte Index entry ( Hash conflict linked list structure ).

The key implementation of hash conflict chain solution , The hash slot stores the latest hash code falling in the hash slot Index Indexes , new index At the end of the entry 4 Bytes store one entry on the hash code index Subscript . If the value stored in the hash slot is 0 Or greater than the current index The maximum number of entries in the file is less than -1, Indicates that the hash slot does not currently have a corresponding index entry . It is worth noting that ,Index The message index is not stored in the file entry key, It is the message properties key Hash , On the basis of key When searching, you need to find the message according to the physical offset of the message , Then verify the message key Value , The reason why only hash is stored , Instead of storing specific key, It's to make index The item is designed as a fixed length structure , To easily retrieve and locate items .

checkpoint file

checkpoint( checkpoint ) The purpose of the document is to record CommitLog、ConsumeQueue、Index Time point of file disk brushing , The fixed length of the file is 4KB, Only the front of the file is used 24 byte .

  • physicMsgTimestamp:CommitLog File swiping time point , Occupy 8 byte .
  • logicsMsgTimestamp:ConsumeQueue File swiping time point , Occupy 8 byte .
  • indexMsgTimestamp:Index File swiping time point , Occupy 8 byte .

Real time updates ConsumeQueue And Index file

because ConsumeQueue file 、Index Documents are based on CommitLog File built , So when messages submitted by message producers are stored in CommitLog file ,ConsumeQueue file 、Index Documents need to be updated in time , Otherwise, the news cannot be consumed in time , There is also a large delay in finding messages based on message properties .RocketMQ By opening a thread ReputMessageService To quasi real-time forwarding CommitLog File update event , The corresponding task processor updates in time according to the forwarded message ConsumeQueue file 、Index file .

Broker The server starts when it starts ReputMessageService Threads , And initialize a very critical parameter reputFromOffset, The meaning of this parameter is ReputMessageService From which physical offset to start forwarding messages to ConsumeQueue and Index file . If repeated forwarding is allowed , take reputFromOffset Set to CommitLog Submission pointer of the document . If repeated forwarding is not allowed , take reputFromOffset Set to CommitLog The maximum in memory offset of the file .

ReputMessageService Every time a thread executes a task push , rest 1ms Then continue trying to push messages to ConsumeQueue and Index file .

Update according to the message ConsumeQueue file
The general steps are as follows :

  • According to the message subject and queue ID, Get the corresponding first ConsumeQueue file , The logic is simple , Because each message subject corresponds to one ConsumeQueue Catalog , Each message queue under the topic corresponds to a folder , So take out the last... Of the folder ConsumeQueue File can .
  • The message offset in turn 、 The length of the message 、tag Hash code write ByteBuffer, And according to consumeQueueOffset Calculation ConsumeQueue Physical address in , Append content to ConsumeQueue In the memory mapped file ( This operation only adds , Do not brush the disc ),ConsumeQueue The disk brushing mode of is fixed as asynchronous disk brushing .

Update according to the message Index file
The implementation class of hash index file forwarding task is CommitLogDispatchBuildIndex. Of course , Whether to generate Index file , Can pass messageIndexEnable Control , If set to true, Call IndexService#buildIndex Build a hash index , Otherwise, the forwarding task will be ignored .
The general steps are as follows :

  • Get or create Index File and get the maximum physical offset of all files . If the physical offset of the message is less than Index Physical offset in file , It means duplicate data , Ignore this index build .
  • If the unique key of the message is not empty , Is added to the hash index , To speed up the retrieval of messages based on the unique key .
  • Building index keys ,RocketMq Multiple indexes for the same message are supported , Multiple index keys are separated by spaces .

Memory mapping

Although disk based sequential message writing can greatly improve I/O Write efficiency of , But if file - based storage is conventional Java File operations API, for example FileOutputStream etc. , The performance improvement will be limited , so RocketMQ And the introduction of Memory mapping , Map disk files to memory , Operate the disk in the way of memory , The performance has been improved to a higher level .

stay Java Pass through FileChannel Of map Method to create a memory mapped file . stay linux The files created by this method in the server use the page cache of the operating system .Linux The memory usage policy in the operating system will make the best use of the physical memory of the machine , And always live in memory , Page cache . When the memory of the operating system is not enough , Cache replacement algorithm is adopted , for example LRU Recycle unused page caches , That is, the operating system will automatically manage this part of memory .

If RocketMQ Broker Process exited abnormally , The data stored in the page cache is not lost , The operating system will periodically persist the data in the page cache to disk , Achieve data security and reliability . However, if it is an abnormal situation such as power failure of the machine , Data stored in the page cache can also be lost .

File swipe mechanism

With the blessing of sequential write and memory mapping ,RocketMQ The write performance of is greatly guaranteed , But everything has its pros and cons , Memory mapping and page caching mechanism are introduced , Messages are written to the page cache first , At this point, the message is not really persisted to disk . that Broker After receiving the client message , If it is stored in the page cache, it will directly return success , Or do you want to persist to the disk and return success ? This requires a trade-off between performance and message reliability . So ,RocketMQ There are two strategies : Synchronous brush set and Asynchronous brush set .

RocketMQ The storage, read and write of is based on JDK NIO Memory mapping mechanism , When a message is stored, it is first appended to memory , Then, according to the configured disk brushing strategy, you can brush the disk at different times . By means of broker Configuration in profile flushDiskType To set the disc brushing mode , Optional value is ASYNC_FLUSH( Asynchronous brush set )、SYNC_FLUSH( Synchronous brush set ), The default is asynchronous disk brushing .

ConsumeQueue file 、Index The principle of file disk brushing and CommitLog The disk brushing mechanism is similar . It is worth noting that ,Index The disk brushing mechanism of files is not a regular disk brushing mechanism , It's every update Index The file will write the last change to the disk .

Broker Synchronous brush set

Synchronous disk flushing is to add messages to memory , Will synchronously call MappedByteBuffer Of force() Method . Synchronous brush disk in RocketMQ The implementation of is called Group to submit , namely GroupCommitService Take out the request to swipe the disk from the queue , Then perform the disc brushing action , At this point write The pointer and flush All data between the pointers is flushed to disk .

After the consumption sending thread appends the message to the memory mapped file , The task will be synchronized GroupCommitRequest Submitted to the GroupCommitService Threads , Then call block to wait for the result of disk brushing , The timeout defaults to 5s.

GroupCommitService threading GroupCommitRequest Call after object wakeupCustomer Method wakes up the consuming sending thread , And inform... Of the disk brushing request GroupCommitRequest.

The client submits the synchronization disk brushing task to GroupCommitService Threads , If the thread is waiting, wake it up .

To avoid lock contention between the synchronous disk flushing consumption task and the task submitted by other message producers ,GroupCommitService Provide Read container and Write container , The two containers interact after each task , Continue to consume .

GroupCommitService Group commit thread , After each batch of disk brushing requests is processed , If the subsequent request to brush the disk needs to be processed , The group commit thread is constantly processing the next batch ; If there are no pending tasks , Rest 10ms, That is, every 10ms Idle once .
The specific steps are as follows

  • Perform the disk brushing operation , That is to call MappedByteBuffer#force Method . Traverse the task list of synchronous disk brushing , Execute the disk flushing logic one by one according to the adding sequence . If the brushed disc pointer is greater than 、 Equal to the submitted brush count , It means that the disk is successfully swiped , After each disc brushing operation , Call immediately GroupCommitRequest#wakeupCustomer Wake up the message sending thread and notify the disk flushing result .
  • After all synchronization disk brushing tasks are processed , Update the brush disk checkpoint StoreCheckpoint Medium physicMsgTimestamp, However, the disk brushing operation of the detection point is not performed , The disk flushing operation of the disk flushing detection point will be triggered when the message queue file is written .

Broker Asynchronous brush set

The advantage of synchronous disk brushing is to ensure that messages are not lost , That is, the successful return to the client indicates that the message has been persisted to the disk , But this comes at the expense of write performance , But because RocketMQ The message is written first pagecache, So the possibility of message loss is small , If you can tolerate a certain probability of message loss or can quickly re push at a low cost after loss , Consider using an asynchronous disk flushing strategy .

The value of asynchronous disk brushing is broker Store messages in pagecache Return to success immediately after , Then start an asynchronous thread to execute regularly FileChannel Of force Method , Write the data in the memory to the disk regularly , The default interval is 500ms.

Turn on transientStorePoolEnable The mechanism starts the asynchronous disk brushing mode , The realization of the brush disk is slightly different from that of the synchronous brush disk . If transientStorePoolEnable by true,RocketMQ A separate physical file with the target will be requested (CommitLog) Out of heap memory of the same size , This off heap memory will use memory locking , Ensure that it will not be replaced into virtual memory , The message is first appended to off heap memory , Then commit to the memory map of the physical file , Re menstruation flush Operation to disk . If transientStorePoolEnable by false, The message will be appended to the memory directly mapped to the physical file , Then write to disk .

Asynchronous disk brushing process

  • Append the message directly to ByteBuffer( Out of heap memory DirectByteBuffer),wrotePosition Move backward as messages are appended .
  • CommitRealTimeService Threads default to every 200ms take ByteBuffer The newly added data is submitted to FileChannel in .
  • FileChannel Add the submitted content to the channel , Its wrotePosition The pointer moves back and forth , Then return .
  • commit Operation successful return , take commitedPosition Move the submitted content length forward and backward , here wrotePosition The pointer can still be pushed forward .
  • FlushRealTimeService Threads default to every 500ms take FileChannel Newly added memory in (wrotePosition Subtract the last write position flushedPositiont), By calling FileChannel#force() Method to write data to disk .

Expired file deletion mechanism

because RocketMQ operation CommitLog、ConsumeQueue The file is based on the memory mapping mechanism and will be loaded at startup comitlog、consumeQueue All the files in the directory , So in order to avoid the waste of memory and disk , It is not possible to permanently store messages on the message server , This requires the introduction of a mechanism to delete expired files ,RocketMQ Sequential writing CommitLog file 、ConsumeQueue file , All writes fall on the last CommitLog File or ConsumeQueue On the file , The previous file will not be updated after the next file is created .RocketMQ How to clear out expired files : If “ Non current write file ” Not updated again within a certain time interval , It is considered to be an expired document , Can be deleted ,RocketMQ I won't pay attention to whether all the messages on this file are consumed . The default expiration time of every file is 72h, By means of broker Set... In the configuration file fileReservedTime To change the expiration time , It's in hours .

RocketMQ Design and implement the mechanism of deleting expired files
RocketMQ every other 10s Schedule once cleanFilesPeriodically, Check whether the expired files need to be cleared . The execution frequency can be determined by cleanResourceInterval Set it up , Default 10s.

Clear separately CommitLog Document and ConsumeQueue file .CommitLog Document and ConsumeQueue Files share a mechanism for deleting expired files .

  • First, judge whether the difference between the latest file update time and the current time exceeds the set file retention time , If this value is exceeded , The document is considered to be expired
  • When cleaning up expired files , If the file is occupied by another thread ( References greater than 0, For example, reading messages ), This delete task is blocked at this time , At the same time, record the current timestamp when you first try to delete the file ,destroyMapedFileIntervalForcibly Indicates the maximum time that a file can be retained after the first rejection of deletion , Within this time , It can also be rejected for deletion , After that time , Will set the number of references to a negative number , The file will be forcibly deleted .

RocketMQ If any of the following conditions is met, the operation of deleting the file will continue

  • Specify the point in time to delete the file ,RocketMQ adopt deleteWhen Set to delete expired files at a fixed time every day , The default in the morning 4 spot .
  • Check if there is enough disk space , If there is not enough disk space , Then return to true, Indicates that the deletion of expired files should be triggered .
  • Reserve manual trigger mechanism , You can call excuteDeleteFilesManualy Method to manually trigger the operation of deleting expired files , at present RocketMQ The command to manually trigger deletion is not encapsulated yet .

Master slave synchronization mechanism

RocketMQ To optimize the performance of synchronous replication , stay RocketMQ4.7.0 A major transformation has been made to the original synchronous replication , Greatly improves the performance of synchronous replication .

stay RocketMQ4.7.0 Previous versions of synchronous replication : Message sending thread SendMessageThread When a client request is received SendMessageProcessor The method in , Write messages to Broker. If the message replication mode is synchronous replication , This requires synchronous replication of messages to the slave node , Only after this message is sent will it return , namely SendMessageThread The thread needs to receive the synchronization result from the slave node before continuing to process the next message .

stay RocketMQ4.7.0 And how to optimize it ? Because the semantics of synchronous replication is to synchronize messages to the slave node , So this replication process has nothing to optimize , But we can reduce SendMessageThread Waiting time of thread , That is, during synchronous replication ,SendMessageThread Threads can continue to process other messages , It only returns the result to the client after receiving the synchronization result from the slave node . Improve Broker Message processing capability , Repeated use Broker Resources for .

The general process is as follows

  • The message first enters pageCache, Then perform the disc brushing operation (submitFlushRequest), Then call submitReplicaRequest Method to submit a message to HaService, Data replication , It's used here ComplateFuture Of thenCombine Method , Set the brush plate 、 Replication is performed as a joint task , Here, set the final status of message appending .
  • towards HaService Submit GroupCommitRequest After the object , The result of the synchronization is not returned , It is a CompletableFuture object , The object thenApply The method is mentioned above handlePutMessageResultFuture Method , and CompletableFuture Of complete Method will be called after the message is copied to the slave node .
  • After the message is successfully copied and the copy fails ,CompletableFuture Of complete Method will be called , thus CompletableFuture Of thenApply Method is triggered to call , This method returns the final result of message sending to the client , Realize in broker Asynchronous programming at the end , Make the performance of synchronous replication close to that of asynchronous replication , Greatly improve message replication performance .

Shutdown recovery mechanism

ConsumeQueue and Index File recovery

Tentatively

Learning links

Learn more about RocketMQ For more information, see RocketMQ Introduction to

原网站

版权声明
本文为[Just put a flower in heaven and earth]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/177/202206252118151288.html