当前位置:网站首页>Flink state使用
Flink state使用
2022-07-24 05:19:00 【sf_www】
简介
Flink相比其他流计算引擎,最大的优势就是号称是有状态的流计算。可见state在Flink中极其重要的位置。数据流是由一个个单独的事件按时间序列组合成的,虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器,即不关注状态,不需要过往信息),但有些操作会跨多个事件记住信息(例如窗口操作符)。这些操作称为有状态操作。
下面是一些有状态的操作的使用场景:
1)对一个时间窗口内的数据进行聚合分析
2)在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数
3)数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重等
除了这些有用的应用场景外,state也是Flink使用checkpoints和savepoints实现容错的关键。
现在Flink正在慢慢实现让用户在运行时从Flink外部访问state,当然在发展中,可能api等都会改变,目前不是很稳定,不过未来应该是个不错的功能。
state的分类
state主要分为两类:Keyed State和Operator State
Keyed State
Keyed State只能用在KeyedStream上,所以要先形成KeyedStream(使用stream.keyBy(…))。
Flink的数据模型不是基于键值对的。因此,不需要将数据集类型物理地打包到键和值中。键是“虚拟的”:它们被定义为在实际数据上的函数,以指导分组操作符。
对于Keyed State,Flink提供了几种现成的数据结构供我们使用:ValueState<T>、ListState<T>、ReducingState<T>、AggregatingState<IN, OUT>、MapState<UK, UV>。要注意理解,上面的5种state类型都是表示stream keyBy 的 key的value的state类型。为了与 keyBy 的 key 进行区分,所以 Flink 中把 MapState 的 key、value 分别叫 UserKey、UserValue。
ValueState<T>:存储单一的值,即每个key只有一个值
ListState<T>:存储一个list,即每个key有一个list值
MapState<UK, UV>:存储一个map,即每个key有一个map值
ReducingState<T>和AggregatingState<IN, OUT>与ListState<T>同属于MergingState<T>。与ListState<T>不同的是,ReducingState<T>只有一个元素,而不是一个列表。它的原理是新元素通过add(T)加入后,与已有的状态元素使用ReduceFunction合并为一个元素,并更新到状态里。AggregatingState<IN, OUT>与ReducingState<T>类似,也只有一个元素,只不过AggregatingState<IN, OUT>的输入和输出类型可以不一样。ReducingState<T>和AggregatingState<IN, OUT>与窗口上进行ReduceFunction和AggregateFunction很像,都是将新元素与已有元素做聚合。
因为本身支持这么多类型的,所以不要用ValueState<T>去存list或者map这种数据类型,直接使用ListState和MapState效率会高很多。
State是通过RuntimeContext类获取的,所以使用State的地方就是rich functions,即实现RichFunction或其子接口,就可以获取State。在里面我们就可以通过StateTtlConfig设置State的TTL等。比如:
public class TTLCountMapFunction extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
private transient ValueState<Long> state;
...
public void open(Configuration parameters) throws Exception {
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(600))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("count-state", Long.class);
stateDescriptor.enableTimeToLive(ttlConfig);
state = getRuntimeContext().getState(stateDescriptor);
}
}
StateTtlConfig相关的设置可以查看官网链接。即datastream api中是通过StateTtlConfig设置,而在sql中是通过table.exec.state.ttl配置,默认值是0,表示状态永不过期。在table api中是StreamTableEnvironment.getConfig.setIdleStateRetention。
Flink 中 State 支持设置 TTL,TTL 只是将时间戳与 userValue 封装起来。
· MapState 的 TTL 是基于 UK 级别的
· ValueState 的 TTL 是基于整个 key 的
Operator State(non-keyed state)
Operator State是绑定到一个并行运算符实例(one parallel operator instance)的状态(即记录每个Task对应的状态值数据类型)。kafka connecttor是Flink中运算符状态使用的一个很好的示例。Kafka consumer的每个并行实例都维护一个主题分区和偏移的映射,作为其操作符状态。
在典型的有状态 Flink 应用程序中,你不需要Operator State。 它主要是一种特殊类型的状态,用于实现source/sink或你没有可以对状态进行分区的键的场景。
为了使用Operator State就得要实现CheckpointedFunction。请移步CheckpointedFunction说明。
Broadcast State是一种特殊的Operator State,有着特殊的应用场景,后续会说明如何使用,这里不再讲解。
state的存储
state的存储就是State Backends,在Flink1.13版本以前,老的分类是:MemoryStateBackend、FsStateBackend和RocksDBStateBackend。而在1.13版本以后分类就是:HashMapStateBackend和EmbeddedRocksDBStateBackend,再加上对应的storage。
下面列举新老对应关系:
MemoryStateBackend 相当于使用 HashMapStateBackend 和 JobManagerCheckpointStorage组合。存储位置:State: TaskManager 内存,Checkpoint: Jobmanager 内存。
#flink-conf.yaml配置
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
//java代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerStateBackend());
FsStateBackend 相当于使用 HashMapStateBackend 和 FileSystemCheckpointStorage。存储位置:State:Taskmanager 内存,Checkpoint: 外部文件系统( 本地或 HDFS )。
#flink-conf.yaml配置
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
//java代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// Advanced FsStateBackend configurations, such as write buffer size
// can be set by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
RocksDBStateBackend 相当于使用 EmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage。存储位置:State:rocksdb,Checkpoint: 外部文件系统(本地或 HDFS )。
#flink-conf.yaml配置
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
//java代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
使用建议
1. Keyed State如何清空state,state.clear() 实际上只能清理当前 key 对应的 value 值,如果想要清空整个 state,需要借助于 applyToAllKeys 方法。
2. Operator State慎重使用长list
参考官方文档中对state的介绍和使用的页面:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/
边栏推荐
- THREE——OrbitControls轨道控制器
- 如何强制卸载Google浏览器,不用担心Google打开为白板,亲测有效。
- Substrate 技术及生态6月大事记 | Polkadot Decoded 圆满落幕,黑客松获胜项目为生态注入新生力量
- 首届波卡黑客松项目「Manta Network」的进击之路
- Analysis of Dao liquidity dual currency pledge mining development principle
- haclabs: no_name(HL.ova)靶机渗透-Vulnhub
- Gavin wood, founder of Poka: what will happen to Poka governance V2?
- How can the multiple-choice and single choice results of PHP be displayed in the foreground?
- Vulnhub-Funbox: Rookie(Funbox2)靶机渗透
- 02 mobile terminal page adaptation
猜你喜欢
随机推荐
Canvas - rotate
自定义MVC 2.0
MySQL的分页你还在使劲的limit?
盘点波卡生态潜力项目 | 跨链特性促进多赛道繁荣
Substrate technology and ecology June memorabilia | Polkadot decoded came to a successful conclusion, and the hacker song winning project injected new forces into the ecosystem
Moonbeam orbiters program: provides a new way for collectors to participate in moonbeam and Moonriver
B站视频评论爬取——以鬼灭之刃为例(并将其存储到csv中)
MySQL之CRUD
Restore UI design draft
Cess test online line! The first decentralized storage network to provide multiple application scenarios
Fusdt流动性质押挖矿开发逻辑系统原理
How to export Excel files with php+mysql
如何强制卸载Google浏览器,不用担心Google打开为白板,亲测有效。
umi之define属性
Geoserver自动化上传Shapefile
Flink Format系列(1)-JSON
The profound meaning of unlimited ecological development in Poka -- Multidimensional Interpretation of parallel chain
【vsphere高可用】虚拟机的重置和重启
函数多种类型
Scarcity in Web3: how to become a winner in a decentralized world









