08Function&Accumulators&Counters

本文最后更新于 2022-08-28 11:30:56

Function&Accumulator&Counter

Function

MapFunction,FilterFunction,FlatMapFunction…….

stream.map(new MyMapFuntion)

class MyMapFuntion extends MapFunction[String,(String,Int)]{
  override def map(value: String): (String, Int) = ???
}

RichMapFunction,RichFilterFunction……..

data.map(new MyRic hMapFunction());


class MyRichMapFunction extends RichMapFunction[String, Int] {
  //map
  def map(in: String):Int = { in.toInt }

  //开始时调用
  override def open(parameters: Configuration): Unit = super.open(parameters)

  override def setRuntimeContext(t: RuntimeContext): Unit = super.setRuntimeContext(t)
  //获取context
  override def getRuntimeContext: RuntimeContext = super.getRuntimeContext

  override def getIterationRuntimeContext: IterationRuntimeContext = super.getIterationRuntimeContext
  
  //结束时调用
  override def close(): Unit = super.close()
};

Accumulator&Counter

总结:只需要RichFunction即可,不需要keyBy也可以。在运行时每个算子都会有一个副本,在自己算子上累加,并行度增加,算子增加,副本也就增加(和一个function里面的成员变量一样),结束后可以getAccumulatorResult获取最终累加结果

累加器是具有加法运算最终累加结果的一种简单结构,可在作业结束后使用。

最简单的累加器就是计数器: 你可以使用 Accumulator.add(V value) 方法将其递增。在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端。 在调试过程中或在你想快速了解有关数据更多信息时,累加器作用很大。

IntCounter,LongCounter,DoubleCounter

首先,在需要使用累加器的用户自定义的转换 function 中创建一个累加器对象(此处是计数器)。

private IntCounter numLines = new IntCounter();

其次,你必须在 rich function 的 open() 方法中注册累加器对象。也可以在此处定义名称

getRuntimeContext().addAccumulator("num-lines", this.numLines);

现在你可以在操作 function 中的任何位置(包括 open()close() 方法中)使用累加器。

this.numLines.add(1);

最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中(当前只有等待作业完成后执行才起作用)。

myJobExecutionResult.getAccumulatorResult("num-lines")

单个作业的所有累加器共享一个命名空间。因此你可以在不同的操作 function 里面使用同一个累加器。Flink 会在内部将所有具有相同名称的累加器合并起来。

当前累加器的结果只有在整个作业结束后才可用

ProcessFunction&定时器

DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。

不同的Stream下调用process()方法需要传入对应的Process Function

常用Process Function:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

自定义KeyedProcessFunction

实现连续10秒温度上升,就输出报警

class TempIncreWarning(interval: Long) extends KeyedProcessFunction[Tuple, SensorReading, String]{
  // 由于需要跟之前的温度值做对比,所以将上一个温度保存成状态
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  // 为了方便删除定时器,还需要保存定时器的时间戳
  lazy val curTimerTsState: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("cur-timer-ts", classOf[Long]) )

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#Context, out: Collector[String]): Unit = {
    // 首先取出状态
    val lastTemp = lastTempState.value()
    val curTimerTs = curTimerTsState.value()

    // 将上次温度值的状态更新为当前数据的温度值
    lastTempState.update(value.temperature)

    // 判断当前温度值,如果比之前温度高,并且没有定时器的话,注册10秒后的定时器
    if( value.temperature > lastTemp && curTimerTs == 0 ){
      val ts = ctx.timerService().currentProcessingTime() + interval
      ctx.timerService().registerProcessingTimeTimer(ts)
      curTimerTsState.update(ts)
    }
    // 如果温度下降,删除定时器
    else if( value.temperature < lastTemp ){
      ctx.timerService().deleteProcessingTimeTimer(curTimerTs)
      // 清空状态
      curTimerTsState.clear()
    }
  }

  // 定时器触发,说明10秒内没有来下降的温度值,报警
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    val key = ctx.getCurrentKey.asInstanceOf[Tuple1[String]]._1
    out.collect( "温度值连续" + interval/1000 + "秒上升" )
    curTimerTsState.clear()
  }
}
// 输入数据的样例类
case class SensorReading( id: String, timestamp: Long, temperature: Double )

image-20210811000554790

image-20210811000839905


08Function&Accumulators&Counters
https://jiajun.xyz/2021/08/11/bigdata/11Flink/01flink_study1/08Function&Accumulators&Counters/
作者
Lambda
发布于
2021年8月11日
更新于
2022年8月28日
许可协议