Flink advanced features and new features (VIII) V2
2022-07-24 13:45:00 【Hua Weiyun】
Dual-stream JOIN

A dual-stream JOIN performs a JOIN operation between two DataStreams.
Dual-stream JOINs fall into two main categories: Window Join and Interval Join.
A Window Join pairs elements that land in the same tumbling, sliding, or session window.
An Interval Join pairs elements within a time range defined by a lower bound and an upper bound relative to each element's timestamp.
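Before the full examples below, here is a minimal sketch contrasting the two APIs. It assumes two streams of Tuple2<String, Integer> that already have timestamps and watermarks assigned; all names are illustrative only:

```java
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class JoinApiSketch {

    public static void sketch(DataStream<Tuple2<String, Integer>> left,
                              DataStream<Tuple2<String, Integer>> right) {
        // Window Join: pairs elements that share a key AND fall into the same window
        left.join(right)
                .where(t -> t.f0)
                .equalTo(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                    @Override
                    public String join(Tuple2<String, Integer> l, Tuple2<String, Integer> r) {
                        return l.f0 + ":" + (l.f1 + r.f1);
                    }
                })
                .print();

        // Interval Join: for a left element with timestamp t, pairs it with right
        // elements whose timestamps fall in [t + lowerBound, t + upperBound]
        left.keyBy(t -> t.f0)
                .intervalJoin(right.keyBy(t -> t.f0))
                .between(Time.seconds(-1), Time.seconds(0))
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> l, Tuple2<String, Integer> r,
                                               Context ctx, Collector<String> out) {
                        out.collect(l.f0 + ":" + (l.f1 + r.f1));
                    }
                })
                .print();
    }
}
```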

Requirement

JOIN the order-detail stream with the goods stream in a 5-second tumbling window, then persist and print the result.
Development steps
```java
package cn.itcast.flink.broadcast;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/6/24 9:40
 * Desc Window JOIN between the order-detail stream and the goods stream
 */
public class JoinDemo {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Build the goods stream
        SingleOutputStreamOperator<Goods> goodsSource = env.addSource(new GoodsSource())
                .assignTimestampsAndWatermarks(new GoodsWatermark());
        // Build the order-detail stream
        SingleOutputStreamOperator<OrderItem> orderItemSource = env.addSource(new OrderItemSource())
                .assignTimestampsAndWatermarks(new OrderItemWatermark());
        // Join the order-detail stream with the goods stream on orderItem.goodsId == goods.goodsId
        DataStream<FactOrderItem> result = orderItemSource.join(goodsSource)
                .where(o -> o.goodsId)
                .equalTo(g -> g.goodsId)
                // 5-second tumbling window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // apply: (OrderItem first, Goods second) -> FactOrderItem
                .apply((OrderItem first, Goods second) -> {
                    FactOrderItem factOrderItem = new FactOrderItem();
                    factOrderItem.setGoodsId(first.goodsId);
                    factOrderItem.setGoodsName(second.goodsName);
                    factOrderItem.setCount(new BigDecimal(first.count));
                    factOrderItem.setTotalMoney(new BigDecimal(first.count).multiply(second.goodsPrice));
                    return factOrderItem;
                });
        // Print the joined result
        result.print();
        // Execute the job
        env.execute();
    }

    // Goods entity
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "Xiaomi 12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Order-detail entity
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Join result entity (the fact table to be persisted)
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Goods stream source (acts like a dimension table)
    public static class GoodsSource extends RichSourceFunction<Goods> {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while (!isCancel) {
                Goods.GOODS_LIST.forEach(sourceContext::collect);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // Order-detail stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                // Emit a second record whose goodsId matches no goods entry
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // Watermark strategy (simplified here: uses the system time directly)
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}
```
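Two details of this example are worth noting. The watermark strategies stamp every record with the current system time, so the "event-time" tumbling window effectively behaves like processing time. And OrderItemSource deliberately emits a second copy of each order item with goodsId "111", which matches no goods record and therefore never appears in the output, illustrating that the window join is an inner join.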
Requirement

Associate the goods stream with the order-detail stream using an Interval Join with an upper bound of 0 seconds (exclusive) and a lower bound of -1 second (inclusive); that is, an order item with timestamp t matches goods records with timestamps in [t - 1s, t). Compute the statistics and persist the result.
Development steps
```java
package cn.itcast.flink.broadcast;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Desc Interval JOIN between the order-detail stream and the goods stream
 */
public class JoinDemo02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the goods stream
        DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class))
                .assignTimestampsAndWatermarks(new GoodsWatermark());
        // Build the order-detail stream
        DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
                .assignTimestampsAndWatermarks(new OrderItemWatermark());
        // Interval join the two keyed streams
        SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                // lower bound -1s (inclusive), upper bound 0 (exclusive): the interval [-1s, 0)
                .between(Time.seconds(-1), Time.seconds(0))
                .upperBoundExclusive()
                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void processElement(OrderItem left, Goods right, Context ctx,
                                               Collector<FactOrderItem> out) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(left.getCount()));
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));
                        out.collect(factOrderItem);
                    }
                });
        factOrderItemDS.print();
        env.execute("Interval JOIN");
    }

    // Goods entity
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "Xiaomi 12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Order-detail entity
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Join result entity
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Goods stream source (acts like a dimension table)
    public static class GoodsSource11 extends RichSourceFunction<Goods> {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while (!isCancel) {
                Goods.GOODS_LIST.forEach(sourceContext::collect);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // Order-detail stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                // Emit a second record whose goodsId matches no goods entry
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // Watermark strategy (simplified here: uses the system time directly)
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}
```
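Unlike the window join, the interval join uses no window assigner: each element is buffered in keyed state and joined against buffered elements of the other stream whose timestamps satisfy the bounds, and buffered elements are cleaned up once the watermark passes the upper bound.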
Streaming File Sink
Sink (persisting results)

Common sink targets:

- MySQL
- Kafka
- Redis
- Console

StreamingFileSink persists a data stream to a distributed file system such as HDFS.

Typical application scenarios for the file-system sink:

- Real-time data warehouses
- Hourly data analysis
- Data extraction
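The example under the development steps below uses the row-encoded format (plain text via SimpleStringEncoder), even though its output path happens to be named parquet. For genuinely columnar output, StreamingFileSink also offers a bulk-encoded mode. A minimal sketch, assuming the flink-parquet dependency is on the classpath and using a hypothetical POJO Record:

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class BulkSinkSketch {
    // Hypothetical record type; any Avro-reflectable POJO works
    public static class Record {
        public String value;
    }

    public static StreamingFileSink<Record> buildSink() {
        return StreamingFileSink
                .forBulkFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"),
                        ParquetAvroWriters.forReflectRecord(Record.class))
                // Bulk formats roll only on checkpoint, so no custom rolling policy is set
                .build();
    }
}
```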
Requirement

Write a socket data stream to HDFS, rolling the output files every 2 seconds.
Development steps
```java
package cn.itcast.flink.broadcast;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

/**
 * Author itcast
 * Date 2021/6/24 10:52
 * Desc Write a socket stream to HDFS with StreamingFileSink
 */
public class StreamingFileSinkDemo {
    public static void main(String[] args) throws Exception {
        // 1. Initialize the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Enable checkpointing every 10s and set the state backend path.
        //    The sink relies on checkpoints and two-phase commit for exactly-once semantics.
        env.enableCheckpointing(10000);
        env.setStateBackend(new FsStateBackend("file:///d:/chk/"));
        // 3. Read the socket data source
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 4. Create the output file configuration (part-file prefix and suffix)
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("crm")
                .withPartSuffix(".txt")
                .build();
        // 5. Create the StreamingFileSink: row format, output path, bucket assigner, rolling policy
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"),
                        new SimpleStringEncoder<String>("UTF-8"))
                // Bucket files by date/time
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                // Default rolling policy: roll at 128 MB, every 2s, or after 2s of inactivity
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(128 * 1024 * 1024)
                        .withRolloverInterval(Time.seconds(2).toMilliseconds())
                        .withInactivityInterval(Time.seconds(2).toMilliseconds())
                        .build())
                // Output file configuration
                .withOutputFileConfig(config)
                .build();
        // 6. Attach the sink
        source.print();
        source.addSink(sink).setParallelism(1);
        // 7. Execute the job
        env.execute();
    }
}
```
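A note on the design: the sink's part files pass through three states, in-progress, pending (after a roll), and finished. Pending files are only renamed to finished when a checkpoint completes, which is why step 2 enables checkpointing; without it, the output would stay in-progress or pending indefinitely and downstream readers would never see committed files.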