当前位置:网站首页>Flink高级特性和新特性(八)v2
Flink高级特性和新特性(八)v2
2022-07-24 13:37:00 【华为云】
双流 JOIN
多个数据流 DataStream 之间进行 JOIN 操作
双流 JOIN 分为两大类: Window 窗口的join, Interval 的 join
Window窗口 分为 tumbling 窗口, sliding 窗口, session 窗口
Interval join 需要指定时间区间的下界 (lower bound) 和上界 (upper bound)

需求
订单明细表和商品表每 5 秒钟进行一次窗口 JOIN, 将结果落地并打印输出
开发步骤
package cn.itcast.flink.broadcast;import com.alibaba.fastjson.JSON;import lombok.Data;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import java.math.BigDecimal;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;/** * Author itcast * Date 2021/6/24 9:40 * Desc TODO */public class JoinDemo { public static void main(String[] args) throws Exception { // 创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 构建商品数据流 SingleOutputStreamOperator<Goods> goodsSource = env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark()); // 构建订单明细数据流 SingleOutputStreamOperator<OrderItem> orderItemSource = env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark()); // 订单表 join 商品表 订单表.goodsId===商品表.goodsId DataStream<FactOrderItem> result = orderItemSource.join(goodsSource) .where(o -> o.goodsId) .equalTo(g -> g.goodsId) /// 窗口为滚动窗口 5 秒 .window(TumblingEventTimeWindows.of(Time.seconds(5))) /// apply 实现 (OrderItem first, Goods second) -> factOrderItem .apply((OrderItem first, Goods second) -> { FactOrderItem factOrderItem = new FactOrderItem(); factOrderItem.setGoodsId(first.goodsId); factOrderItem.setGoodsName(second.goodsName); factOrderItem.setCount(new BigDecimal(first.count)); factOrderItem.setTotalMoney(new 
BigDecimal(first.count).multiply(second.goodsPrice)); return factOrderItem; }); //打印输出 result.print(); //执行环境 env.execute(); } //商品类实体类 @Data public static class Goods { private String goodsId; private String goodsName; private BigDecimal goodsPrice; public static List<Goods> GOODS_LIST; public static Random r; static { r = new Random(); GOODS_LIST = new ArrayList<>(); GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890))); GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000))); GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000))); GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800))); GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200))); GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500))); } public static Goods randomGoods() { int rIndex = r.nextInt(GOODS_LIST.size()); return GOODS_LIST.get(rIndex); } public Goods() { } public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) { this.goodsId = goodsId; this.goodsName = goodsName; this.goodsPrice = goodsPrice; } @Override public String toString() { return JSON.toJSONString(this); } } //订单明细实体类 @Data public static class OrderItem { private String itemId; private String goodsId; private Integer count; @Override public String toString() { return JSON.toJSONString(this); } } //关联结果,落地表的实体表 @Data public static class FactOrderItem { private String goodsId; private String goodsName; private BigDecimal count; private BigDecimal totalMoney; @Override public String toString() { return JSON.toJSONString(this); } } //构建一个商品Stream源(这个好比就是维表) public static class GoodsSource extends RichSourceFunction<Goods> { private Boolean isCancel; @Override public void open(Configuration parameters) throws Exception { isCancel = false; } @Override public void run(SourceContext sourceContext) throws Exception { while(!isCancel) { Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods)); TimeUnit.SECONDS.sleep(1); } } @Override public void 
cancel() { isCancel = true; } } //构建订单明细Stream源 public static class OrderItemSource extends RichSourceFunction<OrderItem> { private Boolean isCancel; private Random r; @Override public void open(Configuration parameters) throws Exception { isCancel = false; r = new Random(); } @Override public void run(SourceContext sourceContext) throws Exception { while(!isCancel) { Goods goods = Goods.randomGoods(); OrderItem orderItem = new OrderItem(); orderItem.setGoodsId(goods.getGoodsId()); orderItem.setCount(r.nextInt(10) + 1); orderItem.setItemId(UUID.randomUUID().toString()); sourceContext.collect(orderItem); orderItem.setGoodsId("111"); sourceContext.collect(orderItem); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { isCancel = true; } } //构建水印分配器(此处为了简单),直接使用系统时间了 public static class GoodsWatermark implements WatermarkStrategy<Goods> { @Override public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<Goods>() { @Override public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } }; } } public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> { @Override public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<OrderItem>() { @Override public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput 
output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } }; } }}
需求
将商品数据和订单明细数据进行 Interval JOIN 关联: 时间区间的上界为 0(不包含), 下界为 -1 秒(包含), 即 [-1s, 0), 统计数据并落地
开发步骤
package cn.itcast.flink.broadcast;import com.alibaba.fastjson.JSON;import lombok.Data;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;import java.math.BigDecimal;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;/** * Author itcast * Desc */public class JoinDemo02 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 构建商品数据流 DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark()); // 构建订单明细数据流 DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark()); // 进行关联查询 SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId()) .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId())) .between(Time.seconds(-1), Time.seconds(0)) //上届的开区间,排除掉上届 [-1,0) .upperBoundExclusive() .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() { @Override public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception { FactOrderItem factOrderItem = new FactOrderItem(); factOrderItem.setGoodsId(right.getGoodsId()); 
factOrderItem.setGoodsName(right.getGoodsName()); factOrderItem.setCount(new BigDecimal(left.getCount())); factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount()))); out.collect(factOrderItem); } }); factOrderItemDS.print(); env.execute("Interval JOIN"); } //商品类 @Data public static class Goods { private String goodsId; private String goodsName; private BigDecimal goodsPrice; public static List<Goods> GOODS_LIST; public static Random r; static { r = new Random(); GOODS_LIST = new ArrayList<>(); GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890))); GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000))); GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000))); GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800))); GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200))); GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500))); } public static Goods randomGoods() { int rIndex = r.nextInt(GOODS_LIST.size()); return GOODS_LIST.get(rIndex); } public Goods() { } public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) { this.goodsId = goodsId; this.goodsName = goodsName; this.goodsPrice = goodsPrice; } @Override public String toString() { return JSON.toJSONString(this); } } //订单明细类 @Data public static class OrderItem { private String itemId; private String goodsId; private Integer count; @Override public String toString() { return JSON.toJSONString(this); } } //关联结果 @Data public static class FactOrderItem { private String goodsId; private String goodsName; private BigDecimal count; private BigDecimal totalMoney; @Override public String toString() { return JSON.toJSONString(this); } } //构建一个商品Stream源(这个好比就是维表) public static class GoodsSource11 extends RichSourceFunction { private Boolean isCancel; @Override public void open(Configuration parameters) throws Exception { isCancel = false; } @Override public void run(SourceContext sourceContext) throws Exception { 
while(!isCancel) { Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { isCancel = true; } } //构建订单明细Stream源 public static class OrderItemSource extends RichSourceFunction { private Boolean isCancel; private Random r; @Override public void open(Configuration parameters) throws Exception { isCancel = false; r = new Random(); } @Override public void run(SourceContext sourceContext) throws Exception { while(!isCancel) { Goods goods = Goods.randomGoods(); OrderItem orderItem = new OrderItem(); orderItem.setGoodsId(goods.getGoodsId()); orderItem.setCount(r.nextInt(10) + 1); orderItem.setItemId(UUID.randomUUID().toString()); sourceContext.collect(orderItem); orderItem.setGoodsId("111"); sourceContext.collect(orderItem); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { isCancel = true; } } //构建水印分配器(此处为了简单),直接使用系统时间了 public static class GoodsWatermark implements WatermarkStrategy<Goods> { @Override public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<Goods>() { @Override public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } }; } } public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> { @Override public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context 
context) { return new WatermarkGenerator<OrderItem>() { @Override public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } }; } }}
Streaming File Sink
Sink 落地
Sink 分类
- sink MySQL
- sink Kafka
- sink Redis
- sink 控制台
Sink 落地到分布式文件系统上 HDFS 上
Sink 到文件系统 Streaming File Sink 落地使用应用场景
- 实时数据仓库
- 小时级的数据分析 等
- 抽取数据
需求
从 socket 数据流中读取数据, 并按 2 秒的滚动间隔将数据写入到 HDFS 上。
开发步骤
package cn.itcast.flink.broadcast;import org.apache.flink.api.common.serialization.SimpleStringEncoder;import org.apache.flink.api.common.time.Time;import org.apache.flink.core.fs.Path;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;/** * Author itcast * Date 2021/6/24 10:52 * Desc TODO */public class StreamingFileSinkDemo { public static void main(String[] args) throws Exception { //1.初始化流计算运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.设置Checkpoint(10s)周期性启动 和 stateBackend 存储路径 // Sink保证仅一次语义使用 checkpoint 和 二段提交 env.enableCheckpointing(10000); env.setStateBackend(new FsStateBackend("file:///d:/chk/")); //4.接入socket数据源,获取数据 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //5.创建Streamingfilesink对象 OutputFileConfig config = OutputFileConfig .builder() .withPartPrefix("crm") .withPartSuffix(".txt") .build(); //5-1. 
创建输出文件配置,指定输出路径 /FlinkStreamFileSink/parquet StreamingFileSink sink = StreamingFileSink.forRowFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"), new SimpleStringEncoder<String>("UTF-8")) // sink-kafka new FlinkKafkaProducer //5-2.StreamingFileSink 行格式化 , withBucketAssigner->DateTimeBucketAssigner .withBucketAssigner(new DateTimeBucketAssigner()) //withRollingPolicy -> 默认滚筒策略 .withRollingPolicy(DefaultRollingPolicy.builder() .withMaxPartSize(128 * 1024 * 1024) .withRolloverInterval(Time.seconds(2).toMilliseconds()) .withInactivityInterval(Time.seconds(2).toMilliseconds()) .build()) //withOutputFileConfig -> 输出文件的配置 .withOutputFileConfig(config) .build(); //6.设置输出 sink source.print(); source.addSink(sink).setParallelism(1); //7.执行任务 env.execute(); }}Sink 行格式化 , withBucketAssigner->DateTimeBucketAssigner
.withBucketAssigner(new DateTimeBucketAssigner())
//withRollingPolicy -> 默认滚筒策略
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(128 * 1024 * 1024)
.withRolloverInterval(Time.seconds(2).toMilliseconds())
.withInactivityInterval(Time.seconds(2).toMilliseconds())
.build())
//withOutputFileConfig -> 输出文件的配置
.withOutputFileConfig(config)
.build();
//6.设置输出 sink
source.print();
source.addSink(sink).setParallelism(1);
//7.执行任务
env.execute();
}
}
边栏推荐
- 基于典型相关分析的多视图学习方法综述
- Overview of multi view learning methods based on canonical correlation analysis
- 基于ABP实现DDD--实体创建和更新
- Can communication protocol (I)
- How to generate expected data? Emory University and others' latest "deep learning controllable data generation" review, 52 page PDF, covering 346 documents, comprehensively expounds the controllable g
- Easycvr platform security scanning prompt go pprof debugging information leakage solution
- 爱可可AI前沿推介(7.24)
- Happy number ~ ~ ~ (in fact, I'm not happy at all) & ugly number
- 网络安全——中间人攻击渗透测试
- 户外广告牌不能“想挂就挂”!广州城管部门加强户外广告安全管理
猜你喜欢

Odoo+ test

网络安全——文件上传黑名单绕过

Kunyu(坤舆) 安装 详解

Introduction of embedded network interface scheme and summary of driver debugging methods

网络安全——使用Evil Maid物理访问安全漏洞进行渗透

Sort method -- bubble sort (use an array to sort a string of numbers from large to small or from small to large)

Simulate the implementation of the library function memcpy-- copy memory blocks. Understand memory overlap and accurate replication in detail

Implementation of dynamic columns in EAS BOS doc list

简易订单管理系统小练习

Network security -- man in the middle attack penetration test
随机推荐
Odoo+ test
数据修改修改
Vscode configuration user code snippet (including deletion method)
Network security - Web penetration testing
Recommended idling tools | comprehensive comparison of 10 spatial transcriptome deconvolution tools
游戏思考04总结:针对帧、状态、物理同步的总结(之前写的太长,现在简略下)
Pointer advanced part (1)
vscode配置用户代码片段(包括删除方法)
Network security - function bypass injection
Summary of embedded network problems (packet loss of network card, unrecognized network card)
Network security -- man in the middle attack penetration test
DDD based on ABP -- Entity creation and update
ICML2022 | 分支强化学习
EAS environment structure directory
Question 10: find numbers in an array with rows and columns in order
On node embedding
网络安全——函数绕过注入
2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)v2
网络安全——文件上传渗透测试
深入浅出边缘云 | 2. 架构
