当前位置:网站首页>聊聊flink水位线
聊聊flink水位线
2022-06-22 03:19:00 【滴普科技】
1、概述
flink中比较重要的是时间和状态,学习flink的过程中对水位线的理解一直模糊,经过一段时间的消化,在此总结总结。本文主要把水位线是什么,怎么来的,有什么用描述清楚。
2、不太好理解的水位线
有些人喜欢把水位线叫成水印,不管是水印还是水位线,中文翻译过来一点都不贴切我们的生活,比较抽象,让人难得理解。在我们生活中水位线类似家中挂在墙上的一个挂钟,类似我们的手表。下面来聊聊如下的话题:
1,到底是如何产生。
2,既然是一个挂钟,钟表有哪些特点呢,钟表每隔1s秒针往前走一小步,时间是不是越来越大,这些特点水位线是不是也有呢。
3,挂钟有什么用处啊?晚上看看手表发现12点,我们肯定自我暗示:“应该睡觉了”,通过时间让我们知道什么时间该干什么事情。
3、什么叫水位线
3.1、水位线的定义
水位线就是一个逻辑时钟,为什么叫逻辑时钟?正常时间是有cpu产生的,周期而固定的往前走,但是我们这个时钟的时间是程序员计算出来,根据"事件时间"动态计算出来(至于什么是时间事件,有什么使用场景这里就不讲了),如某一时刻计算的结果为x,x值为2022-10-10 10:10:10对应的时间戳为1665367810000,x的值随着事件时间的变大而变大,可能的结果为x,x+1,x+2,x+3,x+4 … 连续的越来越大的时间戳是不是类似钟表每隔1s往前走一步呢。
3.2、水位线(逻辑时钟)的组成
水位线由一串连续的时间戳组成,越来越大,每个时间戳都是根据事件时间动态计算出来的。时钟也是由一连续的时间组成,也是越来越大,如2022-10-10 10:10:10,2022-10-10 10:10:11,2022-10-10 10:10:12,2022-10-10 10:10:13 。。。等,水位线就是类似生活中的时钟,所以我把这个水位线称为逻辑时钟,逻辑时钟就是水位线,水印机制。
3.3、逻辑时钟当前时间
类似时钟的当前时间,此处此刻为几点几分几秒,这个当前时间比较重要,窗口的闭合,定时任务的触发都是根据当前时间来判断的。
当前值特点:越来越大,流刚刚产生的时候插入负无穷大值,结束是插入正无穷大的值。
个人觉得这个当前值类似一个指针类型的变量,他的指向是不停的变化的(个人理解)。
3.4、当前时间的计算公式
时钟的"当前时间"对应一个具体的时间戳。时钟的当前值xxx = 事件时间 - 最大延迟时间 - 1毫秒。
3.5、来一个案例
案例描述:从socket读取数据,并打印当前水位的具体值。
package com.deepexi.sql;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class ExampleTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
//从socket读取数据
.socketTextStream("192.168.117.211", 9999)
.map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1])))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
//5s延迟时间
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
//提取事件时间
return element.f1;
}
})
)
//分流
.keyBy(r -> r.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, String>() {
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
out.collect("当前的水位线是:" + ctx.timerService().currentWatermark());
}
})
.print();
env.execute();
}
}
nc -lk 9999 开启socket服务,监听9999端口
命令行输入:a 1000
[[email protected] ~]# nc -lk 9999 a 1000
idea控制台打印
当前的水位线是:-9223372036854775808 //-9223372036854775808是一个无穷大的数字
命令行输入:a 2000
idea控制台打印:
当前的水位线是:-4001 //当前水位线的值 = 事件时间 - 最大延迟时间 -1 = 1000 - 5000 -1 = -4000
为什么用1000- 5000 -1而用2000 - 5000 -1? flink会周期往流中插入水位线,水位线也是流中的一个元素,还是看下图理解吧。
命令行输入:a 3000
idea控制台打印:当前的水位线是:-3001 //2000 - 5000 -1 = -2000
命令行输入:a 10000
idea控制台打印:当前的水位线是:-2001 //3000 - 5000 -1 = -2000
命令行输入:a 1000
idea控制台打印:当前的水位线是:4999 //10000 - 5000 -1 = 4999
命令行输入:a 1000
idea控制台打印:当前的水位线是:4999 //10000 - 5000 -1 = 4999
命令行输入:a 2000
idea控制台打印:当前的水位线是:4999 //10000 - 5000 -1 = 4999
通过控制台的打印结果发现水位线的和钟表一样,值总是越来越大的,随着事件时间的变化而变化,但是不会变小,也可能会停止某一刻,如输入a 1000后在输入a 1000,a 2000水位线的值始终是4999。
整个打印过程
命令行窗口:
[root@master ~]# nc -lk 9999
a 1000
a 2000
a 3000
a 10000
a 1000
a 1000
a 2000
idea打印:
当前的水位线是:-9223372036854775808
当前的水位线是:-4001
当前的水位线是:-3001
当前的水位线是:-2001
当前的水位线是:4999
当前的水位线是:4999
当前的水位线是:4999

