[Flink] Continuous growth of checkpoint size in RocksDB incremental mode: problem and solution
2022-06-25 04:40:00 【Ninth senior brother】
1. Summary
Reprinted from: "RocksDB incremental mode: the problem of continuously growing checkpoint size and its solution"
2. Background
Flink version: 1.13.5
A production task developed with FlinkSQL uses a tumble window for aggregation, configures table.exec.state.ttl to 7200000 (ms, i.e. 2 hours), sets the checkpoint interval to 5 minutes, and uses the RocksDB incremental mode.
Under normal circumstances, after the task has run for a while, newly written and expired state reach a dynamic balance, and with RocksDB compaction the checkpoint size should fluctuate within a small range.
What was actually observed is that the checkpoint size kept growing slowly: after running for 20 days it had grown from roughly 100 MB at the start to 2 GB, and the checkpoint time had increased from 1 second to tens of seconds.
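For reference, here is a minimal sketch of the setup described above. The production SQL is not shown in the original post, so the table definition, connector, and window size below are illustrative assumptions only:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TumbleWindowJob {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("state.backend", "rocksdb");                 // RocksDB state backend
        conf.setString("state.backend.incremental", "true");        // incremental checkpoints
        conf.setString("execution.checkpointing.interval", "5min"); // 5-minute checkpoint period

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // table.exec.state.ttl = 7200000 ms = 2 hours, as in the original task.
        tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "7200000");

        // Hypothetical source table and tumble-window aggregation.
        tEnv.executeSql(
                "CREATE TABLE events (item STRING, ts AS PROCTIME()) WITH ('connector' = 'datagen')");
        tEnv.executeSql(
                "SELECT item, COUNT(*) FROM events "
                        + "GROUP BY item, TUMBLE(ts, INTERVAL '1' MINUTE)").print();
    }
}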
3. Source code analysis
Let's look at the get() method of the RocksIncrementalSnapshotStrategy.RocksDBIncrementalSnapshotOperation class:
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
    boolean completed = false;
    SnapshotResult<StreamStateHandle> metaStateHandle = null;
    Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {
        // 1. Materialize the snapshot metadata.
        metaStateHandle = this.materializeMetaData(snapshotCloseableRegistry);
        Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
        Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
                "Metadata for job manager was not properly created.");

        // 2. Upload the RocksDB files (sst files and misc files) to the checkpoint file system.
        this.uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);

        // 3. Remember which sst files this checkpoint uploaded, so the next
        //    incremental checkpoint can skip them.
        synchronized (RocksIncrementalSnapshotStrategy.this.materializedSstFiles) {
            RocksIncrementalSnapshotStrategy.this.materializedSstFiles
                    .put(this.checkpointId, sstFiles.keySet());
        }

        IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
                new IncrementalRemoteKeyedStateHandle(
                        RocksIncrementalSnapshotStrategy.this.backendUID,
                        RocksIncrementalSnapshotStrategy.this.keyGroupRange,
                        this.checkpointId,
                        sstFiles,
                        miscFiles,
                        metaStateHandle.getJobManagerOwnedSnapshot());

        DirectoryStateHandle directoryStateHandle =
                this.localBackupDirectory.completeSnapshotAndGetHandle();
        SnapshotResult<KeyedStateHandle> snapshotResult;
        if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
            IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
                    new IncrementalLocalKeyedStateHandle(
                            RocksIncrementalSnapshotStrategy.this.backendUID,
                            this.checkpointId,
                            directoryStateHandle,
                            RocksIncrementalSnapshotStrategy.this.keyGroupRange,
                            metaStateHandle.getTaskLocalSnapshot(),
                            sstFiles.keySet());
            snapshotResult = SnapshotResult.withLocalState(
                    jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
        } else {
            snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
        }

        completed = true;
        return snapshotResult;
    } finally {
        if (!completed) {
            // Discard any state already uploaded for this failed snapshot.
            List<StateObject> statesToDiscard =
                    new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            this.cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
}
Focus on the implementation details of the uploadSstFiles() method:
Preconditions.checkState(this.localBackupDirectory.exists());

Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
Map<StateHandleID, Path> miscFilePaths = new HashMap<>();

Path[] files = this.localBackupDirectory.listDirectory();
if (files != null) {
    // Split the local files into new sst files and misc files.
    this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);

    // Upload both groups to the checkpoint file system.
    sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(
            sstFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
    miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(
            miscFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
}
Step into the createUploadFilePaths() method:
private void createUploadFilePaths(
        Path[] files,
        Map<StateHandleID, StreamStateHandle> sstFiles,
        Map<StateHandleID, Path> sstFilePaths,
        Map<StateHandleID, Path> miscFilePaths) {
    for (Path filePath : files) {
        String fileName = filePath.getFileName().toString();
        StateHandleID stateHandleID = new StateHandleID(fileName);
        if (!fileName.endsWith(".sst")) {
            // Misc file (anything that is not an sst file): always scheduled for upload.
            miscFilePaths.put(stateHandleID, filePath);
        } else {
            boolean existsAlready =
                    this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);
            if (existsAlready) {
                // Already uploaded by a previous checkpoint: register a
                // placeholder instead of uploading the file again.
                sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
            } else {
                // New sst file: schedule it for upload.
                sstFilePaths.put(stateHandleID, filePath);
            }
        }
    }
}
Here is the crux of the problem. The main logic can be summarized as follows:
- Scan all files in the RocksDB local storage directory, collecting the sst files and the misc files (all files other than sst files);
- Compare the sst files against the sst files already uploaded by previous checkpoints, and record the paths of only the newly added sst files (see the sketch after this list);
- Record the paths of all misc files.
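To make the comparison rule concrete, here is a minimal, self-contained simulation of the decision logic in createUploadFilePaths(). This is an illustration only, not Flink source code, and the file names are made up:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class IncrementalUploadDemo {
    public static void main(String[] args) {
        // sst files that the previous checkpoint already uploaded (baseSstFiles).
        Set<String> baseSstFiles = new HashSet<>(Arrays.asList("025787.sst", "025789.sst"));
        // Files found in the local RocksDB directory at the current checkpoint.
        String[] localFiles = {"025789.sst", "025790.sst", "MANIFEST-022789", "CURRENT", "LOG"};

        for (String fileName : localFiles) {
            if (!fileName.endsWith(".sst")) {
                System.out.println(fileName + " -> misc file: uploaded in full at EVERY checkpoint");
            } else if (baseSstFiles.contains(fileName)) {
                System.out.println(fileName + " -> placeholder: already uploaded, skipped");
            } else {
                System.out.println(fileName + " -> new sst file: uploaded once");
            }
        }
    }
}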
This is the key logic of the incremental checkpoint, and here we spot something: the increment applies only to sst files, while the misc files are backed up in full every time. Let's log on to a node and look at a RocksDB directory to see which files are backed up in full:
[hadoop@fsp-hadoop-1 db]$ ll
total 8444
-rw-r--r-- 1 hadoop hadoop       0 Mar 28 14:56 000058.log
-rw-r--r-- 1 hadoop hadoop 2065278 Mar 31 10:17 025787.sst
-rw-r--r-- 1 hadoop hadoop 1945453 Mar 31 10:18 025789.sst
-rw-r--r-- 1 hadoop hadoop   75420 Mar 31 10:18 025790.sst
-rw-r--r-- 1 hadoop hadoop   33545 Mar 31 10:18 025791.sst
-rw-r--r-- 1 hadoop hadoop   40177 Mar 31 10:18 025792.sst
-rw-r--r-- 1 hadoop hadoop   33661 Mar 31 10:18 025793.sst
-rw-r--r-- 1 hadoop hadoop   40494 Mar 31 10:19 025794.sst
-rw-r--r-- 1 hadoop hadoop   33846 Mar 31 10:19 025795.sst
-rw-r--r-- 1 hadoop hadoop      16 Mar 30 19:46 CURRENT
-rw-r--r-- 1 hadoop hadoop      37 Mar 28 14:56 IDENTITY
-rw-r--r-- 1 hadoop hadoop       0 Mar 28 14:56 LOCK
-rw-rw-r-- 1 hadoop hadoop   38967 Mar 28 14:56 LOG
-rw-r--r-- 1 hadoop hadoop 1399964 Mar 31 10:19 MANIFEST-022789
-rw-r--r-- 1 hadoop hadoop   10407 Mar 28 14:56 OPTIONS-000010
-rw-r--r-- 1 hadoop hadoop   13126 Mar 28 14:56 OPTIONS-000012
CURRENT, IDENTITY, LOCK, OPTIONS-*: these files have an essentially fixed size and do not change;
LOG: the RocksDB log file. By default, Flink sets the RocksDB log output level to HEADER level, so almost nothing is logged; but if state.backend.rocksdb.log.level is configured, for example to INFO_LEVEL, this LOG file keeps growing and is never cleaned up;
MANIFEST-*: the RocksDB transaction log, used when replaying the database. This log keeps growing; only after reaching a size threshold does it roll over to a new file and delete the old one.
4. Cause summary
During incremental checkpoints, although the amount of state data stored in the sst files stays dynamically balanced, the LOG and MANIFEST files keep growing. As a result, the checkpoint gets bigger and bigger, and slower and slower.
5. Solution
- Turn off the RocksDB log in the production environment (keep the default configuration of state.backend.rocksdb.log.level);
- Set a rollover threshold for the MANIFEST file; I set it to 10485760 bytes (10 MB). A sketch of one way to configure this follows.
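The original post does not show how the threshold was configured. One plausible way (an assumption, not the author's confirmed code) is a custom RocksDBOptionsFactory that caps the MANIFEST size via RocksDB's max_manifest_file_size option, registered through state.backend.rocksdb.options-factory in flink-conf.yaml:

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Sketch: roll the MANIFEST over once it exceeds 10485760 bytes (10 MB),
// so the old MANIFEST file gets deleted instead of growing forever.
public class ManifestRollingOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions.setMaxManifestFileSize(10L * 1024 * 1024);
    }

    @Override
    public ColumnFamilyOptions createColumnFamilyOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // no column-family level changes needed here
    }
}

// flink-conf.yaml (hypothetical package name):
// state.backend.rocksdb.options-factory: com.example.ManifestRollingOptionsFactory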