Flink Functions (2): CheckpointedFunction
2022-07-24 05:19:00 【sf_www】
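To use operator state (non-keyed state), a stateful function can implement the CheckpointedFunction interface.
Key points:
1. CheckpointedFunction is the core interface for stateful transformation functions, that is, functions that maintain state across individual stream records. More lightweight alternatives exist: for operator state, ListCheckpointed (now deprecated); for keyed state, the RuntimeContext, which is available in a RichFunction, so implementing RichFunction is enough. CheckpointedFunction, however, offers the greatest flexibility for managing both keyed state and operator state.
2. snapshotState is called whenever a checkpoint is taken. initializeState is called whenever the user-defined function is initialized, either on first initialization or when recovering from an earlier checkpoint. It is therefore the place both to initialize state and to put state-recovery logic.
3. Managed operator state currently supports only list-style state: the state must be a List of serializable objects so that it can be redistributed on rescaling. Two redistribution schemes exist. With even-split redistribution, on restore/redistribution each parallel operator instance receives only a sublist of the whole state. With union redistribution, each parallel operator instance receives the complete list of state elements; when the state is large this can cause out-of-memory errors or oversized RPC frames.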
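The difference between the two redistribution schemes can be made concrete with a small plain-Java simulation. This is only a sketch and not Flink code: the class name RedistributionSketch and its methods are made up for illustration, and the contiguous chunking used for the even split is an assumption, since how Flink assigns individual elements to subtasks is an implementation detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; this is not part of Flink.
class RedistributionSketch {

    // The whole operator state is logically the concatenation of every subtask's list.
    static <T> List<T> concat(List<List<T>> perSubtask) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perSubtask) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split: each new subtask receives only a sublist of the whole state.
    // Assumption: contiguous chunks whose sizes differ by at most one.
    static <T> List<List<T>> evenSplit(List<List<T>> perSubtask, int newParallelism) {
        List<T> all = concat(perSubtask);
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(pos, pos + size)));
            pos += size;
        }
        return result;
    }

    // Union: every new subtask receives the complete list.
    static <T> List<List<T>> union(List<List<T>> perSubtask, int newParallelism) {
        List<T> all = concat(perSubtask);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

With two old subtasks holding [1, 2, 3] and [4, 5] and a new parallelism of 3, evenSplit yields [1, 2], [3, 4], [5], while union hands [1, 2, 3, 4, 5] to each of the three subtasks. The restored volume under union therefore grows with the parallelism, which is where the memory and RPC frame problems come from.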
CheckpointedFunction declares two methods:
/**
 * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to
 * the function to ensure that all state is exposed by means previously offered through {@link
 * FunctionInitializationContext} when the Function was initialized, or offered now by {@link
 * FunctionSnapshotContext} itself.
 *
 * @param context the context for drawing a snapshot of the operator
 * @throws Exception Thrown, if state could not be created or restored.
 */
void snapshotState(FunctionSnapshotContext context) throws Exception;

/**
 * This method is called when the parallel function instance is created during distributed
 * execution. Functions typically set up their state storing data structures in this method.
 *
 * @param context the context for initializing the operator
 * @throws Exception Thrown, if state could not be created or restored.
 */
void initializeState(FunctionInitializationContext context) throws Exception;

FunctionSnapshotContext extends the ManagedSnapshotContext interface, which defines getCheckpointId and getCheckpointTimestamp. FunctionInitializationContext extends the ManagedInitializationContext interface, which defines isRestored, getOperatorStateStore, and getKeyedStateStore. These tell you whether the state was restored from a snapshot of a previous execution and give access to the OperatorStateStore and KeyedStateStore objects.
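Example 1: BufferingSink
The following is a stateful SinkFunction that uses CheckpointedFunction to buffer elements before sending them to the outside world. It demonstrates list state with even-split redistribution:
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;
    private transient ListState<Tuple2<String, Integer>> checkpointedState;
    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // Buffer the element; once the threshold is reached, send everything to the sink and clear the buffer
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element : bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // On every checkpoint, replace the checkpointed state with the current buffer contents
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        // Initialize the ListState (even-split redistribution)
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // When recovering from a checkpoint, put the checkpointed elements back into the buffer
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}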
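Why snapshotState copies the buffer into state can be seen in a plain-Java stand-in for the sink. This sketch makes no use of Flink: BufferingSketch, snapshot, and restore are hypothetical names that only mimic the roles of snapshotState and initializeState, and plain strings stand in for the tuples.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BufferingSink; illustration only, no Flink involved.
class BufferingSketch {
    private final int threshold;
    private final List<String> buffer = new ArrayList<>();
    // Records everything that was "sent" so the effect can be inspected.
    final List<String> sent = new ArrayList<>();

    BufferingSketch(int threshold) {
        this.threshold = threshold;
    }

    // Mirrors invoke(): buffer first, flush once the threshold is reached.
    void invoke(String value) {
        buffer.add(value);
        if (buffer.size() >= threshold) {
            sent.addAll(buffer);
            buffer.clear();
        }
    }

    // Mirrors snapshotState(): the checkpoint holds a copy of the unsent elements.
    List<String> snapshot() {
        return new ArrayList<>(buffer);
    }

    // Mirrors the isRestored() branch of initializeState(): refill the buffer.
    void restore(List<String> checkpointed) {
        buffer.addAll(checkpointed);
    }

    // Returns a copy of the elements that are buffered but not yet sent.
    List<String> buffered() {
        return new ArrayList<>(buffer);
    }
}
```

If an instance with threshold 3 has buffered "a" and "b" when the snapshot is taken, a fresh instance that restores that snapshot and then receives "c" sends all three elements. Without the copy in snapshot, the two buffered elements would be lost on failure because they live only in memory.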
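Example 2:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CountFunction<T> implements MapFunction<T, T>, CheckpointedFunction {

    private ReducingState<Long> countPerKey;
    private ListState<Long> countPerPartition;
    private long localCount;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // get the state data structure for the per-key state
        // (AddFunction is not defined in this article; it is assumed to be a
        // user-supplied ReduceFunction<Long> that adds two values)
        countPerKey = context.getKeyedStateStore().getReducingState(
                new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class));

        // get the state data structure for the per-partition state
        // (getListState replaces the deprecated getOperatorState)
        countPerPartition = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("perPartitionCount", Long.class));

        // initialize the "local count variable" based on the operator state
        for (Long l : countPerPartition.get()) {
            localCount += l;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // the keyed state is always up to date anyways
        // just bring the per-partition state in shape
        countPerPartition.clear();
        countPerPartition.add(localCount);
    }

    @Override
    public T map(T value) throws Exception {
        // update the states
        countPerKey.add(1L);
        localCount++;
        return value;
    }
}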
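The loop in initializeState sums over a list rather than reading a single value because, after rescaling with even-split redistribution, a subtask may be handed zero, one, or several of the partial counts that were checkpointed. A plain-Java sketch of that restore step follows; CountRestoreSketch is a hypothetical name and the code has no Flink dependency.

```java
import java.util.List;

// Hypothetical illustration of the restore loop in CountFunction; not Flink code.
class CountRestoreSketch {

    // Sums whatever partial counts this subtask received on restore.
    // An empty list (no element assigned to this subtask) yields 0.
    static long restoreLocalCount(List<Long> restoredPartialCounts) {
        long localCount = 0L;
        for (long partial : restoredPartialCounts) {
            localCount += partial;
        }
        return localCount;
    }
}
```

Scaling down from two subtasks with counts 5 and 7 to a single subtask gives a local count of 12. Scaling up to three subtasks leaves one of them with an empty list and a local count of 0. In both cases the sum over all subtasks is unchanged.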