11广播流

本文最后更新于 2022-08-28 11:30:56

Flink广播流

存在两个流,ActionStream和RuleStream,将RuleStream流中数据下发到ActionStream流中,使得在ActionStream流中每一个Task都能获取到RuleStream流中所有数据。这种行为称为广播,RuleStream流称为广播流,ActionStream称为非广播流,流入到ActionStream流中的rule数据称之为广播数据,放入到Flink的状态中就称之为广播状态。

定义一个广播流

将一个正常非广播流转化为广播流时需要指定它的广播状态描述,并且只能是 MapStateDescriptor类型,在后续的处理中可通过该描述获取到广播状态。

1
2
3
4
5
// 广播状态描述
val broadcastStateDesc: MapStateDescriptor[String, String] =
new MapStateDescriptor[String, String]("broadcast-desc", classOf[String], classOf[String])
// 将普通的非广播流转为广播流
val ruleStream: BroadcastStream[String] = normalStream.broadcast(broadcastStateDesc)

连接非广播流和广播流

通过connect算子来将两条流连接在一起,此时广播流ruleStream就会被广播到非广播流actionStream中,得到的是一个BroadcastConnectedStream的流。BroadcastConnectedStream流本质上包含了广播流ruleStream和非广播流actionStream。

1
val connectedStream: BroadcastConnectedStream[(String, Int), String] = actionStream.connect(ruleStream)

process

此时process算子中的参数类型会根据非广播流actionStream的类型分为两种。如果actionStream有经过keyBy算子操作后转为KeyedStream类型那么process()中为KeyedBroadcastProcessFunction否则为BroadcastProcessFunction。在使用上都有两个方法:processElement处理非connected流数据并且只可读取广播状态,processBroadcastElement处理connectedStream流数据并且可读写广播状态。因为flink里面没有跨任务通信的机制,在一个任务实例中的修改不能在并行任务间传递。 得保证BroadcastState在算子的并行实例是相同的,所以不能让单个任务去修改状态,只能让广播方修改。

1
connectedStream.process(...)

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
  def main(args: Array[String]): Unit = {
// 获取执行流处理引擎
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// 行为流 -- 非广播流
val actionStream: KeyedStream[(String, Int), String] = env
.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), initProps()))
.map((_, 1))
.keyBy(new KeySelector[(String, Int), String] {
override def getKey(in: (String, Int)): String = in._1
})

// 广播状态描述
val broadcastStateDesc: MapStateDescriptor[String, String] =
new MapStateDescriptor[String, String]("broadcast-desc", classOf[String], classOf[String])

// 规则流 -- 广播流
val ruleStream: BroadcastStream[String] = env
.addSource(new FlinkKafkaConsumer010[String]("test_1", new SimpleStringSchema(), initProps()))
.broadcast(broadcastStateDesc) // 将基础流转为广播流的时候需要指定广播流的描述信息

// 使用connect算子将 主体基本流 和 广播流连接起来
val connectedStream: BroadcastConnectedStream[(String, Int), String] = actionStream.connect(ruleStream)

// 处理连接流数据
connectedStream
.process(new MyKeyedBroadcastProcessFunction(broadcastStateDesc))
.print()

env.execute("broadcast_stream")
}
//////////////////////////////////////////////////////////////////
class MyKeyedBroadcastProcessFunction(broadcastStateDesc: MapStateDescriptor[String, String])
extends KeyedBroadcastProcessFunction[String, (String, Int), String, String] {
// 每当 主体基本流新增一条记录,该方法就会执行一次
override def processElement(in1: (String, Int),
readOnlyCtx: KeyedBroadcastProcessFunction[String, (String, Int), String, String]#ReadOnlyContext,
collector: Collector[String]): Unit = {
// 从 广播状态中根据key获取数据(规则数据)
val ruleString: String = readOnlyCtx.getBroadcastState(broadcastStateDesc).get("rule")
collector.collect(in1 + ruleString)
}

// 每当 广播流新增一条记录,该方法就会执行一次
override def processBroadcastElement(in2: String,
ctx: KeyedBroadcastProcessFunction[String, (String, Int), String, String]#Context,
collector: Collector[String]): Unit = {
// 获取广播状态并更新状态数据(规则数据)
ctx.getBroadcastState(broadcastStateDesc).put("rule", in2)
}
}

Demo2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class State_BroadcastState {
public static void main(String[] args) {
//控制流发送到普通流后,普通流会收到一个广播状态
//1.创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStreamSource<String> inputDS = env.socketTextStream("localhost", 8888);
DataStreamSource<String> controlDS = env.socketTextStream("localhost", 9999);
//TODO 1.把其中一条流(控制流) 广播出去
//定义一个Map状态描述器,控制流会把这个状态广播出去
MapStateDescriptor<String, String> broadcast = new MapStateDescriptor<>("boradcast-state", Types.STRING, Types.STRING);
BroadcastStream<String> contrlBS = controlDS.broadcast(broadcast);

//TODO 2.把另一条流和广播流关联起来
BroadcastConnectedStream<String, String> inputBCS = inputDS.connect(contrlBS);

//TODO 3.调用Process

inputBCS.process(
new BroadcastProcessFunction<String, String, String>() {
/*
获取广播状态,获取数据进行处理
*/
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
//TODO 5.通过上下文获取广播状态,取出里面的值
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
String aSwitch = broadcastState.get("switch");
if("1".equals(aSwitch)){
out.collect("切换到1的逻辑");
}else if("2".equals(aSwitch)){
out.collect("切换到2的逻辑");
}


}

/**
* 处理广播流的数据:这里主要定义,什么数据往广播状态存
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
// TODO 4.通过上下文获取广播状态,并往广播状态里存数据
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
broadcastState.put("switch",value);
}
}
).print();
//提交任务
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}


11广播流
https://jiajun.xyz/2021/08/11/bigdata/11Flink/01flink_study1/11广播流/
作者
Lambda
发布于
2021年8月11日
更新于
2022年8月28日
许可协议