Flink snapshot analysis: locating operators with large state and data skew
2022-06-24 12:17:00 【KyleMeow】
Job state keeps growing: what is going on?
In Flink jobs, whether written in SQL or packaged as a JAR, state is used directly or indirectly almost everywhere. When Flink takes a snapshot, this user-defined state data is saved into the snapshot so that the job can later recover from a crash.
Flink state is divided into Operator State and Keyed State, and Keyed State is further divided into ValueState, MapState, ListState, AggregatingState, MergingState, ReducingState, and so on. Each of these state types also has several concrete implementations (HeapState, RocksDBState, etc.), and accessing state involves various Serializers and Deserializers, so the whole pipeline is intricate and complex.
For ordinary users, Flink's internals are a black box, but the trouble caused by state is very real. With SQL semantics such as multi-table JOIN or GROUP BY in particular, state easily keeps growing until TaskManagers hit frequent OOM (out of memory) errors, which hurts the stability of online business, and the mood even more ╮(╯_╰)╭
Many users, facing a job that keeps crashing and snapshot files of tens or hundreds of GB on disk, are ready to collapse themselves: what is inside such a large state, and can any of it be deleted?
Types of snapshots
Flink snapshots come in two kinds: Checkpoint (triggered periodically) and Savepoint (triggered by the user). Checkpoints are further divided into ordinary Checkpoints and externalized Checkpoints. An ordinary Checkpoint can only be used for recovery while the job's JobManager is still alive; an externalized Checkpoint or a Savepoint can also be used for a cold start from scratch.
For a Savepoint, and for a Checkpoint with the externalized feature enabled, Flink writes a metadata file named _metadata into the snapshot directory. This file is the key clue when we analyze a snapshot.
Storage format of snapshots
Let's start with the metadata (the _metadata file) and look at its data structure:
The variable-length Master State section carries its own magic number, data length, and similar fields; it usually holds little data.
Operator State is the bulk of the state. Its variable-length section contains, for each operator: the operator ID (formed by concatenating two longs), the operator's current parallelism and maximum parallelism, the number of subtask states, and for each subtask its index and metadata (whether it contains raw and managed Operator State, whether it contains raw and managed Keyed State, which specific states are included, the KeyGroup range, offsets, whether the state is incremental, the RelativeFileStateHandle pointers to the state files, etc.).
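To make the "two longs" remark concrete, here is a minimal Java sketch that splits a 32-character hex operator ID into two 64-bit halves and joins them again. The class name OperatorIdHex and its methods are hypothetical and not part of Flink, and the sketch assumes each half is rendered as 16 zero-padded hex digits; which half Flink stores first internally is not covered here, so only the round trip is shown. The sample ID is the one that appears in the analysis results later in this article.

```java
/** Hypothetical helper, not a Flink class: a 32-char hex ID viewed as two longs. */
final class OperatorIdHex {

    /** Renders two longs as one 32-character lowercase hex string. */
    static String format(long first, long second) {
        // %016x prints a long as unsigned, zero-padded, 16-digit hex.
        return String.format("%016x%016x", first, second);
    }

    /** Splits a 32-character hex ID into its two 64-bit halves. */
    static long[] parse(String hex) {
        if (hex.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters: " + hex);
        }
        return new long[] {
            // Unsigned parsing is needed because a half may exceed Long.MAX_VALUE.
            Long.parseUnsignedLong(hex.substring(0, 16), 16),
            Long.parseUnsignedLong(hex.substring(16), 16)
        };
    }

    public static void main(String[] args) {
        String id = "4421bbc22ac32fa6abe810c70a869c54";
        long[] halves = parse(id);
        // Round trip: joining the two halves reproduces the original ID.
        System.out.println(format(halves[0], halves[1]).equals(id)); // prints true
    }
}
```

Seen this way, the operator ID is just 128 bits of identifier with no name or type attached, which is why the operator has to be identified by other means.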
Besides the metadata file there are many concrete state files (the files that the RelativeFileStateHandle pointers refer to). They are usually too large to be embedded directly in _metadata, so they exist as separate files.
How to read snapshots
As shown above, parsing state files is not easy, and there are many details to consider. Whoever tied the bell must untie it: we can let Flink itself read and parse the state files:
1. Flink internal API
The simplest way is to find the source code Flink uses to restore state from a snapshot and trace it to the class that deserializes the _metadata file. We soon find the static method org.apache.flink.runtime.checkpoint.Checkpoints#loadCheckpointMetadata, which deserializes a given data stream into Flink's internal CheckpointMetadata object (the in-memory representation of the file described above).
This method is sufficient if you only need the metadata and do not need to read or write the actual state data.
2. The packaged State Processor API
Newer Flink versions also ship the State Processor API. With it we can not only read the concrete state files, but also generate state data as needed for new Flink jobs to use.
Because the State Processor API reads and writes concrete state, you must supply a StateBackend instance, the specific operator UIDs, and so on, and it runs as a batch job on the DataSet API. The process is relatively complex and is not covered here; a separate article will explain how to use it.
Hands-on practice
Let's use Flink's internal API to read the state metadata and work out which operators hold the largest share of the state, and how much state each subtask of those operators holds (an operator with parallelism greater than one has several subtasks).
The sample code is simple; here are the analysis results:
As you can see, all the information in the metadata file is printed, and the operator 4421bbc22ac32fa6abe810c70a869c54 holds the largest share of the state at 92.31%. Its subtasks hold fairly even amounts of state, all between 1.1G and 1.3G, so there is essentially no data skew.
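The share and skew figures in such a report are simple arithmetic once the per-subtask sizes are known. The sketch below is a minimal, Flink-free illustration, assuming the sizes have already been read out of the CheckpointMetadata object; the class StateSizeReport, the operator names, and the sample sizes (in MB) are all hypothetical and are not the data from the analysis above. Skew is measured here as the largest subtask divided by the mean subtask size, which is one possible definition among several.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: state share per operator and skew across its subtasks. */
final class StateSizeReport {

    /** Sum of all subtask sizes of one operator. */
    static long total(long[] subtaskSizes) {
        long sum = 0;
        for (long size : subtaskSizes) {
            sum += size;
        }
        return sum;
    }

    /** Share of one operator in the whole snapshot, as a percentage. */
    static double sharePercent(long operatorSize, long snapshotSize) {
        return snapshotSize == 0 ? 0.0 : 100.0 * operatorSize / snapshotSize;
    }

    /** Largest subtask divided by the mean subtask size; near 1.0 means little skew. */
    static double skewRatio(long[] subtaskSizes) {
        if (subtaskSizes.length == 0) {
            return 1.0;
        }
        long max = 0;
        for (long size : subtaskSizes) {
            max = Math.max(max, size);
        }
        double mean = (double) total(subtaskSizes) / subtaskSizes.length;
        return mean == 0 ? 1.0 : max / mean;
    }

    public static void main(String[] args) {
        // Hypothetical per-subtask sizes in MB, keyed by a made-up operator name.
        Map<String, long[]> sizes = new LinkedHashMap<>();
        sizes.put("operator-a", new long[] {1100, 1200, 1300, 1200});
        sizes.put("operator-b", new long[] {100, 100, 100, 100});

        long snapshot = 0;
        for (long[] subtasks : sizes.values()) {
            snapshot += total(subtasks);
        }
        for (Map.Entry<String, long[]> e : sizes.entrySet()) {
            System.out.printf("%s share=%.2f%% skew=%.2f%n",
                    e.getKey(),
                    sharePercent(total(e.getValue()), snapshot),
                    skewRatio(e.getValue()));
        }
    }
}
```

A skew ratio close to 1.0 together with a dominant share points to an operator whose state is large but evenly spread; a ratio well above 1.0 would instead suggest that a few keys concentrate the data on one subtask.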
Because the metadata does not contain the operator's name or type, you need to search the logs for this operator ID. The logs show that it is an InnerJoin operator.
Further analysis of the source code shows that the state belongs to the two JoinRecordStateView instances inside StreamingJoinOperator, the streaming JOIN operator.
In principle, a regular (unbounded) JOIN of two streams must keep all data from both streams indefinitely for later lookups, and by default nothing cleans it up (unless you set the Idle State Retention Time described below). In production, this kind of JOIN therefore easily runs into OOM because the state grows too large. We recommend using an Interval JOIN (time-interval JOIN) instead; for details, see the Flink joins documentation listed under Reference reading.
The other operator that easily produces very large state in SQL jobs is an unbounded GROUP BY. Fortunately, Flink provides the Idle State Retention Time mechanism: once periodic state cleanup is configured, expired state of these GROUP BY and JOIN operators is removed in time.
Reference reading
https://ververica.cn/developers/introduction-to-state-management-and-fault-tolerance/
https://ververica.cn/developers/state-management/
https://flink.apache.org/feature/2019/09/13/state-processor-api.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html