当前位置:网站首页>【Flink】RocksDB增量模式checkpoint大小持续增长的问题及解决
【Flink】RocksDB增量模式checkpoint大小持续增长的问题及解决
2022-06-25 04:01:00 【九师兄】
1.概述
转载:RocksDB增量模式checkpoint大小持续增长的问题及解决
2.背景
Flink版本:1.13.5
一个使用FlinkSQL开发的生产线上任务, 使用Tumble Window做聚和统计,并且配置table.exec.state.ttl为7200000,设置checkpoint周期为5分钟,使用rocksdb的增量模式。
正常情况下,任务运行一段时间以后,新增和过期的状态达到动态的平衡,随着RocksDB的compaction,checkpoint的大小会在小范围内上下起伏。
实际观察到,checkpoint大小持续缓慢增长,运行20天以后,从最初了100M左右,增长到了2G,checkpoint的时间也从1秒增加到了几十秒。
源码分析
我们看一下RocksIncrementalSnapshotStrategy.RocksDBIncrementalSnapshotOperation类中的get()方法:
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
boolean completed = false;
SnapshotResult<StreamStateHandle> metaStateHandle = null;
Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap();
HashMap miscFiles = new HashMap();
boolean var15 = false;
SnapshotResult var18;
try {
var15 = true;
metaStateHandle = this.materializeMetaData(snapshotCloseableRegistry);
Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created.");
this.uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
synchronized(RocksIncrementalSnapshotStrategy.this.materializedSstFiles) {
RocksIncrementalSnapshotStrategy.this.materializedSstFiles.put(this.checkpointId, sstFiles.keySet());
}
IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot());
DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
SnapshotResult snapshotResult;
if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, this.checkpointId, directoryStateHandle, RocksIncrementalSnapshotStrategy.this.keyGroupRange, (StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), sstFiles.keySet());
snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
} else {
snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
}
completed = true;
var18 = snapshotResult;
var15 = false;
} finally {
if (var15) {
if (!completed) {
List<StateObject> statesToDiscard = new ArrayList(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
this.cleanupIncompleteSnapshot(statesToDiscard);
}
}
}
重点关注uploadSstFiles()方法的实现细节:
Preconditions.checkState(this.localBackupDirectory.exists());
Map<StateHandleID, Path> sstFilePaths = new HashMap();
Map<StateHandleID, Path> miscFilePaths = new HashMap();
Path[] files = this.localBackupDirectory.listDirectory();
if (files != null) {
this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
}
进入到createUploadFilePaths()方法:
private void createUploadFilePaths(Path[] files, Map<StateHandleID, StreamStateHandle> sstFiles, Map<StateHandleID, Path> sstFilePaths, Map<StateHandleID, Path> miscFilePaths) {
Path[] var5 = files;
int var6 = files.length;
for(int var7 = 0; var7 < var6; ++var7) {
Path filePath = var5[var7];
String fileName = filePath.getFileName().toString();
StateHandleID stateHandleID = new StateHandleID(fileName);
if (!fileName.endsWith(".sst")) {
miscFilePaths.put(stateHandleID, filePath);
} else {
boolean existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);
if (existsAlready) {
sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
} else {
sstFilePaths.put(stateHandleID, filePath);
}
}
}
}
这里是问题的关键,我们可以归纳出主要逻辑:
扫描rocksdb本地存储目录下的所有文件,获取到所有的sst文件和misc文件(除sst文件外的其他所有文件);
将sst文件和历史checkpoint上传的sst文件做对比,将新增的sst文件路径记录下来;
将misc文件的路径记录下来;
这里就是增量checkpoint的关键逻辑了, 我们发现一点,增量的checkpoint只针对sst文件, 对其他的misc文件是每次全量备份的,我们进到一个目录节点看一下有哪些文件被全量备份了:
[hadoop@fsp-hadoop-1 db]$ ll
总用量 8444
-rw-r--r-- 1 hadoop hadoop 0 3月 28 14:56 000058.log
-rw-r--r-- 1 hadoop hadoop 2065278 3月 31 10:17 025787.sst
-rw-r--r-- 1 hadoop hadoop 1945453 3月 31 10:18 025789.sst
-rw-r--r-- 1 hadoop hadoop 75420 3月 31 10:18 025790.sst
-rw-r--r-- 1 hadoop hadoop 33545 3月 31 10:18 025791.sst
-rw-r--r-- 1 hadoop hadoop 40177 3月 31 10:18 025792.sst
-rw-r--r-- 1 hadoop hadoop 33661 3月 31 10:18 025793.sst
-rw-r--r-- 1 hadoop hadoop 40494 3月 31 10:19 025794.sst
-rw-r--r-- 1 hadoop hadoop 33846 3月 31 10:19 025795.sst
-rw-r--r-- 1 hadoop hadoop 16 3月 30 19:46 CURRENT
-rw-r--r-- 1 hadoop hadoop 37 3月 28 14:56 IDENTITY
-rw-r--r-- 1 hadoop hadoop 0 3月 28 14:56 LOCK
-rw-rw-r-- 1 hadoop hadoop 38967 3月 28 14:56 LOG
-rw-r--r-- 1 hadoop hadoop 1399964 3月 31 10:19 MANIFEST-022789
-rw-r--r-- 1 hadoop hadoop 10407 3月 28 14:56 OPTIONS-000010
-rw-r--r-- 1 hadoop hadoop 13126 3月 28 14:56 OPTIONS-000012
CURRENT、IDENTIFY、LOCK、OPTIONS-*, 这些文件基本是固定大小,不会有变化;
LOG文件, 这个文件是rocksdb的日志文件,默认情况下,flink设置的rocksdb的日志输出级别是HEAD级别,几乎不会有日志输出,但是如果你配置了state.backend.rocksdb.log.level,比如说配置为了INFO_LEVEL,那么这个LOG文件会持续输出并且不会被清理;
MANIFEST-*,这是rocksdb的事务日志,在任务恢复重放过程中会用到, 这个日志也会持续增长,达到阈值以后滚动生成新的并且清楚旧文件;
3.原因总结
在增量checkpoint过程中,虽然sst文件所保存的状态数据大小保持动态平衡,但是LOG日志和MANIFEST文件仍然会当向持续增长,所以checkpoint会越来越大,越来越慢。
4.解决办法
在生产环境关闭Rocksdb日志(保持state.backend.rocksdb.log.level的默认配置即可);
设置manifest文件的滚动阈值,我设置的是10485760byte;
边栏推荐
- 论文笔记: 多标签学习 ESMC (没看懂, 还没写出来, 暂时放这里占个位置)
- 哪个编程语言实现hello world最烦琐?
- 【esp32学习之路6——flash加密】
- 小白学习MySQL - 统计的'投机取巧'
- Structure syntaxique des procédures stockées gbase 8S
- 第九章 APP项目测试(2) 测试工具
- SQL injection details
- Wechat likes to pay attention to the solution of invalid automatic reply
- Synchronous and asynchronous functions (callback function, promise, generator, async/await)
- GBase 8s 锁的分类
猜你喜欢
随机推荐
The yii2 debug toolbar is missing
Gbase 8s overall architecture
论文笔记: 多标签学习 ESMC (没看懂, 还没写出来, 暂时放这里占个位置)
GBASE 8s 索引B+树
写shell脚本报错总结
Record the problem of C # print size once
GBASE 8s的触发器
JS arrow function
CTF_ Web: deserialization learning notes (I) classes and objects in PHP
CTF_ Web: how to recognize and evaluate a regular expression
CTF_ Web: advanced problem WP (5-8) of attack and defense world expert zone
JS' sort() function
What is persistence? What are RDB and AOF in redis persistence?
Simple text analysis of malicious samples - Introduction
Data import and export for gbase 8s
JS arguments
Anaconda安装+TensorFlow安装+Keras安装+numpy安装(包含镜像和版本信息兼容问题)
PHP encapsulates curl to send get and post request methods, and uses
Upgrade PHP to php7 The impact of X (I). The problem of session retention. Keep login
GBASE 8s的数据导入和导出