当前位置:网站首页>Flink advanced features and new features (VIII) V2

Flink advanced features and new features (VIII) V2

2022-07-24 13:45:00 Hua Weiyun

Dual-stream JOIN

  • A dual-stream JOIN is a JOIN operation performed between multiple DataStream data streams

  • Dual-stream JOINs fall into two main categories: window joins and interval joins

  • Window joins are further divided into tumbling-window, sliding-window, and session-window joins

  • An interval join is defined by a lower bound and an upper bound

    image-20210624093501057

  • demand

    Join the order-detail stream with the goods stream in 5-second tumbling windows, then output the joined result and print it

    • Development steps

      package cn.itcast.flink.broadcast;import com.alibaba.fastjson.JSON;import lombok.Data;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import java.math.BigDecimal;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;/** * Author itcast * Date 2021/6/24 9:40 * Desc TODO */public class JoinDemo {    public static void main(String[] args) throws Exception {        //  Create a flow execution environment         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        //  Build commodity data flow         SingleOutputStreamOperator<Goods> goodsSource = env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark());        //  Build order detail data flow         SingleOutputStreamOperator<OrderItem> orderItemSource = env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark());        //  The order sheet  join  Commodity list   The order sheet .goodsId=== Commodity list .goodsId        DataStream<FactOrderItem> result = orderItemSource.join(goodsSource)                .where(o -> o.goodsId)                .equalTo(g -> g.goodsId)                ///  The window is a scrolling window  5  second                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))                /// apply  Realization  (OrderItem first, Goods second) -> 
factOrderItem                .apply((OrderItem first, Goods second) -> {                    FactOrderItem factOrderItem = new FactOrderItem();                    factOrderItem.setGoodsId(first.goodsId);                    factOrderItem.setGoodsName(second.goodsName);                    factOrderItem.setCount(new BigDecimal(first.count));                    factOrderItem.setTotalMoney(new BigDecimal(first.count).multiply(second.goodsPrice));                    return factOrderItem;                });        // Printout         result.print();        // execution environment         env.execute();    }    // Commodity entity class     @Data    public static class Goods {        private String goodsId;        private String goodsName;        private BigDecimal goodsPrice;        public static List<Goods> GOODS_LIST;        public static Random r;        static  {            r = new Random();            GOODS_LIST = new ArrayList<>();            GOODS_LIST.add(new Goods("1", " millet 12", new BigDecimal(4890)));            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));        }        public static Goods randomGoods() {            int rIndex = r.nextInt(GOODS_LIST.size());            return GOODS_LIST.get(rIndex);        }        public Goods() {        }        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {            this.goodsId = goodsId;            this.goodsName = goodsName;            this.goodsPrice = goodsPrice;        }        @Override        public String toString() {            return JSON.toJSONString(this);        }    }    // Order details entity class     @Data    public static class OrderItem {      
  private String itemId;        private String goodsId;        private Integer count;        @Override        public String toString() {            return JSON.toJSONString(this);        }    }    // Correlation results , Entity table of landing table     @Data    public static class FactOrderItem {        private String goodsId;        private String goodsName;        private BigDecimal count;        private BigDecimal totalMoney;        @Override        public String toString() {            return JSON.toJSONString(this);        }    }    // Build a commodity Stream Source ( This is like a dimension table )    public static class GoodsSource extends RichSourceFunction<Goods> {        private Boolean isCancel;        @Override        public void open(Configuration parameters) throws Exception {            isCancel = false;        }        @Override        public void run(SourceContext sourceContext) throws Exception {            while(!isCancel) {                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));                TimeUnit.SECONDS.sleep(1);            }        }        @Override        public void cancel() {            isCancel = true;        }    }    // Build order details Stream Source     public static class OrderItemSource extends RichSourceFunction<OrderItem>    {        private Boolean isCancel;        private Random r;        @Override        public void open(Configuration parameters) throws Exception {            isCancel = false;            r = new Random();        }        @Override        public void run(SourceContext sourceContext) throws Exception {            while(!isCancel) {                Goods goods = Goods.randomGoods();                OrderItem orderItem = new OrderItem();                orderItem.setGoodsId(goods.getGoodsId());                orderItem.setCount(r.nextInt(10) + 1);                orderItem.setItemId(UUID.randomUUID().toString());                sourceContext.collect(orderItem);              
  orderItem.setGoodsId("111");                sourceContext.collect(orderItem);                TimeUnit.SECONDS.sleep(1);            }        }        @Override        public void cancel() {            isCancel = true;        }    }    // Build watermark allocator ( Here for simplicity ), Use the system time directly     public static class GoodsWatermark implements WatermarkStrategy<Goods> {        @Override        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {            return (element, recordTimestamp) -> System.currentTimeMillis();        }        @Override        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {            return new WatermarkGenerator<Goods>() {                @Override                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {                    output.emitWatermark(new Watermark(System.currentTimeMillis()));                }                @Override                public void onPeriodicEmit(WatermarkOutput output) {                    output.emitWatermark(new Watermark(System.currentTimeMillis()));                }            };        }    }    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {        @Override        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {            return (element, recordTimestamp) -> System.currentTimeMillis();        }        @Override        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {            return new WatermarkGenerator<OrderItem>() {                @Override                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {                    output.emitWatermark(new Watermark(System.currentTimeMillis()));                }                @Override                public void 
