Flink checkpoints in detail: how the process works, common errors, and debugging methods
2022-06-25 11:39:00 【Direction_ Wind】
This article draws mainly on the official community's material on checkpoint error types and on how to locate the corresponding error logs.
Flink checkpoint error types
There are two main types: Checkpoint Decline and Checkpoint Expire. They are discussed separately below.
Introduction to the checkpoint process
- Step one: the Checkpoint Coordinator triggers a checkpoint on all source nodes.
- Step two: the source nodes broadcast a barrier downstream. The barrier is the core of the Chandy-Lamport distributed snapshot algorithm: a downstream task executes the checkpoint only after it has received the barrier from all of its inputs.
- Step three: once a task has finished backing up its state, it reports the address of the backup data (the state handle) to the Checkpoint Coordinator.
- Step four: after the downstream sink node has collected the barriers from its two upstream inputs, it takes its local snapshot. The RocksDB incremental checkpoint flow deserves a special mention here: RocksDB first flushes its full data to disk, and the Flink framework then selects the files that have not yet been uploaded and persists them.
- Likewise, once the sink node has finished its own checkpoint, it returns its state handle to the Coordinator.
- After a task has received all upstream barriers, it passes the barrier on downstream and asynchronously writes its own state to persistent storage. When finished, it notifies the Checkpoint Coordinator in the JM and hands over the address of the backup data (the state handle). Once the Checkpoint Coordinator has collected all of them, it writes the checkpoint metadata to persistent storage, and the checkpoint ends.
To sum up, a checkpoint consists of the following operations:
- The JM triggers the checkpoint
- The source receives the trigger-checkpoint RPC, takes its own snapshot, and sends the barrier downstream
- Downstream tasks receive the barrier (the checkpoint starts only once the barriers from all inputs have arrived)
- The task starts the synchronous phase of the snapshot
- The task starts the asynchronous phase of the snapshot
- The task completes the snapshot and reports to the JM
A failure in any of the above operations causes the checkpoint to fail.
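The alignment rule in the list above can be illustrated with a toy model. This is only a sketch in plain Python, not Flink code: the class name AligningTask, its fields, and the channel numbering are all hypothetical and exist only for illustration.

```python
class AligningTask:
    """Toy model of a task that snapshots only after every input delivered the barrier."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.seen = set()      # input channels whose barrier has arrived
        self.snapshots = []    # checkpoint ids for which a snapshot was taken

    def on_barrier(self, checkpoint_id, channel):
        """Record a barrier; return True if it completed the alignment."""
        self.seen.add(channel)
        if len(self.seen) < self.num_inputs:
            return False       # still waiting for the other inputs
        # In a real task the synchronous and asynchronous phases would run here.
        self.snapshots.append(checkpoint_id)
        self.seen.clear()
        return True


task = AligningTask(num_inputs=2)
first = task.on_barrier(7, channel=0)    # only one of two inputs so far
second = task.on_barrier(7, channel=1)   # all inputs have delivered the barrier
print(first, second, task.snapshots)     # False True [7]
```

The point of the sketch is that a single slow input is enough to hold back the snapshot of the whole task.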
Troubleshooting abnormal checkpoints
The checkpoint detail view in the Flink UI (figure not reproduced here) reports the following for each operator:
- One column shows how many subtasks have acknowledged this checkpoint
- One shows the time of the last ack among all subtasks of the operator
- One shows the longest time any subtask of the operator took to complete its snapshot
- One shows the state size of the current checkpoint; for incremental checkpoints it is the size of the increment
In the figure, four tasks make the overall checkpoint very slow, and the physical execution graph in the UI shows which tasks to inspect. Most of the time, however, by the time we notice the checkpoint error the job has already gone down, and the investigation has to rely on the logs on YARN.
Checkpoint Decline:
In the JM log you can see:
Decline checkpoint 10000 by task ********* container_e119_1640332468237_165586_01_000002 @ hostname01 with allocation id 2872ccdf76d6af3baf9064be9d46fcaa
You can then go to the TM that hosts container_e119_1640332468237_165586_01_000002, that is, hostname01, and check that TM's log for the specific error message.
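When the JM log is large, the container and host can be pulled out of the Decline line with a small script. The following is a sketch only: the regular expression is modelled on the sample line above, and other Flink versions may format the message differently. The names DECLINE_RE and parse_decline are made up for this example.

```python
import re

# Pattern modelled on the sample log line above (an assumption, not a Flink contract).
DECLINE_RE = re.compile(
    r"Decline checkpoint (?P<checkpoint>\d+) by task (?P<task>\S+)"
    r".*?(?P<container>container_\S+) @ (?P<host>\S+)"
)


def parse_decline(line):
    """Return checkpoint id, task, container and host, or None if the line does not match."""
    match = DECLINE_RE.search(line)
    return match.groupdict() if match else None


sample = (
    "Decline checkpoint 10000 by task ********* "
    "container_e119_1640332468237_165586_01_000002 @ hostname01 "
    "with allocation id 2872ccdf76d6af3baf9064be9d46fcaa"
)
info = parse_decline(sample)
print(info["container"], info["host"])
```

The returned host tells you which TM log to open next.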
One special case of Checkpoint Decline is Checkpoint Cancel. It happens when the barriers of an earlier checkpoint have not finished aligning but a barrier of a later checkpoint has already arrived; the earlier checkpoint is then cancelled.
For example, if chk-11 is still in the alignment stage when a barrier for chk-12 is received, chk-11 is cancelled and jm.log contains:
Received checkpoint barrier for checkpoint ****** before completing current checkpoint ** Skipping current checkpoint
A downstream task that receives the cancellation barrier prints:
$taskNameWithSubTaskAndID: Checkpoint chk-11 canceled, aborting alignment.
or
$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint chk-12 before completing current checkpoint chk-11. Skipping current checkpoint
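The cancel behaviour described above can be sketched as a toy state machine. This is an illustration in plain Python, not Flink's actual barrier handling; the class BarrierAligner and its event tuples are hypothetical.

```python
class BarrierAligner:
    """Toy model: a barrier of a newer checkpoint cancels an unfinished alignment."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.current = -1     # id of the checkpoint currently being aligned
        self.seen = set()     # channels that delivered the barrier for self.current
        self.events = []      # ("canceled" | "completed", checkpoint_id)

    def on_barrier(self, checkpoint_id, channel):
        if checkpoint_id < self.current:
            return            # barrier of an older, already abandoned checkpoint
        if checkpoint_id > self.current:
            if self.seen:     # the older alignment was still incomplete
                self.events.append(("canceled", self.current))
            self.current, self.seen = checkpoint_id, set()
        self.seen.add(channel)
        if len(self.seen) == self.num_inputs:
            self.events.append(("completed", self.current))
            self.seen = set()


aligner = BarrierAligner(num_inputs=2)
aligner.on_barrier(11, channel=0)   # chk-11 starts aligning
aligner.on_barrier(12, channel=0)   # chk-12 arrives first on channel 0: chk-11 is cancelled
aligner.on_barrier(11, channel=1)   # late barrier of chk-11 is ignored
aligner.on_barrier(12, channel=1)   # chk-12 is now aligned
print(aligner.events)               # [('canceled', 11), ('completed', 12)]
```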
Checkpoint Expire:
The Decline case above is relatively rare; Expire is far more common. It is mainly caused by the checkpoint being so slow that it times out.
When a checkpoint expires, jm.log contains:
Checkpoint 157 of job ba02728367ae85bca4d43ab7445251f5 expired before completing.
and
Received late message for now expired checkpoint attempt 158 from task d11aac4d0b6f4fd9bde0fa4e76240c71 of job ba02728367ae85bca4d43ab7445251f5 at container_e119_1640332468237_165586_01_000002 @ cp-hadoop-hdp-node07 (dataPort=11460).
To find the corresponding error in the TM logs, use the same method as described above.
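To see which checkpoints expired and which hosts reported late, the two kinds of lines can be collected from jm.log. This is a sketch under the assumption that the messages look like the two samples above; the function name summarize_expired and both patterns are made up for this example.

```python
import re
from collections import Counter

# Both patterns are modelled on the sample lines above (assumed format).
EXPIRED_RE = re.compile(r"Checkpoint (\d+) of job (\w+) expired before completing")
LATE_RE = re.compile(
    r"Received late message for now expired checkpoint attempt (\d+) "
    r"from task (\w+) of job (\w+) at (container_\S+) @ (\S+)"
)


def summarize_expired(lines):
    """Return (expired checkpoint ids, Counter of hosts that sent late acks)."""
    expired = []
    late_hosts = Counter()
    for line in lines:
        match = EXPIRED_RE.search(line)
        if match:
            expired.append(int(match.group(1)))
            continue
        match = LATE_RE.search(line)
        if match:
            late_hosts[match.group(5)] += 1
    return expired, late_hosts


jm_log = [
    "Checkpoint 157 of job ba02728367ae85bca4d43ab7445251f5 expired before completing.",
    "Received late message for now expired checkpoint attempt 158 from task "
    "d11aac4d0b6f4fd9bde0fa4e76240c71 of job ba02728367ae85bca4d43ab7445251f5 at "
    "container_e119_1640332468237_165586_01_000002 @ cp-hadoop-hdp-node07 (dataPort=11460).",
]
expired, late_hosts = summarize_expired(jm_log)
print(expired, dict(late_hosts))
```

A host that shows up repeatedly in the late-message count is a natural first candidate for a closer look at its TM log.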
Checkpoints are slow mainly in the following situations:
Slow source trigger
This is uncommon but possible. When a source takes its snapshot and sends the barrier downstream, it has to acquire a lock (the community is replacing this lock-based approach with a mailbox model; see [1] for details). If the lock can never be acquired, the checkpoint may never get a chance to start. If the taskmanager.log of the TM hosting the source contains no log line for the start of the checkpoint, consider whether this is the cause; jstack can confirm who holds the lock.
Very large state
In this case, use incremental checkpoints. Incremental checkpoints are currently supported only by RocksDBStateBackend and must be enabled explicitly.
Data skew or back pressure
Data skew can be mitigated by redesigning the key and the data processing flow. For back pressure, use the Flink UI to see where it occurs, and use metrics to obtain the key indicators.
Dealing with back pressure:
Locate the node and add metrics
The metrics used for monitoring back pressure mainly relate to buffer usage on the receiving end of the channel. The most useful are:
Metric                                   Description
outPoolUsage                             Usage of the sender's buffers
inPoolUsage                              Usage of the receiver's buffers
floatingBuffersUsage (1.9 and above)     Usage of the receiver's floating buffers
exclusiveBuffersUsage (1.9 and above)    Usage of the receiver's exclusive buffers
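One common rule of thumb for reading these metrics is that a task whose input buffers are full while its output buffers are not is probably the one that cannot keep up. The sketch below applies that heuristic to hand-written numbers. The function find_bottleneck, the 0.9 threshold, and the pipeline values are all assumptions for illustration; real values would come from your metrics system.

```python
def find_bottleneck(tasks, high=0.9):
    """Return the first task with full input buffers but free output buffers.

    tasks: list of (name, inPoolUsage, outPoolUsage), ordered from source to sink.
    high:  hypothetical threshold above which a buffer pool counts as full.
    """
    for name, in_usage, out_usage in tasks:
        if in_usage >= high and out_usage < high:
            return name
    return None


# Made-up sample values for a four-task pipeline.
pipeline = [
    ("source", 0.0, 1.0),   # output full: blocked by something downstream
    ("map", 1.0, 1.0),      # both full: blocked by something downstream
    ("window", 1.0, 0.1),   # input full, output free: likely the slow task
    ("sink", 0.2, 0.0),
]
suspect = find_bottleneck(pipeline)
print(suspect)              # window
```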
Slow barrier alignment
On the task side, a checkpoint consists of barrier alignment (collecting the barriers sent from all upstream inputs), followed by the synchronous phase and then the asynchronous phase. If the barriers never finish aligning, the snapshot does not start.
This situation can also make the state very large: the data that arrives between the first barrier and the late barriers is also put into the state and saved with it.
In the debug log, the following line appears once the barriers are aligned:
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)
As an aside, the main semantic difference between at-least-once and exactly-once is whether the barrier that arrives first waits to be aligned with the later barriers before the checkpoint is taken. If the line above never appears (note that it is a debug-level log), you can switch to at-least-once mode to see which barriers have not arrived; the log then contains lines such as:
Received barrier for checkpoint 96508 from channel 5
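Given lines of this form, a short script can list the channels that have not yet delivered a barrier for a given checkpoint. This is a sketch: it assumes the message format of the sample line above and that channels are numbered from 0 to num_channels - 1; the function name missing_channels is made up.

```python
import re

# Pattern modelled on the sample log line above (assumed format).
BARRIER_RE = re.compile(r"Received barrier for checkpoint (\d+) from channel (\d+)")


def missing_channels(lines, checkpoint_id, num_channels):
    """Return the sorted channel indexes with no barrier logged for checkpoint_id."""
    seen = set()
    for line in lines:
        match = BARRIER_RE.search(line)
        if match and int(match.group(1)) == checkpoint_id:
            seen.add(int(match.group(2)))
    return sorted(set(range(num_channels)) - seen)


tm_log = [
    "Received barrier for checkpoint 96508 from channel 5",
    "Received barrier for checkpoint 96508 from channel 0",
    "Received barrier for checkpoint 96507 from channel 1",   # different checkpoint
]
missing = missing_channels(tm_log, 96508, num_channels=6)
print(missing)   # [1, 2, 3, 4]
```

The upstream subtasks feeding the missing channels are where to look for the delay.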
Main thread too busy
On the task side, all processing is single-threaded: both data processing and barrier processing are handled by the main thread. If the main thread is too slow (for example, with RocksDBBackend, slow state operations slow down overall processing), barrier processing slows down as well, which affects the progress of the whole checkpoint and may leave the barriers unaligned for a long time.
You can use async-profiler to generate a flame graph and see which stacks consume the most CPU. In a big data cluster shared by real-time and offline workloads, offline jobs are often scheduled together in the early morning, which can leave a node without enough threads to complete the checkpoint and result in an error.
Slow synchronous phase
For backends other than RocksDBBackend, check whether asynchronous snapshots are enabled. If they are and the phase is still slow, examine what the JVM as a whole is doing; the tools from the previous section apply here too.
For RocksDBBackend, use iostat to check how much pressure the disk is under. You can also look at the RocksDB log on the TM side to see how long the snapshot takes in total.
Slow asynchronous phase
In this step, the JM mainly writes the checkpoint metadata to persistent storage.
For backends other than RocksDB, the main bottleneck is network traffic; metrics can be used to monitor it and check for problems.
For RocksDB, files have to be read locally and written to remote persistent storage, so disk I/O can also become a bottleneck. If I/O capacity is sufficient and the network is fine, you can enable multi-threaded upload.