Using state in Flink
2022-07-24 06:26:00 【sf_ www】
Introduction
Compared with other stream processing engines, Flink's biggest advantage is stateful stream processing, so state plays a very important role in Flink. A data stream is a time-ordered sequence of individual events. Many operations look at only one event at a time (an event parser, for example) and need no information about the past, so they are stateless. Other operations remember information across multiple events (window operators, for example). These are called stateful operations.
Some typical use cases for stateful operations:
1) Aggregating and analyzing the data within a time window.
2) Online machine learning, where the model parameters must be updated continuously as new data arrives.
3) De-duplication: when the stream contains duplicates, the application has to record which records it has already seen and check each new record against them.
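The de-duplication scenario in item 3 can be sketched in plain Java without any Flink dependency. This is only a toy model of the idea: DedupSketch and its methods are hypothetical names, and the in-memory set stands in for what would be managed state in a real job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of stateful de-duplication; DedupSketch is a hypothetical name, not a Flink class.
class DedupSketch {
    // The "state": every event id that has already flowed through the operator.
    private final Set<String> seen = new HashSet<>();

    // Returns true the first time an id is observed and false for every repeat.
    boolean isFirstOccurrence(String eventId) {
        return seen.add(eventId);
    }

    // Keeps only the first occurrence of each event, in arrival order.
    static List<String> deduplicate(List<String> events) {
        DedupSketch operator = new DedupSketch();
        List<String> out = new ArrayList<>();
        for (String event : events) {
            if (operator.isFirstOccurrence(event)) {
                out.add(event);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(deduplicate(Arrays.asList("a", "b", "a", "c", "b")));
    }
}
```

The operator cannot decide whether "a" is a duplicate by looking at the current event alone. It needs the remembered set, which is exactly what makes the operation stateful.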
Beyond these use cases, state is also the key to Flink's fault tolerance, which is built on checkpoints and savepoints.
Flink is also gradually adding the ability to access state from outside a running Flink application. This feature is still under development: it is not very stable yet and its API may change, but it should be a useful feature in the future.
State categories
There are two main kinds of state: Keyed State and Operator State.
Keyed State
Keyed State can only be used on a KeyedStream, so you first have to create one with stream.keyBy(…).
Flink's data model is not based on key-value pairs, so there is no need to physically pack the data into keys and values. Keys are "virtual": they are defined as functions over the actual data and are used to guide the grouping operator.
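As a rough illustration of what a "virtual" key means, the following plain-Java sketch groups records by a key that is computed from each record by a function rather than stored next to it. VirtualKeySketch and groupBy are hypothetical names chosen for this example and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model of grouping by a virtual key; not Flink's keyBy implementation.
class VirtualKeySketch {
    // The key is derived from each record by keySelector; records are not (key, value) pairs.
    static <T, K> Map<K, List<T>> groupBy(List<T> records, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T record : records) {
            K key = keySelector.apply(record);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "avocado", "banana");
        // Group words by their first letter; the letter exists only as a function of the word.
        Map<Character, List<String>> byFirstLetter = groupBy(words, w -> w.charAt(0));
        System.out.println(byFirstLetter);
    }
}
```

The same records could be grouped differently just by passing another function, which is the practical consequence of keys being defined on the data instead of being part of it.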
For Keyed State, Flink provides several ready-made data structures: ValueState<T>, ListState<T>, ReducingState<T>, AggregatingState<IN, OUT>, and MapState<UK, UV>. Note that all five types describe the state value that belongs to a keyBy key of the stream. To avoid confusion with the keyBy key, Flink calls the key and value of a MapState the UserKey and UserValue.
ValueState<T>: stores a single value; each key has exactly one value.
ListState<T>: stores a list; each key has one list.
MapState<UK, UV>: stores a map; each key has one map.
ReducingState<T> and AggregatingState<IN, OUT>, like ListState<T>, belong to MergingState<IN, OUT>. Unlike ListState<T>, ReducingState<T> holds a single element rather than a list: when a new element is added with add(T), it is merged with the existing element by a ReduceFunction and the result is written back to the state. AggregatingState<IN, OUT> is similar and also holds a single result, except that its input and output types may differ. Both resemble the ReduceFunction and AggregateFunction used on windows: each new element is aggregated with what is already stored.
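The difference between the two merging behaviours can be modelled in plain Java. The sketch below is not Flink code: ReducingStateSketch and AverageStateSketch are hypothetical classes, and the key is passed explicitly because there is no runtime here to set a current key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model of reducing state: one merged value per key, same type in and out.
class ReducingStateSketch<K, T> {
    private final Map<K, T> valuePerKey = new HashMap<>();
    private final BinaryOperator<T> reduceFunction;

    ReducingStateSketch(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    // Mirrors add(T): merge the new element into the single stored value of this key.
    void add(K key, T element) {
        valuePerKey.merge(key, element, reduceFunction);
    }

    // Returns the reduced value, or null if nothing was added for this key.
    T get(K key) {
        return valuePerKey.get(key);
    }
}

// Toy model of aggregating state: input is long, output is Double (a running average).
class AverageStateSketch<K> {
    // Accumulator per key: index 0 holds the sum, index 1 holds the count.
    private final Map<K, long[]> sumAndCount = new HashMap<>();

    void add(K key, long element) {
        long[] acc = sumAndCount.computeIfAbsent(key, k -> new long[2]);
        acc[0] += element;
        acc[1] += 1;
    }

    // Returns the average of all added elements, or null if nothing was added for this key.
    Double get(K key) {
        long[] acc = sumAndCount.get(key);
        return acc == null ? null : (double) acc[0] / acc[1];
    }
}
```

For example, a ReducingStateSketch built with Long::sum keeps only the running total per key, while AverageStateSketch accepts long inputs but reports a Double, which is the kind of type mismatch that calls for AggregatingState<IN, OUT> rather than ReducingState<T>.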
Because these dedicated types exist, do not store a list or a map inside a ValueState<T>; using ListState or MapState directly is much more efficient.
State is obtained through the RuntimeContext, so it can only be used in rich functions, that is, in classes that implement RichFunction or one of its sub-interfaces. When creating the state you can also configure options such as its TTL through StateTtlConfig. For example:
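import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class TTLCountMapFunction extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    private transient ValueState<Long> state;
    ...
    @Override
    public void open(Configuration parameters) throws Exception {
        // The value expires 600 seconds after it was created or last written,
        // and an expired value is never returned to the caller.
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(600))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
        ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("count-state", Long.class);
        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }
}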
See the official documentation for the full list of StateTtlConfig settings. In the DataStream API, TTL is set through StateTtlConfig. In SQL it is configured with table.exec.state.ttl, whose default value is 0, meaning that state never expires. In the Table API the corresponding call is StreamTableEnvironment.getConfig().setIdleStateRetention(...).
Flink implements state TTL by storing a timestamp together with the user value, so expiration is tracked per stored value:
· For MapState, TTL applies per UK, that is, per map entry.
· For ValueState, TTL applies to the whole value of a key.
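The idea of storing a timestamp next to each user value can be sketched in plain Java. TtlMapStateSketch is a hypothetical class, not Flink's implementation; it assumes write-only timestamp refresh and never returning expired values (matching the OnCreateAndWrite and NeverReturnExpired settings used earlier), takes the current time as an explicit argument, and treats an entry as expired once its age reaches the TTL, which is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of map state with TTL: every user value carries its own last-write timestamp.
class TtlMapStateSketch<UK, UV> {
    // A user value wrapped together with the time it was last written.
    private static final class Stamped<V> {
        final V value;
        final long lastWriteMillis;

        Stamped(V value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<UK, Stamped<UV>> entries = new HashMap<>();

    TtlMapStateSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Creating or overwriting an entry refreshes the timestamp of that entry only.
    void put(UK userKey, UV userValue, long nowMillis) {
        entries.put(userKey, new Stamped<>(userValue, nowMillis));
    }

    // Reads do not refresh the timestamp and never return an expired value.
    UV get(UK userKey, long nowMillis) {
        Stamped<UV> stamped = entries.get(userKey);
        if (stamped == null) {
            return null;
        }
        if (nowMillis - stamped.lastWriteMillis >= ttlMillis) {
            entries.remove(userKey); // drop the expired entry
            return null;
        }
        return stamped.value;
    }
}
```

Because each entry has its own timestamp, two user keys written at different times expire independently, which is what "TTL per UK" means for MapState.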
Operator State (non-keyed state)
Operator State is state bound to one parallel operator instance; each task keeps its own state value. The Kafka connector is a good example of operator state in Flink: each parallel instance of the Kafka consumer maintains a map of topic partitions to offsets as its operator state.
A typical stateful Flink application does not need Operator State. It is mostly a special kind of state used to implement sources and sinks, or for cases where there is no key by which the state could be partitioned.
To use Operator State, a function has to implement CheckpointedFunction; see the explanation of CheckpointedFunction for details.
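To make the Kafka-style example more concrete, here is a plain-Java toy model of one parallel source instance that keeps a partition-to-offset map and can snapshot and restore it. OffsetTrackingSourceSketch and all of its methods are hypothetical; the snapshot and restore methods only imitate the roles that snapshotState and initializeState play in CheckpointedFunction and are not the Flink or Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a single parallel source instance with operator state.
class OffsetTrackingSourceSketch {
    // Operator state: partition -> next offset to read, owned by this instance alone.
    private final Map<Integer, Long> offsets = new HashMap<>();

    // Called after a record has been emitted; the next read starts one past it.
    void recordConsumed(int partition, long offset) {
        offsets.put(partition, offset + 1);
    }

    // Snapshot role: copy the state so that later updates do not leak into the checkpoint.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(offsets);
    }

    // Restore role: after a failure, replace the in-memory state with the checkpointed copy.
    void restore(Map<Integer, Long> checkpoint) {
        offsets.clear();
        offsets.putAll(checkpoint);
    }

    // Partitions that were never read start from offset 0 in this sketch.
    long nextOffset(int partition) {
        return offsets.getOrDefault(partition, 0L);
    }
}
```

Note that nothing in this model is keyed by the records themselves: the map belongs to the instance, which is why it counts as operator state rather than keyed state.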
Broadcast State is a special kind of Operator State with its own use cases. How to use it is explained separately and is not covered here.
State storage
State is stored in State Backends. Before Flink 1.13 the backends were MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. From 1.13 on, they are HashMapStateBackend and EmbeddedRocksDBStateBackend, each combined with a checkpoint storage.
The old backends map to the new ones as follows:
MemoryStateBackend is equivalent to HashMapStateBackend combined with JobManagerCheckpointStorage. Storage location: state in TaskManager memory, checkpoints in JobManager memory.
# flink-conf.yaml configuration
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
// Java code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
FsStateBackend is equivalent to HashMapStateBackend combined with FileSystemCheckpointStorage. Storage location: state in TaskManager memory, checkpoints in an external file system (local or HDFS).
# flink-conf.yaml configuration
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
// Java code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// Advanced FsStateBackend configurations, such as write buffer size
// can be set by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
RocksDBStateBackend is equivalent to EmbeddedRocksDBStateBackend combined with FileSystemCheckpointStorage. Storage location: state in RocksDB, checkpoints in an external file system (local or HDFS).
# flink-conf.yaml configuration
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
// Java code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
Usage tips
1. Clearing Keyed State: state.clear() only removes the value for the current key. To clear the state of all keys you need the applyToAllKeys method.
2. Be careful with long lists in Operator State.
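Tip 1 is easier to see with a small model. The following plain-Java sketch is not Flink code: KeyedValueStateSketch is a hypothetical class in which the current key is set by hand, whereas in Flink the runtime sets it before each record is processed. The clearAllKeys method only stands in for the role the text assigns to applyToAllKeys.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed value state scoped to a "current key".
class KeyedValueStateSketch<K, V> {
    private final Map<K, V> valuePerKey = new HashMap<>();
    private K currentKey;

    // Selects the key that update(), value() and clear() operate on.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    void update(V value) {
        valuePerKey.put(currentKey, value);
    }

    V value() {
        return valuePerKey.get(currentKey);
    }

    // Like state.clear(): only the value of the current key is removed.
    void clear() {
        valuePerKey.remove(currentKey);
    }

    // Clearing everything requires visiting every key, not just the current one.
    void clearAllKeys() {
        valuePerKey.clear();
    }

    // Number of keys that currently hold a value.
    int keyCount() {
        return valuePerKey.size();
    }
}
```

After writing values for two keys and calling clear() while the first key is current, the second key's value is still there, which is why an explicit all-keys operation is needed to empty the state completely.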
For an introduction to state and how to use it, see the official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/