onPeriodicEmit(WatermarkOutput output) {                    output.emitWatermark(new Watermark(System.currentTimeMillis()));                }            };        }    }}
  • demand

    Join goods data with order details using an interval join with upper bound 0 (exclusive) and lower bound -1 second (inclusive), then compute and output the result

  • Development steps

    package cn.itcast.flink.broadcast;import com.alibaba.fastjson.JSON;import lombok.Data;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;import java.math.BigDecimal;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;/** * Author itcast * Desc */public class JoinDemo02 {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //  Build commodity data flow         DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark());        //  Build order detail data flow         DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark());        //  Make association query         SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))                .between(Time.seconds(-1), Time.seconds(0))                // The opening range of the last session , Exclude the last  [-1,0)                .upperBoundExclusive()                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {                    @Override    
                public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {                        FactOrderItem factOrderItem = new FactOrderItem();                        factOrderItem.setGoodsId(right.getGoodsId());                        factOrderItem.setGoodsName(right.getGoodsName());                        factOrderItem.setCount(new BigDecimal(left.getCount()));                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));                        out.collect(factOrderItem);                    }                });        factOrderItemDS.print();        env.execute("Interval JOIN");    }    // Commodity type     @Data    public static class Goods {        private String goodsId;        private String goodsName;        private BigDecimal goodsPrice;        public static List<Goods> GOODS_LIST;        public static Random r;        static  {            r = new Random();            GOODS_LIST = new ArrayList<>();            GOODS_LIST.add(new Goods("1", " millet 12", new BigDecimal(4890)));            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));        }        public static Goods randomGoods() {            int rIndex = r.nextInt(GOODS_LIST.size());            return GOODS_LIST.get(rIndex);        }        public Goods() {        }        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {            this.goodsId = goodsId;            this.goodsName = goodsName;            this.goodsPrice = goodsPrice;        }        @Override        public String toString() {            return JSON.toJSONString(this);  
      }    }    // Order details     @Data    public static class OrderItem {        private String itemId;        private String goodsId;        private Integer count;        @Override        public String toString() {            return JSON.toJSONString(this);        }    }    // Correlation results     @Data    public static class FactOrderItem {        private String goodsId;        private String goodsName;        private BigDecimal count;        private BigDecimal totalMoney;        @Override        public String toString() {            return JSON.toJSONString(this);        }    }    // Build a commodity Stream Source ( This is like a dimension table )    public static class GoodsSource11 extends RichSourceFunction {        private Boolean isCancel;        @Override        public void open(Configuration parameters) throws Exception {            isCancel = false;        }        @Override        public void run(SourceContext sourceContext) throws Exception {            while(!isCancel) {                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));                TimeUnit.SECONDS.sleep(1);            }        }        @Override        public void cancel() {            isCancel = true;        }    }    // Build order details Stream Source     public static class OrderItemSource extends RichSourceFunction {        private Boolean isCancel;        private Random r;        @Override        public void open(Configuration parameters) throws Exception {            isCancel = false;            r = new Random();        }        @Override        public void run(SourceContext sourceContext) throws Exception {            while(!isCancel) {                Goods goods = Goods.randomGoods();                OrderItem orderItem = new OrderItem();                orderItem.setGoodsId(goods.getGoodsId());                orderItem.setCount(r.nextInt(10) + 1);                orderItem.setItemId(UUID.randomUUID().toString());                
