11广播流

本文最后更新于 2022-08-28 11:30:56

Flink广播流

存在两个流,ActionStream和RuleStream,将RuleStream流中数据下发到ActionStream流中,使得在ActionStream流中每一个Task都能获取到RuleStream流中所有数据。这种行为称为广播,RuleStream流称为广播流,ActionStream称为非广播流,流入到ActionStream流中的rule数据称之为广播数据,放入到Flink的状态中就称之为广播状态。

定义一个广播流

将一个正常非广播流转化为广播流时需要指定它的广播状态描述,并且只能是 MapStateDescriptor类型,在后续的处理中可通过该描述获取到广播状态。

// 广播状态描述
val broadcastStateDesc: MapStateDescriptor[String, String] =
    new MapStateDescriptor[String, String]("broadcast-desc", classOf[String], classOf[String])
// 将普通的非广播流转为广播流
val ruleStream: BroadcastStream[String] = normalStream.broadcast(broadcastStateDesc)

连接非广播流和广播流

通过connect算子来将两条流连接在一起,此时广播流ruleStream就会被广播到非广播流actionStream中,得到的是一个BroadcastConnectedStream的流。BroadcastConnectedStream流本质上包含了广播流ruleStream和非广播流actionStream。

val connectedStream: BroadcastConnectedStream[(String, Int), String] = actionStream.connect(ruleStream)

process

此时process算子中的参数类型会根据非广播流actionStream的类型分为两种。如果actionStream有经过keyBy算子操作后转为KeyedStream类型那么process()中为KeyedBroadcastProcessFunction否则为BroadcastProcessFunction。在使用上都有两个方法:processElement处理非connected流数据并且只可读取广播状态,processBroadcastElement处理connectedStream流数据并且可读写广播状态。因为flink里面没有跨任务通信的机制,在一个任务实例中的修改不能在并行任务间传递。 得保证BroadcastState在算子的并行实例是相同的,所以不能让单个任务去修改状态,只能让广播方修改。

connectedStream.process(...)

Demo

  def main(args: Array[String]): Unit = {
    // 获取执行流处理引擎
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 行为流 -- 非广播流
    val actionStream: KeyedStream[(String, Int), String] = env
      .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), initProps()))
      .map((_, 1))
      .keyBy(new KeySelector[(String, Int), String] {
        override def getKey(in: (String, Int)): String = in._1
      })

    // 广播状态描述
    val broadcastStateDesc: MapStateDescriptor[String, String] =
      new MapStateDescriptor[String, String]("broadcast-desc", classOf[String], classOf[String])

    // 规则流 -- 广播流
    val ruleStream: BroadcastStream[String] = env
      .addSource(new FlinkKafkaConsumer010[String]("test_1", new SimpleStringSchema(), initProps()))
      .broadcast(broadcastStateDesc) // 将基础流转为广播流的时候需要指定广播流的描述信息

    // 使用connect算子将 主体基本流 和 广播流连接起来
    val connectedStream: BroadcastConnectedStream[(String, Int), String] = actionStream.connect(ruleStream)

    // 处理连接流数据
    connectedStream
      .process(new MyKeyedBroadcastProcessFunction(broadcastStateDesc))
      .print()

    env.execute("broadcast_stream")
  }
//////////////////////////////////////////////////////////////////
class MyKeyedBroadcastProcessFunction(broadcastStateDesc: MapStateDescriptor[String, String])
  extends KeyedBroadcastProcessFunction[String, (String, Int), String, String] {
  // 每当 主体基本流新增一条记录,该方法就会执行一次
  override def processElement(in1: (String, Int),
                              readOnlyCtx: KeyedBroadcastProcessFunction[String, (String, Int), String, String]#ReadOnlyContext,
                              collector: Collector[String]): Unit = {
    // 从 广播状态中根据key获取数据(规则数据)
    val ruleString: String = readOnlyCtx.getBroadcastState(broadcastStateDesc).get("rule")
    collector.collect(in1 + ruleString)
  }

  // 每当 广播流新增一条记录,该方法就会执行一次
  override def processBroadcastElement(in2: String,
                                       ctx: KeyedBroadcastProcessFunction[String, (String, Int), String, String]#Context,
                                       collector: Collector[String]): Unit = {
    // 获取广播状态并更新状态数据(规则数据)
    ctx.getBroadcastState(broadcastStateDesc).put("rule", in2)
  }
}

Demo2

public class State_BroadcastState {
    public static void main(String[] args) {
        //控制流发送到普通流后,普通流会收到一个广播状态
        //1.创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("localhost", 8888);
        DataStreamSource<String> controlDS = env.socketTextStream("localhost", 9999);
        //TODO 1.把其中一条流(控制流) 广播出去
        //定义一个Map状态描述器,控制流会把这个状态广播出去
        MapStateDescriptor<String, String> broadcast = new MapStateDescriptor<>("boradcast-state", Types.STRING, Types.STRING);
        BroadcastStream<String> contrlBS = controlDS.broadcast(broadcast);

        //TODO 2.把另一条流和广播流关联起来
        BroadcastConnectedStream<String, String> inputBCS = inputDS.connect(contrlBS);
        
        //TODO 3.调用Process

        inputBCS.process(
                new BroadcastProcessFunction<String, String, String>() {
                    /*
                        获取广播状态,获取数据进行处理
                     */
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        //TODO 5.通过上下文获取广播状态,取出里面的值
                        ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
                        String aSwitch = broadcastState.get("switch");
                        if("1".equals(aSwitch)){
                            out.collect("切换到1的逻辑");
                        }else if("2".equals(aSwitch)){
                            out.collect("切换到2的逻辑");
                        }


                    }

                    /**
                     * 处理广播流的数据:这里主要定义,什么数据往广播状态存
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                        // TODO 4.通过上下文获取广播状态,并往广播状态里存数据
                        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
                        broadcastState.put("switch",value);
                    }
                }
        ).print();
        //提交任务
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

11广播流
https://jiajun.xyz/2021/08/11/bigdata/11Flink/01flink_study1/11广播流/
作者
Lambda
发布于
2021年8月11日
更新于
2022年8月28日
许可协议