4、如何产生的
水位线本质就是一个时间戳,这个时间戳是程序员根据事件时间动态计算出来,直接来一个案例吧。
案例1
自定义水位线的产生逻辑,实现WatermarkStrategy接口,flink会每隔200毫秒的调用onPeriodicEmit方法。
public class ExampleTest2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置每隔1分钟插入一次水位线
//env.getConfig().setAutoWatermarkInterval(6 * 1000L);
env
.socketTextStream("192.168.117.211", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]));
}
})
.assignTimestampsAndWatermarks(new CustomWatermarkGenerator())
.print();
env.execute();
}
public static class CustomWatermarkGenerator implements WatermarkStrategy<Tuple2<String, Long>> {
@Override
public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
};
}
@Override
public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Tuple2<String, Long>>() {
// 最大延迟时间
private Long bound = 5000L;
private Long maxTs = -Long.MAX_VALUE + bound + 1L;
@Override
public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
//更新观察到的最大事件时间
maxTs = Math.max(maxTs, event.f1);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
System.out.println("水位线的值:" + (maxTs - bound - 1L));
// 发送水位线,计算公式:事件时间-延迟时间-1L
output.emitWatermark(new Watermark(maxTs - bound - 1L));
}
};
}
}
}
nc -lk 9999 开启socket服务,监听9999端口
启动idea,控制台每隔200毫秒打印结果:水位线的值:xxxxx。如下:
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
命令行输入:a 1000
控制台每隔200毫秒打印结果:水位线的值:xxxxx。如下:
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
命令行输入:a 2000
控制台每隔200毫秒打印接口:水位线的值:xxxxx。如下:
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
//默认200毫秒插入水位线到流,可以设置水位线的插入流的时间间隔
env.getConfig().setAutoWatermarkInterval(6 * 1000L);
整个打印过程
命令行窗口:
[root@master ~]# nc -lk 9999
a 1000
a 2000
idea打印:
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
(a,1000)
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
(a,2000)
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
Disconnected from the target VM, address: '127.0.0.1:58591', transport: 'socket'
水位线的值:-3001
Process finished with exit code 130
通过结果我们可以知道,水位线的值随着事件时间1000,2000的变化而变化。如果输入a 2000后在输入a 1000,控制台打印结果是怎样的?那肯定打印的是:水位线的值:-3001,因为水位线的值和时间一样永远只会越来越大。
案例2
改造一下程序,新增如下代码,keyby后,把命令行输入的元素打印出来。
nc -lk 9999启动socket监听9999端口
启动idea
命令行输入
[root@localhost ~]# nc -lk 9999
a 1000
a 2000
a 5000
a 6000
idea控制台打印:
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
输入业务数据是:(a,1000)
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
输入业务数据是:(a,2000)
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
输入业务数据是:(a,5000)
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
输入业务数据是:(a,6000)
水位线的值:999
水位线的值:999
水位线的值:999
水位线的值:999
水位线的值:999
分析计算结果结果:
-9223372036854775807,-9223372036854775807,(a,1000),-4001,-4001,-4001,-4001,-4001,-4001,-4001,-4001,(a,2000),-3001,-3001,-3001,-3001,-3001,(a,5000),-1,-1,-1,(a,6000),999,999,999,999
不知道大家有没有一种感觉,水位线和业务数据什么关系?是不是类似生活中落花和流水的关系呢?业务数据就是河流中的水,水位线就像落在水中的花,他们两一起流向大海,水位线和业务数据一样都属于流中的一个元素。
5、有什么用
在流的世界逻辑时钟就是一个参照物。还是挂钟来举例吧,看看挂钟已经12点了,我们肯定在会暗示自己该放下手机了要睡觉了。针对源源不断的数据流,把数据流拆分为多段进行处理,针对每段数据进行统计,那什么时候触发统计呢?这个时候就会用这个逻辑时钟,窗口看看逻辑时间当前处于几点钟,发现窗口结束时间小于时钟的时间,窗口闭合进行统计。
案例1,水位线触发定时任务的执行
功能描述:水位线的当前时间戳大于定时任务的的触发时间后 触发定时任务的执行。
public class ExampleTest3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("192.168.117.211", 9999)
.map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1])))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
})
)
.keyBy(r -> r.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, String>() {
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
// out.collect("当前的水位线是:" + ctx.timerService().currentWatermark());
ctx.timerService().registerEventTimeTimer(value.f1 + 5000L);
out.collect("注册了一个时间戳是:" + new Timestamp(value.f1 + 5000L) + " 的定时器");
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
out.collect("定时器触发了!");
}
})
.print();
env.execute();
}
}
nc -lk 9999 开启socket服务,监听9999端口
命令行输入:a 1665367810000 //1665367810000对应的时间为2022-10-10 10:10:10
控制台输出:注册了一个时间戳是:2022-10-10 10:10:15.0 的定时器 //2022-10-10 10:10:15转换为时间戳为1665367815000
解释一下控制台输出结果
当前水位线的值:2022-10-10 10:10:10 - 5s -1毫秒 = 1665367810000 - 5000 -1 = 1665367804999。当水位线的值大于1665367815000定时任务触发。
命令行输入:1665367821000 //命令行输入2022-10-10 10:10:21对应的时间戳1665367821000将会触发定时任务
控制台输出:定时器触发了!
命名行打印输入
[root@master ~]# nc -lk 9999
a 1665367810000
a 1665367821000
idea打印输入
注册了一个时间戳是:2022-10-10 10:10:15.0 的定时器
注册了一个时间戳是:2022-10-10 10:10:26.0 的定时器
定时器触发了!
案例2,水位线当前时间戳大于窗口结束时间触发窗口闭
案例day3.Example4
public class ExampleTest4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("192.168.117.211", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]));
}
})
.assignTimestampsAndWatermarks(
// 最大延迟时间设置为5秒
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1; // 告诉flink事件时间是哪一个字段
}
})
)
.keyBy(r -> r.f0)
// 5秒的事件时间滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
// System.out.println("当前窗口的结束值:" + context.currentWatermark());
// System.out.println("当前水位线的值:" + context.currentWatermark());
long count = elements.spliterator().getExactSizeIfKnown();
out.collect("用户" + key + " 在窗口" +
"" + new Timestamp(windowStart) + "~" + new Timestamp(windowEnd) + "" +
"中的pv次数是:" + count);
}
})
.print();
env.execute();
}
}
命令行输入:a 1665367810000 //flink将开启一个2022-10-10 10:10:10.0~2022-10-10 10:10:15的窗口,当水位线当前值(当前值指上面的当前时间)大于窗口结束时间对应的时间戳会触发窗口闭合。
命令行输入:a 1665367821000 //此时水位线当前值为:1665367821000 - 5000 -1 = 1665367815999,1665367815999转换为时间:2022-10-10 10:10:15,2022-10-10 10:10:15等于窗口结束时间,所以触发窗口闭合。
控制输出:用户a 在窗口2022-10-10 10:10:10.0~2022-10-10 10:10:15.0中的pv次数是:1
命令行
[root@master ~]# nc -lk 9999
a 1665367810000
a 1665367821000
idea
当前窗口的结束值:1665367815999
当前水位线的值:1665367815999
用户a 在窗口2022-10-10 10:10:10.0~2022-10-10 10:10:15.0中的pv次数是:1
如果根据"处理时间"来进行统计分析,窗口要闭合进行统计,肯定有一个参考的时间,只是这个时间是cpu帮忙产生的,窗口的闭合根据cpu产生的时间进行闭合,但逻辑时钟的某瞬间的值是程序计算出来的,这也是为什么把水位线称为逻辑时钟。
6、迟到数据的处理
6.1、什么叫迟到数据
事件时间小于水位线当前时间戳,比如当前数据流的数据xxx携带的事件时间是2022:20:50,逻辑时钟的此时的时间为2022:20:51,那么flink认为xxx就是一条迟到数据。
案例描述:手动发送水位线,手动发送携带事件时间的元素。
public class ExampleTest5 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<String> result = env
.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
// 发送数据携带事件时间的数据hello world
ctx.collectWithTimestamp("hello world", 1000L);
// 发送水位线
ctx.emitWatermark(new Watermark(999L));
// 发送数据携带事件时间的数据 hello flink
ctx.collectWithTimestamp("hello flink", 2000L);
// 发送水位线
ctx.emitWatermark(new Watermark(1999L));
// 发送数据携带事件时间的数据hello late
ctx.collectWithTimestamp("hello late", 1000L);
}
@Override
public void cancel() {
}
})
.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
//System.out.println("当前水位线:" + ctx.timerService().currentWatermark());
//判断事件时间是否小于水位线
if (ctx.timestamp() < ctx.timerService().currentWatermark()) {
System.out.println("迟到元素:" + value);
} else {
System.out.println("正常元素:" + value);
}
}
});
env.execute();
}
}
控制台输出:
正常元素:hello world
正常元素:hello flink
迟到元素:hello late
6.2、迟到元素的处理
理解了什么叫迟到元素,至于怎么处理,flink提供了几种方案,如
案例:迟到数据发送到"侧输出流"中
public class ExampleTest {
// 定义侧输出流
private static OutputTag<String> lateElement = new OutputTag<String>("late-element") {
};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<String> result = env
.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
// 发送数据携带事件时间的数据hello world
ctx.collectWithTimestamp("hello world", 1000L);
// 发送水位线
ctx.emitWatermark(new Watermark(999L));
// 发送数据携带事件时间的数据 hello flink
ctx.collectWithTimestamp("hello flink", 2000L);
// 发送水位线
ctx.emitWatermark(new Watermark(1999L));
// 发送数据携带事件时间的数据hello late
ctx.collectWithTimestamp("hello late", 1000L);
}
@Override
public void cancel() {
}
})
.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
//判断事件时间是否小于水位线
if (ctx.timestamp() < ctx.timerService().currentWatermark()) {
ctx.output(lateElement, "迟到元素发送到侧输出流:" + value);
} else {
out.collect("正常到达的元素:" + value);
}
}
});
result.print("主流:");
result.getSideOutput(lateElement).print("侧输出流:");
env.execute();
}
}
idea控制台输出:
主流:> 正常到达的元素:hello world
主流:> 正常到达的元素:hello flink
侧输出流:> 迟到元素发送到侧输出流:hello late
思考:窗口,迟到元素,水位线之间有什么关联?
7、总结
水位线类似生活中的时钟,通过时钟我们知道当前时间处于几点几分秒,这个"当前时间"在flink里面对应一个时间戳,通过时间戳来触发窗口的闭合,触发定时任务的执行。也类似一个参照物的角色。
边栏推荐
- CMD view the console output of hearts, diamonds, spades and clubs to solve the garbled code
- golang并发编程之原子操作详解
- [nvme2.0b 10] controller shutdown and NVM subsystem shutdown
- Project management software development project management
- 从根儿上理解虚拟内存
- Factory mode
- [pwn basics]pwntools learning
- FastDFS-6.0.6
- Why is setinterval so easy to get stuck in the high and low level
- [nvme2.0b 9] controller initialization process
猜你喜欢
随机推荐
基于Pytorch的图像分类总结:Swin Transformer
Redis6.0 new features (Part 2)
【爬虫笔记2】鼠标事件与截图方法、常用攻击方法
Primary key in efcore
AtCoder Beginner Contest 252(dijkstra,逆向思维)
【leetcode周赛总结】LeetCode第298场周赛总结(6.19)
[nvme2.0b 6] nvme queue model
[nvme2.0b 9] controller initialization process
std::make_shared特点
【NVMe2.0b 10】Controller Shutdown 与 NVM Subsystem Shutdown
[microservices | Nacos] quickly realize the configuration center function of Nacos, and complete configuration update and version iteration
Atcoder beginer contest 252 (Dijkstra, reverse thinking)
Fastdfs5.0.11 installation
Policy mode
Lectures explanation for unsupervised graph level representation learning (usib)
策略模式
深度学习期末复习
EU5, eu7, EX3, Ex5 install third-party apps
Will it take three months or half a year to buy financial products in 2022?
uv_ loop_ Init() process
![[nvme2.0b 5] sous - système nvm](/img/4f/e60e62a04e617b2e7858494917f390.png)








