04 Transformation Operators

Transformation Operators

Defining Keys

tuple

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(new Tuple3<>(2, "a", "d"), new Tuple3<>(2, "b", "e"), new Tuple3<>(3, "b", "f")));
// group by the first field (by position)
UnsortedGrouping<Tuple3<Integer, String, String>> g1 = dataSource.groupBy(0);
System.out.println("a=======");
g1.max(2).print();

// group by the first and second fields
UnsortedGrouping<Tuple3<Integer, String, String>> g2 = dataSource.groupBy(0, 1);
System.out.println("b=======");
g2.max(2).print();

// group by field expression "f1" (the second field, by name)
UnsortedGrouping<Tuple3<Integer, String, String>> g3 = dataSource.groupBy("f1");
System.out.println("c=======");
g3.max(2).print();

// a=======
// (3,b,f)
// (2,b,e)
// b=======
// (2,b,e)
// (3,b,f)
// (2,a,d)
// c=======
// (3,b,f)
// (2,b,e)

pojo

  • The POJO class must be declared public
  • It must have a public no-argument constructor
  • Fields must be public, or accessible through public getters and setters
  • Field types must be supported by Flink
  • When POJOs and tuples are nested, field expressions such as f1.word are supported
public class WC {
    public String word;
    public Integer count;

    public WC() {
    }

    public WC(String word, Integer count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return "WC{" +
                "word='" + word + '\'' +
                ", count=" + count +
                '}';
    }
}

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<WC> dataSource = executionEnvironment.fromCollection(Arrays.asList(new WC("a",1), new WC("b",2), new WC("a",3)));
UnsortedGrouping<WC> word = dataSource.groupBy("word");
word.first(1).print();

keySelector

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<WC> dataSource = executionEnvironment.fromCollection(Arrays.asList(new WC("a",1), new WC("b",2), new WC("a",3)));
UnsortedGrouping<WC> word = dataSource.groupBy((KeySelector<WC, String>) value -> value.word);
word.first(1).print();

Common Transformation Operators

Map

DataSource<Integer> dataSource = executionEnvironment.fromElements(1, 2, 3, 4);
dataSource.map(x->x+8).print();

FlatMap

DataSource<String> dataSource = executionEnvironment.fromElements("a a aa b b c", "ccc b a a aa");
dataSource.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String s : value.split(" ")) {
            out.collect(s);
        }
    }
}).print();

// with a lambda the output type is erased, so returns(...) has to declare it explicitly
DataSource<String> dataSource = executionEnvironment.fromElements("a a aa b b c", "ccc b a a aa");
dataSource.flatMap((FlatMapFunction<String, String>) (value, out) -> {
    for (String s : value.split(" ")) {
        out.collect(s);
    }
}).returns(Types.STRING).print();

Filter

DataSource<String> dataSource = executionEnvironment.fromElements("a a aa b b c", "ccc b a a aa");
dataSource.filter(x->x.length()>1).print();

Project

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(new Tuple3<>(2, "a", "d"), new Tuple3<>(2, "b", "e"), new Tuple3<>(3, "b", "f")));
// project the third field and the second field
dataSource.project(2, 1).print();

DataSet-Specific Transformation Operators

reduce

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(new Tuple3<>(2, "a", "d"), new Tuple3<>(2, "b", "e"), new Tuple3<>(3, "b", "f")));

dataSource.reduce(new ReduceFunction<Tuple3<Integer, String, String>>() {
    @Override
    public Tuple3<Integer, String, String> reduce(Tuple3<Integer, String, String> value1, Tuple3<Integer, String, String> value2) throws Exception {
        return new Tuple3<>(value1.f0 + value2.f0, value1.f1, null);
    }
}).print();

dataSource.reduce((ReduceFunction<Tuple3<Integer, String, String>>) (value1, value2) -> new Tuple3<>(value1.f0 + value2.f0, value1.f1, null)).print();

aggregate

Built-in aggregations provided:

  • sum: summation
  • min: minimum value
  • max: maximum value
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(new Tuple3<>(2, "a", "d"), new Tuple3<>(2, "b", "e"), new Tuple3<>(3, "b", "f")));

// apply several aggregations at once
dataSource.groupBy(1)                   // group by the second field
        .aggregate(Aggregations.SUM, 0) // sum of the first field
        .and(Aggregations.MIN, 2)       // minimum of the third field
        .print();
// apply aggregations in layers: first take the max per group, then sum those maxima
dataSource.groupBy(1).aggregate(Aggregations.MAX, 0).aggregate(Aggregations.SUM, 0).print();

distinct

dataSource.distinct();
dataSource.distinct(0, 1);
dataSource.distinct("f0", "f1");
dataSource.distinct(new KeySelector<Tuple3<Integer, String, String>, String>() {
    @Override
    public String getKey(Tuple3<Integer, String, String> value) throws Exception {
        return value.f1;
    }
});

partition

//partitionByHash

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(new Tuple3<>(2, "a", "d"), new Tuple3<>(2, "b", "e"), new Tuple3<>(3, "b", "f")));

// count the number of elements in each partition
dataSource.partitionByHash(1).mapPartition(new MapPartitionFunction<Tuple3<Integer, String, String>, Long>() {
    @Override
    public void mapPartition(Iterable<Tuple3<Integer, String, String>> values, Collector<Long> out) throws Exception {
        long count = 0;
        for (Tuple3<Integer, String, String> value : values) {
            count++;
        }
        out.collect(count);
    }
}).print();

//partitionByRange
dataSource.partitionByRange(1).mapPartition(new MapPartitionFunction<Tuple3<Integer, String, String>, Long>() {
    @Override
    public void mapPartition(Iterable<Tuple3<Integer, String, String>> values, Collector<Long> out) throws Exception {
        long count = 0;
        for (Tuple3<Integer, String, String> value : values) {
            count++;
        }
        out.collect(count);
    }
}).print();

//sortPartition
// sort the data within each partition
dataSource.partitionByRange(1).sortPartition(1, Order.ASCENDING).mapPartition(new MapPartitionFunction<Tuple3<Integer, String, String>, Long>() {
    @Override
    public void mapPartition(Iterable<Tuple3<Integer, String, String>> values, Collector<Long> out) throws Exception {
        long count = 0;
        for (Tuple3<Integer, String, String> value : values) {
            count++;
        }
        out.collect(count);
    }
}).print();

minBy/maxBy

Returns the tuple whose specified field holds the maximum or minimum value.

If several tuples share the same maximum or minimum, an arbitrary one of them is returned.

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(new Tuple3<>(2, "a", "d"), new Tuple3<>(2, "b", "e"), new Tuple3<>(3, "b", "f")));

dataSource.maxBy(1).print();

dataSource.groupBy(1).minBy(0);

First-n

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(new Tuple3<>(2, "a", "d"), new Tuple3<>(2, "b", "e"), new Tuple3<>(3, "b", "f")));

dataSource.first(2).print();
// the first element of each group, sorted ascending by the third field
dataSource.groupBy(0).sortGroup(2, Order.ASCENDING).first(1).print();

DataStream-Specific Transformation Operators

union

Merges two or more streams into a new stream that contains all elements from all of the input streams.

All input streams must have the same element type.

dataStream.union(otherStream1, otherStream2, ...)

connect

Connects two data streams. The two streams are only placed into the same ConnectedStreams; each keeps its own element type and the two streams remain independent of each other.

DataStream<Integer> oneStream = ...
DataStream<String> twoStream = ...
ConnectedStreams<Integer, String> connectedStream = oneStream.connect(twoStream);

coMap/coFlatMap

Converts a ConnectedStreams into a DataStream.
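
A minimal sketch, continuing the connectedStream from the snippet above: a CoMapFunction maps each of the two input types to a common output type (coFlatMap works the same way with a CoFlatMapFunction and a Collector). The variable name merged is illustrative.

DataStream<String> merged = connectedStream.map(new CoMapFunction<Integer, String, String>() {
    @Override
    public String map1(Integer value) {   // called for elements of the first stream
        return "one:" + value;
    }

    @Override
    public String map2(String value) {    // called for elements of the second stream
        return "two:" + value;
    }
});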

split/select

split: splits one stream into multiple streams

select: selects one or more streams from a split stream
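
A sketch of the legacy split/select API. Note that this API is deprecated and has been removed in recent Flink releases in favor of side outputs; intStream here is a hypothetical DataStream<Integer>.

SplitStream<Integer> splitStream = intStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        // tag each element with one or more output names
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }
});
DataStream<Integer> evenStream = splitStream.select("even");
DataStream<Integer> oddStream = splitStream.select("odd");
DataStream<Integer> allStream = splitStream.select("even", "odd");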

keyBy

Converts a DataStream into a KeyedStream.

This is a logical partitioning of the stream.

dataStream.keyBy(0);
dataStream.keyBy("name");
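
The positional and field-name variants of keyBy shown above are deprecated in newer Flink versions; a KeySelector is the preferred form. A sketch, assuming dataStream is a DataStream<WC> of the WC POJO defined earlier:

// assumption: dataStream carries WC elements
dataStream.keyBy(new KeySelector<WC, String>() {
    @Override
    public String getKey(WC value) throws Exception {
        return value.word;
    }
});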

aggregate

// aggregations on a KeyedStream
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.max(0);
keyedStream.maxBy(0);

// aggregations on a WindowedStream
windowStream.sum(0);
windowStream.maxBy(0);

reduce

reduce on a KeyedStream

keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
    }
});

window

Converts a KeyedStream into a WindowedStream.

keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(2)));

windowAll

Converts a DataStream into an AllWindowedStream.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(2)));

window apply

Converts a WindowedStream or an AllWindowedStream into a DataStream.
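
A sketch, assuming keyedStream is a KeyedStream<Tuple2<String, Integer>, String> keyed by the string field: the WindowFunction receives all elements of one key's window at once and emits their summed count.

keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(2)))
        .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                int sum = 0;
                for (Tuple2<String, Integer> value : input) {
                    sum += value.f1;
                }
                out.collect(new Tuple2<>(key, sum));
            }
        });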

window reduce

keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(2))).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
    }
});

Physical Partitioning Operators

partitionCustom

// partition by the length of the key modulo the number of partitions
dataStream.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }
}, (KeySelector<String, String>) value -> value);

shuffle

Randomly distributes elements according to a uniform distribution.

rebalance

Distributes elements in a round-robin fashion.

rescale

Partitions elements in a round-robin fashion, but only across a subset of downstream subtasks.

It only requires local data transfer instead of sending data over the network.

Which downstream subtasks an upstream subtask sends its elements to depends on the parallelism of the upstream and downstream operators. For example, if the upstream operator has parallelism 2 and the downstream operator has parallelism 6, one upstream subtask distributes its data to three of the downstream subtasks and the other upstream subtask distributes to the remaining three.

broadcast

Broadcasts elements to every partition.
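
All of these physical partitioning strategies are parameterless calls on a DataStream; a minimal sketch (dataStream stands for any DataStream):

dataStream.shuffle();      // uniformly random
dataStream.rebalance();    // round-robin across all downstream subtasks
dataStream.rescale();      // round-robin across a local subset of downstream subtasks
dataStream.broadcast();    // every element goes to every downstream subtask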

Operator Chaining

By default, operators are chained automatically (multiple operators share a single slot).

The chaining methods below can only be called right after a DataStream transformation, because they refer to the preceding transformation.

Disabling chaining globally

StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();

Starting a new chain

The two map operators will be chained together, and the filter will not be chained to them.

stream.filter(...)
.map(....)
.startNewChain()
.map(....)

Disabling chaining for a single operator

stream.map(...)
.disableChaining();

Setting a slot sharing group

Operators that have the same slot sharing group are placed into the same slot.

stream.filter(...)
    .slotSharingGroup("name")
