
[Flink] Problem and solution for the continuously growing checkpoint size in RocksDB incremental mode

2022-06-25 04:40:00 Ninth senior brother

1. Summary

Reprinted from: "Problem and solution for the continuously growing checkpoint size in RocksDB incremental mode"

2. Background

Flink version: 1.13.5

A production job developed with Flink SQL performs aggregation with a tumble window, sets table.exec.state.ttl to 7200000 ms, uses a checkpoint interval of 5 minutes, and runs RocksDB in incremental mode.
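
For reference, the setup roughly corresponds to the following minimal sketch. The source table src, its event-time column ts, and the query itself are illustrative placeholders; only the state backend, checkpoint interval, and TTL settings come from the description above:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TumbleWindowJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB state backend with incremental checkpoints enabled
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        // Checkpoint every 5 minutes
        env.enableCheckpointing(5 * 60 * 1000L);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Expire idle state after 7200000 ms (2 hours)
        tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "7200000");

        // Tumble-window aggregation; "src" and "ts" are hypothetical names
        tEnv.executeSql(
                "SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS wstart, COUNT(*) AS cnt "
                        + "FROM src "
                        + "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)");
    }
}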

Under normal circumstances, after the task has run for a while, newly written and expiring state reach a dynamic balance, and with RocksDB's compaction the checkpoint size fluctuates within a small range.

What was actually observed is that the checkpoint size grew slowly but continuously: after running for 20 days it had grown from about 100 MB to 2 GB, and the checkpoint time had increased from 1 second to tens of seconds.

Source code analysis

Let's look at the get() method of the RocksIncrementalSnapshotStrategy.RocksDBIncrementalSnapshotOperation class:

public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {

    boolean completed = false;
    SnapshotResult<StreamStateHandle> metaStateHandle = null;
    // Handles to the sst files of the current snapshot
    Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to all other (misc) files of the current snapshot
    Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        // Write out the state metadata first
        metaStateHandle = this.materializeMetaData(snapshotCloseableRegistry);
        Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
        Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
                "Metadata for job manager was not properly created.");

        // Upload the sst and misc files to the checkpoint file system
        this.uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        // Remember which sst files this checkpoint materialized so that
        // later incremental checkpoints can skip re-uploading them
        synchronized (RocksIncrementalSnapshotStrategy.this.materializedSstFiles) {
            RocksIncrementalSnapshotStrategy.this.materializedSstFiles.put(this.checkpointId, sstFiles.keySet());
        }

        IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(
                RocksIncrementalSnapshotStrategy.this.backendUID,
                RocksIncrementalSnapshotStrategy.this.keyGroupRange,
                this.checkpointId,
                sstFiles,
                miscFiles,
                (StreamStateHandle) metaStateHandle.getJobManagerOwnedSnapshot());

        DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
        SnapshotResult<KeyedStateHandle> snapshotResult;
        if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
            IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(
                    RocksIncrementalSnapshotStrategy.this.backendUID,
                    this.checkpointId,
                    directoryStateHandle,
                    RocksIncrementalSnapshotStrategy.this.keyGroupRange,
                    (StreamStateHandle) metaStateHandle.getTaskLocalSnapshot(),
                    sstFiles.keySet());
            snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
        } else {
            snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
        }

        completed = true;
        return snapshotResult;
    } finally {
        // If the snapshot did not complete, discard any state already written
        if (!completed) {
            List<StateObject> statesToDiscard = new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            this.cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
}

Now focus on the implementation details of the uploadSstFiles() method:

            Preconditions.checkState(this.localBackupDirectory.exists());
            Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
            Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
            Path[] files = this.localBackupDirectory.listDirectory();
            if (files != null) {
                // Split the local snapshot files into sst and misc groups,
                // then upload each group to the checkpoint file system
                this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
                sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
                miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
            }

Now step into the createUploadFilePaths() method:

        private void createUploadFilePaths(
                Path[] files,
                Map<StateHandleID, StreamStateHandle> sstFiles,
                Map<StateHandleID, Path> sstFilePaths,
                Map<StateHandleID, Path> miscFilePaths) {

            for (Path filePath : files) {
                String fileName = filePath.getFileName().toString();
                StateHandleID stateHandleID = new StateHandleID(fileName);
                if (!fileName.endsWith(".sst")) {
                    // Everything that is not an sst file counts as a misc file
                    // and is uploaded again on every checkpoint
                    miscFilePaths.put(stateHandleID, filePath);
                } else {
                    // sst files already uploaded by a previous checkpoint are
                    // replaced by a placeholder instead of being uploaded again
                    boolean existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);
                    if (existsAlready) {
                        sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
                    } else {
                        sstFilePaths.put(stateHandleID, filePath);
                    }
                }
            }
        }

Here is the crux of the problem. The main logic can be summarized as follows:

  1. Scan all files in the RocksDB local storage directory, collecting the sst files and the misc files (all files other than sst files);

  2. Compare the sst files against the sst files already uploaded by previous checkpoints, and record the paths of only the newly added sst files;

  3. Record the paths of all misc files.

This is the key logic of incremental checkpointing, and it exposes the problem: the incremental mechanism applies only to sst files, while all misc files are backed up in full on every checkpoint. Let's look at a directory on one node to see which files are backed up in full:

[hadoop@fsp-hadoop-1 db]$ ll
total 8444
-rw-r--r-- 1 hadoop hadoop       0 Mar 28 14:56 000058.log
-rw-r--r-- 1 hadoop hadoop 2065278 Mar 31 10:17 025787.sst
-rw-r--r-- 1 hadoop hadoop 1945453 Mar 31 10:18 025789.sst
-rw-r--r-- 1 hadoop hadoop   75420 Mar 31 10:18 025790.sst
-rw-r--r-- 1 hadoop hadoop   33545 Mar 31 10:18 025791.sst
-rw-r--r-- 1 hadoop hadoop   40177 Mar 31 10:18 025792.sst
-rw-r--r-- 1 hadoop hadoop   33661 Mar 31 10:18 025793.sst
-rw-r--r-- 1 hadoop hadoop   40494 Mar 31 10:19 025794.sst
-rw-r--r-- 1 hadoop hadoop   33846 Mar 31 10:19 025795.sst
-rw-r--r-- 1 hadoop hadoop      16 Mar 30 19:46 CURRENT
-rw-r--r-- 1 hadoop hadoop      37 Mar 28 14:56 IDENTITY
-rw-r--r-- 1 hadoop hadoop       0 Mar 28 14:56 LOCK
-rw-rw-r-- 1 hadoop hadoop   38967 Mar 28 14:56 LOG
-rw-r--r-- 1 hadoop hadoop 1399964 Mar 31 10:19 MANIFEST-022789
-rw-r--r-- 1 hadoop hadoop   10407 Mar 28 14:56 OPTIONS-000010
-rw-r--r-- 1 hadoop hadoop   13126 Mar 28 14:56 OPTIONS-000012

  1. CURRENT, IDENTITY, LOCK, and OPTIONS-*: these files are essentially fixed in size and do not change;

  2. LOG: this is RocksDB's log file. By default, Flink sets RocksDB's log output level to HEADER level, which produces almost no output. But if you configure state.backend.rocksdb.log.level to, say, INFO_LEVEL, this LOG file keeps growing and is never cleaned up;

  3. MANIFEST-*: this is RocksDB's transaction log, used when replaying the database. It keeps growing, and only after reaching a threshold does it roll over to a new file and delete the old one;

3. Root cause

During incremental checkpoints, although the amount of state held in the sst files stays in dynamic balance, the LOG and MANIFEST files keep growing. As a result, the checkpoint becomes larger and larger, and slower and slower.

4. Solutions

  1. Keep the RocksDB log turned off in production (i.e., keep the default value of state.backend.rocksdb.log.level);

  2. Set a rollover threshold for the MANIFEST file; I set it to 10485760 bytes (10 MB).
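
As a sketch, both fixes could be applied through a custom RocksDBOptionsFactory. The factory class name below is made up for illustration; setInfoLogLevel and setMaxManifestFileSize are standard RocksDB DBOptions setters, and the factory is installed on the state backend via EmbeddedRocksDBStateBackend.setRocksDBOptions:

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

// Hypothetical factory name, for illustration only
public class ManifestRollingOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions
                // Fix 1: keep RocksDB logging at the quiet (default) HEADER level
                .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
                // Fix 2: roll the MANIFEST file once it reaches 10485760 bytes (10 MB)
                .setMaxManifestFileSize(10485760L);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}

The factory would then be installed before setting the backend on the execution environment, e.g. backend.setRocksDBOptions(new ManifestRollingOptionsFactory()).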


Copyright notice

This article was created by [Ninth senior brother]. Please include a link to the original when reprinting: https://yzsam.com/2022/176/202206250335287248.html