sourceContext.collect(orderItem);                orderItem.setGoodsId("111");                sourceContext.collect(orderItem);                TimeUnit.SECONDS.sleep(1);            }        }        @Override        public void cancel() {            isCancel = true;        }    }    // Build watermark allocator ( Here for simplicity ), Use the system time directly     public static class GoodsWatermark implements WatermarkStrategy<Goods> {        @Override        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {            return (element, recordTimestamp) -> System.currentTimeMillis();        }        @Override        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {            return new WatermarkGenerator<Goods>() {                @Override                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {                    output.emitWatermark(new Watermark(System.currentTimeMillis()));                }                @Override                public void onPeriodicEmit(WatermarkOutput output) {                    output.emitWatermark(new Watermark(System.currentTimeMillis()));                }            };        }    }    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {        @Override        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {            return (element, recordTimestamp) -> System.currentTimeMillis();        }        @Override        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {            return new WatermarkGenerator<OrderItem>() {                @Override                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {                    output.emitWatermark(new Watermark(System.currentTimeMillis()));                }              
  @Override                public void onPeriodicEmit(WatermarkOutput output) {                    output.emitWatermark(new Watermark(System.currentTimeMillis()));                }            };        }    }}

Streaming File Sink

  • A sink persists ("lands") stream data to external storage

  • Sink classification

    1. sink MySQL
    2. sink Kafka
    3. sink Redis
    4. sink Console
  • A sink can also write to the distributed file system HDFS

  • Use cases for writing to a file system with Streaming File Sink

    1. Real time data warehouse
    2. Hourly data analysis etc.
    3. Extract data
  • demand

    Read a socket data stream and write it to HDFS, rolling the output files every 2 seconds.

  • Development steps

package cn.itcast.flink.broadcast;import org.apache.flink.api.common.serialization.SimpleStringEncoder;import org.apache.flink.api.common.time.Time;import org.apache.flink.core.fs.Path;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;/** * Author itcast * Date 2021/6/24 10:52 * Desc TODO */public class StreamingFileSinkDemo {    public static void main(String[] args) throws Exception {        //1. Initialize the stream computing runtime environment         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        //2. Set up Checkpoint(10s) Start periodically   and  stateBackend  Storage path         // Sink Ensure only one semantic use  checkpoint  and   Second paragraph submission         env.enableCheckpointing(10000);        env.setStateBackend(new FsStateBackend("file:///d:/chk/"));        //4. Access socket data source , get data         DataStreamSource<String> source = env.socketTextStream("node1", 9999);        //5. establish Streamingfilesink object         OutputFileConfig config = OutputFileConfig                .builder()                .withPartPrefix("crm")                .withPartSuffix(".txt")                .build();        //5-1.  
Create output file configuration , Specify the output path  /FlinkStreamFileSink/parquet        StreamingFileSink sink = StreamingFileSink.forRowFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"),                new SimpleStringEncoder<String>("UTF-8"))                // sink-kafka new FlinkKafkaProducer                //5-2.StreamingFileSink  Line formatting  , withBucketAssigner->DateTimeBucketAssigner                .withBucketAssigner(new DateTimeBucketAssigner())                //withRollingPolicy ->  Default drum strategy                 .withRollingPolicy(DefaultRollingPolicy.builder()                        .withMaxPartSize(128 * 1024 * 1024)                        .withRolloverInterval(Time.seconds(2).toMilliseconds())                        .withInactivityInterval(Time.seconds(2).toMilliseconds())                        .build())        //withOutputFileConfig ->  Configuration of output file                 .withOutputFileConfig(config)                .build();        //6. Set output  sink        source.print();        source.addSink(sink).setParallelism(1);        //7. Perform tasks         env.execute();    }}

StreamingFileSink row format: withBucketAssigner -> DateTimeBucketAssigner
// Excerpt: tail of a StreamingFileSink builder chain followed by the end of main().
// The fragment starts mid-expression; the builder receiver is not part of this excerpt.
.withBucketAssigner(new DateTimeBucketAssigner())
// withRollingPolicy -> the default rolling policy, configured with a max part size,
// a 2-second rollover interval and a 2-second inactivity interval
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(128 * 1024 * 1024)
.withRolloverInterval(Time.seconds(2).toMilliseconds())
.withInactivityInterval(Time.seconds(2).toMilliseconds())
.build())
// withOutputFileConfig -> configuration of the output files
.withOutputFileConfig(config)
.build();
// 6. Print the input and attach the sink
source.print();
source.addSink(sink).setParallelism(1);
// 7. Submit the job
env.execute();
}
}

原网站

版权声明
本文为[Hua Weiyun]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/205/202207241336521094.html