当前位置:网站首页>flink使MapState实现KeyedState
flink使MapState实现KeyedState
2022-07-22 22:40:00 【AokCap】
测试数据
辽宁省,沈阳市,1000
辽宁省,大连市,2000
辽宁省,沈阳市,1500
湖南省,长沙市,1200
湖南省,长沙市,1000
湖南省,常德市,4000
湖南省,常德市,3000
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
/** * @Author: Zhang * @Description: * @Date: Created in 19:23 2021/12/25 * @Modified By: * */
public class MapStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启checkpoint, 每10s进行一次checkpoint, 开启后默认使用无限重启策略
env.enableCheckpointing(10000);
//开启失败率重启策略
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per unit
Time.of(30, TimeUnit.SECONDS), //time interval for measuring failure rate
Time.of(3, TimeUnit.SECONDS) // delay
));
DataStreamSource<String> lines = env.socketTextStream("doitedu03", 8888);
SingleOutputStreamOperator<Tuple3<String,String,Integer>> tpStream = lines.flatMap(new FlatMapFunction<String, Tuple3<String,String,Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String,String,Integer>> collector) throws Exception {
String[] sp = value.split(",");
if (sp[0].equals("error")){
throw new RuntimeException("出现错误了!!!");
}
collector.collect(Tuple3.of(sp[0],sp[1],Integer.parseInt(sp[2])));
}
});
KeyedStream<Tuple3<String, String, Integer>, String> keyed = tpStream.keyBy(t -> t.f0);
SingleOutputStreamOperator<Tuple3<String, String, Integer>> process = keyed.process(new KeyedProcessFunction<String, Tuple3<String, String, Integer>, Tuple3<String, String, Integer>>() {
private transient MapState<String, Integer> mapState;
@Override
public void open(Configuration parameters) throws Exception {
//先定义一个状态描述器
MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, Integer.class);
//初始化或者恢复历史状态
mapState = getRuntimeContext().getMapState(mapStateDescriptor);
}
@Override
public void processElement(Tuple3<String, String, Integer> input, Context context, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
String city = input.f1;
Integer money = input.f2;
Integer historyMoney = mapState.get(city);
if (historyMoney == null) {
historyMoney = 0;
}
Integer totalMoney = money + historyMoney;
//更新到state中
mapState.put(city, totalMoney);
//输出
input.f2 = totalMoney;
collector.collect(input);
}
});
process.print();
env.execute();
}
}
边栏推荐
猜你喜欢

C language decimal number to binary number

多商户系统的直播功能用过吗?用过的朋友扣个 666!

Redis事务与锁机制

I can't be angry with "voluntary salary reduction". I'm naked. I'm three times in four days. How can it end like this?

Web资源共享

Matlab保存数据到csv文件的方法分享

将childNodes返回的伪数组转化为真数组

第三章 栈

【JS 逆向百例】某公共资源交易网,公告 URL 参数逆向分析

为什么有的人把代码写的如此复杂?
随机推荐
【读书笔记->统计学】12-01 置信区间的构建-置信区间概念简介
“蔚来杯“2022牛客暑期多校训练营1
Understand the interrupt system in STM32 in simple terms -- from principle to simple engineering examples -- nanny level tutorial
Organizational structure of agile testing team
三种缓存策略:Cache Aside 策略、Read/Write Through 策略、Write Back 策略
Introduction to JVM monitoring tools jstack, jconsole, Jinfo, jmap, JDB, jstat
张宇高数30讲总结
1.10 API and string
uni-app进阶之内嵌应用【day14】
C language decimal number to binary number
目标检测之锚点与锚框
开发者分享|『啃书吧:深度学习与MindSpore实践』第一章
I can't be angry with "voluntary salary reduction". I'm naked. I'm three times in four days. How can it end like this?
The boss asked me to do an IP territorial function and an open source library!
敏捷测试团队组织构成
H7-TOOL串口脱机烧录操作说明,支持TTL串口,RS232和RS485(2022-06-30)
This is not a true sense of the meta universe, which should have its own distinctive characteristics and unique development logic
Flink原理初探和流批一体API(二)v2
如何用C语言实现简单职工信息管理系统
93.(leaflet篇)leaflet态势标绘-进攻方向修改