本文最后更新于 2022-08-28 11:30:56
时间语义&WaterMark 时间语义 Processing time
执行相应操作的机器的系统时间。
Processing time是最简单的时间概念,不需要流和机器之间的协调。它提供了最好的性能和最低的延迟。
IngestionTime
进入flink的时间
Event time
Event time是每个事件在其产生设备上发生的时间。这个时间通常在记录进入Flink之前嵌入到记录中,并且可以从每个记录中提取事件时间戳 。在事件时间中,时间的进展取决于数据,而不是任何挂钟 。事件时间程序必须指定如何生成事件时间水印(Event time Watermarks),这是一种以事件时间表示进展程度的机制 。
解决数据在处理时是一种乱序状态的方式
WaterMark 水印就是一个时间戳
水印对于乱序流至关重要。一般来说,水印是一种声明,表示到流中的那个点,在某个时间戳之前的所有事件都应该已经到达 。相当于表示现在的时间为,当前事件时间-最大乱序程度
在并行流中的WaterMark
水印一般是在sourceFunction,或者在sourceFunction的紧挨后面,每一个子任务的都会独立的生成自己的水印
当进行shuffle时,水印以最小值为准
设置watermark 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 object WindowTest { def main (args: Array [String ]): Unit = { val env = StreamExecutionEnvironment .getExecutionEnvironment env.setParallelism(1 ) env.getConfig.setAutoWatermarkInterval(100 L) val inputStream = env.socketTextStream("hadoop102" , 7777 ) val dataStream: DataStream [SensorReading ] = inputStream .map( data => { val dataArray = data.split("," ) SensorReading ( dataArray(0 ), dataArray(1 ).toLong, dataArray(2 ).toDouble ) } ) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor [SensorReading ](Time .seconds(1 )) { override def extractTimestamp (element: SensorReading ): Long = element.timestamp * 1000 L } ).setParallelism(2 ) val resultStream = dataStream .keyBy("id" ) .timeWindow(Time .seconds(15 ), Time .seconds(5 )) .allowedLateness(Time .minutes(1 )) .sideOutputLateData(new OutputTag [SensorReading ]("late" )) .apply( new MyWindowFun () ) dataStream.print("data2" ) resultStream.getSideOutput(new OutputTag [SensorReading ]("late" )) resultStream.print("result2" ) env.execute("window test" ) } }
1 2 3 4 5 6 7 def assignTimestampsAndWatermarks (assigner: AssignerWithPeriodicWatermarks [T ]): DataStream [T ] def assignTimestampsAndWatermarks (assigner: AssignerWithPunctuatedWatermarks [T ]): DataStream [T ]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class MyWMAssigner (lateness: Long ) extends AssignerWithPeriodicWatermarks [SensorReading ] { var maxTs: Long = Long .MinValue + lateness override def getCurrentWatermark : Watermark = new Watermark (maxTs - lateness) override def extractTimestamp (element: SensorReading , previousElementTimestamp: Long ): Long = { maxTs = maxTs.max(element.timestamp * 1000 L) element.timestamp * 1000 L } }class MyWMAssigner2 extends AssignerWithPunctuatedWatermarks [SensorReading ] { val lateness: Long = 1000 L override def checkAndGetNextWatermark (lastElement: SensorReading , extractedTimestamp: Long ): Watermark = { if ( lastElement.id == "sensor_1" ){ new Watermark (extractedTimestamp - lateness) } else null } override def extractTimestamp (element: SensorReading , previousElementTimestamp: Long ): Long = element.timestamp * 1000 L }
在10.11中新增了WatermarkStrategy 1 2 3 4 5 6 7 8 9 10 11 12 13 stream.flatMap(_.split(" " )) .map((_, 1 )) .assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness[(String , Int )](Duration .ofSeconds(20 )) .withIdleness(Duration .ofMinutes(1 ))) .keyBy(_._1) .window(TumblingEventTimeWindows .of(Time .hours(1 ))) .sum(2 ) .print()def assignTimestampsAndWatermarks (watermarkStrategy: WatermarkStrategy [T ]): DataStream [T ]
WatermarkStrategy 这里面提供了很多静态的方法和带有缺省实现的方法,只有一个方法是非default和没有缺省实现的,就是下面的这个方法。
1 2 3 4 5 WatermarkGenerator<T> createWatermarkGenerator (WatermarkGeneratorSupplier.Context context) ; static <T> WatermarkStrategy<T> forGenerator (WatermarkGeneratorSupplier<T> generatorSupplier) { return generatorSupplier::createWatermarkGenerator; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Public public interface WatermarkGenerator <T > { void onEvent(T event, long eventTimestamp, WatermarkOutput output); void onPeriodicEmit(WatermarkOutput output); }
内置生成策略 固定延迟
1 2 DataStream dataStream = ...... ; dataStream.assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness(Duration .ofSeconds(3 )));
单调递增生成水印
1 2 3 DataStream dataStream = ...... ; dataStream.assignTimestampsAndWatermarks(WatermarkStrategy .forMonotonousTimestamps());
自定义时间抽取
1 2 3 4 5 6 DataStream dataStream = ...... ; dataStream.assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2 <String ,Long >>forBoundedOutOfOrderness(Duration .ofSeconds(5 )) .withTimestampAssigner((event, timestamp)->event.f1));
处理空闲数据源
在某些情况下,由于数据产生的比较少,导致一段时间内没有数据产生,进而就没有水印的生成,导致下游依赖水印的一些操作就会出现问题,比如某一个算子的上游有多个算子,这种情况下,水印是取其上游两个算子的较小值,如果上游某一个算子因为缺少数据迟迟没有生成水印,就会出现eventtime倾斜问题,导致下游没法触发计算。
所以filnk通过WatermarkStrategy.withIdleness()方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲。这样就意味着下游的数据不需要等待水印的到来。
当下次有水印生成并发射到下游的时候,这个数据流重新变成活跃状态。
1 2 3 WatermarkStrategy .<Tuple2 <Long , String >>forBoundedOutOfOrderness(Duration .ofSeconds(20 )) .withIdleness(Duration .ofMinutes(1 ));