04 Transformation Operators
Transformation Operators
Defining keys
tuple
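import java.util.Arrays;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple3;
// The statements below run inside a method that declares "throws Exception"; print() triggers execution
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(
        Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));
// Group by the first field (position 0).
// max(2) only guarantees field 2; the other non-key fields come from an arbitrary record of the group
UnsortedGrouping<Tuple3<Integer, String, String>> g1 = dataSource.groupBy(0);
System.out.println("a=======");
g1.max(2).print();
// Group by the first and the second field
UnsortedGrouping<Tuple3<Integer, String, String>> g2 = dataSource.groupBy(0, 1);
System.out.println("b=======");
g2.max(2).print();
// Group by the first field using a field expression ("f0" is the first field, "f1" the second)
UnsortedGrouping<Tuple3<Integer, String, String>> g3 = dataSource.groupBy("f0");
System.out.println("c=======");
g3.max(2).print();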
// a=======
// (3,b,f)
// (2,b,e)
// b=======
// (2,b,e)
// (3,b,f)
// (2,a,d)
// c=======
// (3,b,f)
// (2,b,e)
POJO
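- The POJO class must be public (a nested class must also be static)
- It must have a public no-argument constructor
- Every field must be public or accessible through a public getter and setter
- Every field type must be a type that Flink can serialize
- POJOs and tuples may be nested inside each other; nested fields are addressed with field expressions such as f1.word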
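The first three rules above can be checked with reflection. The following checker is simplified and hypothetical, and it exists only to illustrate the rules. It is not how Flink's type extraction works internally. It does not check rule 4 (supported field types), and it only recognizes accessors named getX/setX.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical, simplified checker for the POJO rules listed above (not Flink's TypeExtractor).
final class PojoRules {
    private PojoRules() {}

    static boolean looksLikePojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        // Rule 1: the class is public; a nested class must also be static.
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) return false;
        // Rule 2: a public no-argument constructor exists (getConstructor only finds public ones).
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: each non-static, non-transient field is public or has a public getter and setter.
        for (Field f : clazz.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || Modifier.isPublic(fm)) continue;
            if (!hasPublicGetterAndSetter(clazz, f)) return false;
        }
        return true;
    }

    private static boolean hasPublicGetterAndSetter(Class<?> clazz, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            Method getter = clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, f.getType());
            return getter.getReturnType() == f.getType();
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}

class PojoRulesDemo {
    // Same shape as the WC class in the text: public fields and a public no-arg constructor.
    public static class WC {
        public String word;
        public Integer count;
        public WC() {}
    }

    // Private field with a getter but no setter: breaks rule 3.
    public static class NoSetter {
        private String word;
        public NoSetter() {}
        public String getWord() { return word; }
    }

    public static void main(String[] args) {
        System.out.println(PojoRules.looksLikePojo(WC.class));       // true
        System.out.println(PojoRules.looksLikePojo(NoSetter.class)); // false
    }
}
```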
public class WC {
public String word;
public Integer count;
public WC() {
}
public WC(String word, Integer count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WC{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<WC> dataSource = executionEnvironment.fromCollection(Arrays.asList(new WC("a",1), new WC("b",2), new WC("a",3)));
UnsortedGrouping<WC> word = dataSource.groupBy("word");
word.first(1).print();
KeySelector
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<WC> dataSource = executionEnvironment.fromCollection(Arrays.asList(new WC("a",1), new WC("b",2), new WC("a",3)));
UnsortedGrouping<WC> word = dataSource.groupBy((KeySelector<WC, String>) value -> value.word);
word.first(1).print();
General-purpose transformation operators
Map
DataSource<Integer> dataSource = executionEnvironment.fromElements(1, 2, 3, 4);
dataSource.map(x -> x + 8).print();
FlatMap
// Anonymous class version
DataSource<String> dataSource = executionEnvironment.fromElements("a a aa b b c", "ccc b a a aa");
dataSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String s : value.split(" ")) {
out.collect(s);
}
}
}).print();
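// Lambda version: the Collector's type parameter is erased, so the output type must be declared with returns(Types.STRING)
DataSource<String> dataSource = executionEnvironment.fromElements("a a aa b b c", "ccc b a a aa");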
dataSource.flatMap((FlatMapFunction<String, String>) (value, out) -> {
for (String s : value.split(" ")) {
out.collect(s);
}
}).returns(Types.STRING).print();
Filter
DataSource<String> dataSource = executionEnvironment.fromElements("a a aa b b c", "ccc b a a aa");
dataSource.filter(x -> x.length() > 1).print();
Project
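DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));
// Keep only the third and the second field, in that order: each tuple becomes (f2, f1), e.g. (d,a)
dataSource.project(2, 1).print();
DataSet-specific transformation operators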
reduce
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));
// Sum the first field over the whole data set.
// Flink tuples must not contain null fields, so the remaining fields are copied from value1
dataSource.reduce(new ReduceFunction<Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> reduce(Tuple3<Integer, String, String> value1, Tuple3<Integer, String, String> value2) throws Exception {
return Tuple3.of(value1.f0 + value2.f0, value1.f1, value1.f2);
}
}).print();
// The same logic as a lambda
dataSource.reduce((ReduceFunction<Tuple3<Integer, String, String>>) (value1, value2) -> Tuple3.of(value1.f0 + value2.f0, value1.f1, value1.f2)).print();
aggregate
Built-in aggregation functions:
- sum: sum
- min: minimum
- max: maximum
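The following plain-Java simulation runs the grouped multi-aggregation groupBy(1).aggregate(SUM, 0).and(MIN, 2) on the same three sample tuples, so you can see exactly what it produces. No Flink is involved. AggRow and the method name are hypothetical stand-ins for Tuple3 and the Flink operators. Field 1 is the grouping key and fields 0 and 2 are both aggregated, so every field of the result is well defined. Flink may print the groups in a different order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Tuple3<Integer, String, String>.
final class AggRow {
    final int f0;
    final String f1;
    final String f2;
    AggRow(int f0, String f1, String f2) { this.f0 = f0; this.f1 = f1; this.f2 = f2; }
    @Override public String toString() { return "(" + f0 + "," + f1 + "," + f2 + ")"; }
}

// Plain-Java simulation (not Flink code) of groupBy(1).aggregate(SUM, 0).and(MIN, 2).
final class AggregateSketch {
    private AggregateSketch() {}

    static List<AggRow> sumF0MinF2ByF1(List<AggRow> input) {
        Map<String, AggRow> perGroup = new LinkedHashMap<>(); // key = field 1, value = running aggregate
        for (AggRow r : input) {
            perGroup.merge(r.f1, r, (acc, next) -> new AggRow(
                    acc.f0 + next.f0,                                    // SUM on field 0
                    acc.f1,                                              // grouping key
                    acc.f2.compareTo(next.f2) <= 0 ? acc.f2 : next.f2)); // MIN on field 2
        }
        return new ArrayList<>(perGroup.values());
    }

    public static void main(String[] args) {
        List<AggRow> data = Arrays.asList(new AggRow(2, "a", "d"), new AggRow(2, "b", "e"), new AggRow(3, "b", "f"));
        System.out.println(sumF0MinF2ByF1(data)); // [(2,a,d), (5,b,e)]
    }
}
```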
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));
// Apply several aggregations at once
dataSource.groupBy(1)                 // group by the second field
.aggregate(Aggregations.SUM, 0)       // sum of the first field
.and(Aggregations.MIN, 2)             // minimum of the third field
.print();
// Aggregate an aggregation: take the maximum per group first, then sum those maxima over the whole data set
dataSource.groupBy(1).aggregate(Aggregations.MAX, 0).aggregate(Aggregations.SUM, 0).print();
distinct
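// Remove duplicates, comparing all fields
dataSource.distinct();
// Compare only the first and the second field (by position)
dataSource.distinct(0, 1);
// The same fields, addressed by field expression
dataSource.distinct("f0", "f1");
// Compare a key extracted by a KeySelector (here the second field)
dataSource.distinct(new KeySelector<Tuple3<Integer, String, String>, String>() {
@Override
public String getKey(Tuple3<Integer, String, String> value) throws Exception {
return value.f1;
}
});
partition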
//partitionByHash
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));
dataSource.partitionByHash(1).mapPartition(new MapPartitionFunction<Tuple3<Integer, String, String>, Long>() {
@Override
public void mapPartition(Iterable<Tuple3<Integer, String, String>> values, Collector<Long> out) throws Exception {
long count =0;
for (Tuple3<Integer, String, String> value : values) {
count++;
}
out.collect(count);
}
}).print();
//partitionByRange
dataSource.partitionByRange(1).mapPartition(new MapPartitionFunction<Tuple3<Integer, String, String>, Long>() {
@Override
public void mapPartition(Iterable<Tuple3<Integer, String, String>> values, Collector<Long> out) throws Exception {
long count =0;
for (Tuple3<Integer, String, String> value : values) {
count++;
}
out.collect(count);
}
}).print();
//sortPartition
// Sort the data inside each partition
dataSource.partitionByRange(1).sortPartition(1, Order.ASCENDING).mapPartition(new MapPartitionFunction<Tuple3<Integer, String, String>, Long>() {
@Override
public void mapPartition(Iterable<Tuple3<Integer, String, String>> values, Collector<Long> out) throws Exception {
long count =0;
for (Tuple3<Integer, String, String> value : values) {
count++;
}
out.collect(count);
}
}).print();
minBy/maxBy
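Returns the tuple whose specified field holds the maximum (or minimum) value.
If several tuples share that maximum or minimum value, an arbitrary one of them is returned.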
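Unlike min/max, minBy/maxBy return a complete element that actually exists in the input. Below is a plain-Java sketch of maxBy over the whole data set and of a grouped minBy, run on the sample data. It is not Flink code, and Tup is a hypothetical stand-in for Tuple3. On ties the sketch keeps the first element it saw, which is one valid choice under the "arbitrary element" rule above.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for Tuple3<Integer, String, String>.
final class Tup {
    final int f0;
    final String f1;
    final String f2;
    Tup(int f0, String f1, String f2) { this.f0 = f0; this.f1 = f1; this.f2 = f2; }
    @Override public String toString() { return "(" + f0 + "," + f1 + "," + f2 + ")"; }
}

// Plain-Java sketch (not Flink code) of maxBy and groupBy(...).minBy(...) semantics.
final class MaxBySketch {
    private MaxBySketch() {}

    // Whole element with the largest field value; input must be non-empty.
    // Ties keep the first element seen (Flink only promises one of the tied elements).
    static <T> T maxBy(List<T> input, Comparator<T> byField) {
        T best = input.get(0);
        for (T t : input) {
            if (byField.compare(t, best) > 0) best = t;
        }
        return best;
    }

    // One whole element per group: the one with the smallest field value.
    static <T, K> Map<K, T> minByPerGroup(List<T> input, Function<T, K> key, Comparator<T> byField) {
        Map<K, T> result = new LinkedHashMap<>();
        for (T t : input) {
            result.merge(key.apply(t), t,
                    (current, candidate) -> byField.compare(candidate, current) < 0 ? candidate : current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Tup> data = Arrays.asList(new Tup(2, "a", "d"), new Tup(2, "b", "e"), new Tup(3, "b", "f"));
        System.out.println(maxBy(data, Comparator.comparingInt((Tup t) -> t.f0)));    // (3,b,f)
        System.out.println(minByPerGroup(data, (Tup t) -> t.f1,
                Comparator.comparingInt((Tup t) -> t.f0)));                            // {a=(2,a,d), b=(2,b,e)}
    }
}
```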
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
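DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));
// Whole data set: the tuple with the largest second field
dataSource.maxBy(1).print();
// Per group of the second field: the tuple with the smallest first field
dataSource.groupBy(1).minBy(0).print();
First-n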
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
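DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));
// Any 2 elements of the data set (no particular order is guaranteed)
dataSource.first(2).print();
// First element of each group after sorting each group by the third field
dataSource.groupBy(0).sortGroup(2, Order.ASCENDING).first(1).print();
DataStream-specific transformation operators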
union
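Merges two or more streams into a new stream that contains all elements of all the input streams.
All streams must have the same element type.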
dataStream.union(otherStream1, otherStream2, ...);
connect
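Connects two data streams. The two streams are only placed inside the same ConnectedStreams object. Each keeps its own element type, and they stay independent of each other.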
DataStream<Integer> oneStream =...
DataStream<String> twoStream=...
ConnectedStreams<Integer, String> connectedStreams = oneStream.connect(twoStream);
coMap/coFlatMap
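Converts ConnectedStreams into a DataStream by applying a CoMapFunction or CoFlatMapFunction. These functions have one method per input stream: map1/map2 or flatMap1/flatMap2.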
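Here is a plain-Java sketch of the co-map idea. It assumes a hypothetical CoMapper interface that mirrors the shape of Flink's CoMapFunction: map1 handles the first input and map2 handles the second. The sketch shows that the two inputs keep their own types and only the output type is shared. It processes the first input and then the second. In Flink, the two inputs are interleaved in whatever order their elements arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical interface with the same shape as Flink's CoMapFunction (not the real one).
interface CoMapper<IN1, IN2, OUT> {
    OUT map1(IN1 value); // called for elements of the first input
    OUT map2(IN2 value); // called for elements of the second input
}

// Plain-Java sketch of connect + coMap (not Flink code).
final class ConnectSketch {
    private ConnectSketch() {}

    // Inputs keep their own element types; only OUT is shared.
    // Simplification: all of `first` is processed before `second`; Flink interleaves by arrival.
    static <IN1, IN2, OUT> List<OUT> coMap(List<IN1> first, List<IN2> second, CoMapper<IN1, IN2, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        for (IN1 v : first) out.add(fn.map1(v));
        for (IN2 v : second) out.add(fn.map2(v));
        return out;
    }

    public static void main(String[] args) {
        List<String> merged = coMap(Arrays.asList(1, 2), Arrays.asList("x", "yy"),
                new CoMapper<Integer, String, String>() {
                    @Override public String map1(Integer value) { return "int:" + value; }
                    @Override public String map2(String value) { return "str:" + value; }
                });
        System.out.println(merged); // [int:1, int:2, str:x, str:yy]
    }
}
```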
split/select
split: splits one stream into several streams
select: picks one or more of the streams produced by split
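split/select were deprecated in earlier releases and removed in Flink 1.12. Side outputs (an OutputTag used from a ProcessFunction) replace them. The routing idea behind both can be sketched in plain Java: a selector function sends every element to one named output. This is a simplified model. Like side outputs, the old split API could also send one element to several outputs, and the sketch does not cover that case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java model of routing one stream into named outputs (what split/select did and what
// side outputs do in current Flink). Simplification: each element goes to exactly one output.
final class SplitSketch {
    private SplitSketch() {}

    static <T> Map<String, List<T>> split(List<T> input, Function<T, String> selector) {
        Map<String, List<T>> outputs = new TreeMap<>();
        for (T value : input) {
            outputs.computeIfAbsent(selector.apply(value), name -> new ArrayList<>()).add(value);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> outputs = split(Arrays.asList(1, 2, 3, 4, 5), v -> v % 2 == 0 ? "even" : "odd");
        // "select" then simply picks one or more of the named outputs.
        System.out.println(outputs.get("even")); // [2, 4]
        System.out.println(outputs.get("odd"));  // [1, 3, 5]
    }
}
```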
keyBy
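Converts a DataStream into a KeyedStream
Logical partitioning: all records with the same key are sent to the same parallel instance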
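Flink decides which parallel instance receives a key in two steps. It hashes the key into one of maxParallelism key groups, then assigns contiguous ranges of key groups to the parallel instances. The sketch below follows that scheme in plain Java, with one simplification: Flink also applies a murmur hash to hashCode(), and the sketch omits it. The concrete instance numbers therefore differ from Flink's. The important property still holds: equal keys always land on the same instance.

```java
// Simplified model of keyBy routing (plain Java, not Flink code).
// ASSUMPTION: key.hashCode() is used directly; Flink additionally murmur-hashes it.
final class KeyBySketch {
    private KeyBySketch() {}

    // Step 1: map the key to one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: map contiguous key-group ranges to parallel instances [0, parallelism).
    static int instanceFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default max parallelism for small jobs
        int parallelism = 4;
        for (String key : new String[] {"a", "b", "flink", "a"}) {
            System.out.println(key + " -> instance " + instanceFor(key, maxParallelism, parallelism));
        }
    }
}
```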
dataStream.keyBy(0);
dataStream.keyBy("name");
aggregate
// Aggregations on a KeyedStream (rolling: an updated result is emitted for every element)
keyedStream.sum(0);
keyedStream.sum("key");
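// max(0): field 0 holds the maximum; the other fields are not guaranteed to come from the element holding it
keyedStream.max(0);
// maxBy(0): the whole element that contains the maximum of field 0
keyedStream.maxBy(0);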
// Aggregations on a WindowedStream (one result per window)
windowStream.sum(0);
windowStream.maxBy(0);
reduce
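A rolling reduce on a KeyedStream combines each new element with the last reduced value for its key and emits the new value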
keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<>(value1.f0,value1.f1+value2.f1);
}
});
window
Converts a KeyedStream into a WindowedStream
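A tumbling window of size 2 seconds, like the one in the example below, assigns each element to exactly one window [start, start + size). The start can be computed from the element's timestamp. The sketch uses modulo arithmetic that should be equivalent to Flink's TimeWindow.getWindowStartWithOffset for tumbling windows. It is plain Java, timestamps are in milliseconds, and the offset is a parameter. The offset is 0 unless the windows are shifted, for example to align with a time zone.

```java
// Plain-Java sketch of tumbling-window assignment (not Flink code).
final class TumblingWindowSketch {
    private TumblingWindowSketch() {}

    // Start of the window that contains `timestamp`; floorMod also handles negative timestamps.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long size = 2_000L; // Time.seconds(2) in milliseconds
        for (long ts : new long[] {0L, 1_999L, 2_000L, 4_500L}) {
            long start = windowStart(ts, 0L, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

This prints 0 -> [0, 2000), 1999 -> [0, 2000), 2000 -> [2000, 4000) and 4500 -> [4000, 6000). Note that a timestamp equal to a window's end belongs to the next window.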
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(2)));
windowAll
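Converts a DataStream into an AllWindowedStream, a window over all elements of the stream (in many cases a non-parallel transformation)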
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(2)));
window apply
Converts a WindowedStream or AllWindowedStream into a DataStream by applying a function to the whole content of each window
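A window apply differs from the window reduce that follows in one way: apply receives all elements of a window at once (Flink buffers them until the window fires), while reduce folds elements in incrementally. Below is a plain-Java sketch of the full-window variant. It assumes keyed tumbling event-time windows and a hypothetical Event class, and every window fires only after all input has been read. Events are buffered per (key, window), and the function runs once per window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java sketch of a full-window "apply" over keyed tumbling windows (not Flink code).
final class WindowApplySketch {
    private WindowApplySketch() {}

    // Hypothetical input element: key, event-time timestamp in ms, value.
    static final class Event {
        final String key;
        final long ts;
        final int value;
        Event(String key, long ts, int value) { this.key = key; this.ts = ts; this.value = value; }
    }

    // Buffer every event under (key, window), then call fn once per window with all its events.
    static List<String> apply(List<Event> events, long size, Function<List<Event>, Integer> fn) {
        Map<String, List<Event>> buffers = new TreeMap<>(); // sorted so the output order is stable
        for (Event e : events) {
            long start = e.ts - Math.floorMod(e.ts, size);
            String window = e.key + "@[" + start + "," + (start + size) + ")";
            buffers.computeIfAbsent(window, w -> new ArrayList<>()).add(e);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Event>> entry : buffers.entrySet()) {
            out.add(entry.getKey() + " -> " + fn.apply(entry.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 100, 1), new Event("a", 1_500, 2),
                new Event("b", 500, 5), new Event("a", 2_100, 4));
        // The window function sees the whole window; here it sums the values.
        System.out.println(apply(events, 2_000L, w -> w.stream().mapToInt(e -> e.value).sum()));
        // [a@[0,2000) -> 3, a@[2000,4000) -> 4, b@[0,2000) -> 5]
    }
}
```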
window reduce
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(2))).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<>(value1.f0,value1.f1+value2.f1);
}
});
Physical partitioning operators
partitionCustom
dataStream.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
return key.length() % numPartitions;
}
}, (KeySelector<String, String>) value -> value);
shuffle
Partitions elements randomly, following a uniform distribution
rebalance
Distributes elements evenly across all downstream instances in round-robin fashion
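The round-robin idea behind rebalance can be sketched as a small channel selector in plain Java. Flink starts each sender at a random channel. Here the starting channel is a constructor parameter so that the output is deterministic. Shuffle picks a random channel for each element, whereas round-robin guarantees that the per-channel counts differ by at most one.

```java
// Plain-Java sketch of round-robin channel selection (what rebalance does), not Flink code.
final class RoundRobinSketch {
    private final int numChannels;
    private int next;

    // startChannel is fixed here for reproducibility; Flink picks a random one per sender.
    RoundRobinSketch(int numChannels, int startChannel) {
        this.numChannels = numChannels;
        this.next = Math.floorMod(startChannel, numChannels);
    }

    // Returns the channel for the next element and advances the cursor.
    int selectChannel() {
        int channel = next;
        next = (next + 1) % numChannels;
        return channel;
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch(3, 0);
        int[] counts = new int[3];
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            int c = selector.selectChannel();
            counts[c]++;
            order.append(c).append(' ');
        }
        System.out.println(order.toString().trim());           // 0 1 2 0 1 2 0
        System.out.println(java.util.Arrays.toString(counts)); // [3, 2, 2]
    }
}
```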
rescale
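Partitions elements round-robin to a subset of the downstream operator instances.
This can require only local data transfer instead of transfer over the network, depending on other settings such as the number of slots per TaskManager.
Which downstream instances an upstream instance sends to depends on the parallelism of the upstream and downstream operators. For example, suppose the upstream parallelism is 2 and the downstream parallelism is 6. The first upstream instance sends to 3 of the downstream instances, and the second upstream instance sends to the other 3.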
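The 2-to-6 example above can be reproduced with a small plain-Java sketch. It splits the downstream instances into contiguous blocks, one block per upstream instance. Each upstream instance then sends round-robin within its own block. This simplified model only covers the case where the upstream parallelism is at most the downstream parallelism. When the parallelisms do not divide evenly, the exact boundaries Flink picks may differ from the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of rescale's connection pattern (plain Java, not Flink code).
final class RescaleSketch {
    private RescaleSketch() {}

    // Downstream instances fed by upstream instance `up`, assuming upParallelism <= downParallelism.
    static List<Integer> targets(int up, int upParallelism, int downParallelism) {
        if (upParallelism > downParallelism) {
            throw new IllegalArgumentException("sketch only covers upParallelism <= downParallelism");
        }
        int from = up * downParallelism / upParallelism;     // inclusive
        int to = (up + 1) * downParallelism / upParallelism; // exclusive
        List<Integer> result = new ArrayList<>();
        for (int i = from; i < to; i++) {
            result.add(i);
        }
        return result;
    }

    public static void main(String[] args) {
        // Upstream parallelism 2, downstream parallelism 6, as in the text.
        for (int up = 0; up < 2; up++) {
            System.out.println("upstream " + up + " -> downstream " + targets(up, 2, 6));
        }
        // upstream 0 -> downstream [0, 1, 2]
        // upstream 1 -> downstream [3, 4, 5]
    }
}
```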
broadcast
Broadcasts every element to every partition
Operator chaining
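By default, Flink chains operators automatically. Chained operators run in the same task and thread, so records pass between them without serialization or network transfer.
startNewChain(), disableChaining() and slotSharingGroup() can only be called right after a DataStream transformation, because they refer to the previous transformation (someStream.map(...).startNewChain() is valid, someStream.startNewChain() is not).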
Disable chaining for the whole job
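StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining(); // instance method on the environment, not a static call
Start a new chain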
The two map operators are chained to each other, but filter is not chained to the first map
stream.filter(...)
.map(....)
.startNewChain()
.map(....);
Disable chaining for a single operator
stream.map(...)
.disableChaining();
Set a slot sharing group
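Flink puts operators that belong to the same slot sharing group into the same slot and keeps operators of other groups in other slots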
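This rule determines how many slots a job needs. Operators in one slot sharing group can share slots, so each group needs as many slots as its most parallel operator, and the totals for all groups add up. Below is a plain-Java sketch of that calculation. It assumes every operator's group is given explicitly and ignores finer-grained resource settings. In Flink, an operator without an explicit group normally inherits the group of its inputs, or falls back to "default". The operator names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch: slots needed = sum over slot sharing groups of the max parallelism in the group.
final class SlotSketch {
    private SlotSketch() {}

    // Hypothetical operator description: name, slot sharing group, parallelism.
    static final class Op {
        final String name;
        final String group;
        final int parallelism;
        Op(String name, String group, int parallelism) {
            this.name = name;
            this.group = group;
            this.parallelism = parallelism;
        }
    }

    static int requiredSlots(List<Op> ops) {
        Map<String, Integer> maxPerGroup = new LinkedHashMap<>();
        for (Op op : ops) {
            maxPerGroup.merge(op.group, op.parallelism, Integer::max); // largest parallelism per group
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Op> job = Arrays.asList(
                new Op("source", "default", 2),
                new Op("filter", "name", 3),
                new Op("map", "name", 1),
                new Op("sink", "default", 4));
        System.out.println(requiredSlots(job)); // 4 ("default") + 3 ("name") = 7
    }
}
```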
stream.filter(...)
.slotSharingGroup("name");
https://jiajun.xyz/2022/08/28/bigdata/11Flink/02flink_study2/04转换算子/