04 Transformation Operators

Transformation Operators

Defining Keys

Tuple

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));
// Group by the first field (position key 0)
UnsortedGrouping<Tuple3<Integer, String, String>> g1 = dataSource.groupBy(0);
System.out.println("a=======");
g1.max(2).print();

// Group by the first and second fields
UnsortedGrouping<Tuple3<Integer, String, String>> g2 = dataSource.groupBy(0, 1);
System.out.println("b=======");
g2.max(2).print();

// Group by the first field again, this time with a field-expression key ("f0" is the first field)
UnsortedGrouping<Tuple3<Integer, String, String>> g3 = dataSource.groupBy("f0");
System.out.println("c=======");
g3.max(2).print();

// Output (the order of the groups may differ between runs):
//        a=======
//        (3,b,f)
//        (2,b,e)
//        b=======
//        (2,b,e)
//        (3,b,f)
//        (2,a,d)
//        c=======
//        (3,b,f)
//        (2,b,e)

POJO

  • The POJO class must be public
  • It must have a public no-argument constructor
  • Every field must be public or accessible through public getter and setter methods
  • Field types must be supported by Flink
  • When POJOs and tuples are nested, nested field expressions such as f1.word are supported

public class WC {
    public String word;
    public Integer count;

    public WC() {
    }

    public WC(String word, Integer count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return "WC{" +
                "word='" + word + '\'' +
                ", count=" + count +
                '}';
    }
}

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<WC> dataSource = executionEnvironment.fromCollection(Arrays.asList(new WC("a",1), new WC("b",2), new WC("a",3)));
UnsortedGrouping<WC> word = dataSource.groupBy("word");
word.first(1).print();

KeySelector

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<WC> dataSource = executionEnvironment.fromCollection(Arrays.asList(new WC("a",1), new WC("b",2), new WC("a",3)));
UnsortedGrouping<WC> word = dataSource.groupBy((KeySelector<WC, String>) value -> value.word);
word.first(1).print();

General Transformation Operators

Map

DataSource<Integer> dataSource = executionEnvironment.fromElements(1, 2, 3, 4);
dataSource.map(x->x+8).print();

FlatMap

DataSource<String> dataSource = executionEnvironment.fromElements("a a aa b b c", "ccc b a a aa");
// Anonymous-class version
dataSource.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String s : value.split(" ")) {
            out.collect(s);
        }
    }
}).print();

// Lambda version: the Collector's type parameter is erased, so the output type is declared with returns(...)
dataSource.flatMap((FlatMapFunction<String, String>) (value, out) -> {
    for (String s : value.split(" ")) {
        out.collect(s);
    }
}).returns(Types.STRING).print();

Filter

DataSource<String> dataSource = executionEnvironment.fromElements("a a aa b b c", "ccc b a a aa");
dataSource.filter(x->x.length()>1).print();

Project

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));
// Keep only the third and the second field, in that order
dataSource.project(2, 1).print();

DataSet-Specific Transformation Operators

reduce

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));

// Anonymous-class version: sum the first field and keep the second field of the first value
dataSource.reduce(new ReduceFunction<Tuple3<Integer, String, String>>() {
    @Override
    public Tuple3<Integer, String, String> reduce(Tuple3<Integer, String, String> value1, Tuple3<Integer, String, String> value2) throws Exception {
        return new Tuple3<>(value1.f0 + value2.f0, value1.f1, null);
    }
}).print();

// Equivalent lambda version
dataSource.reduce((ReduceFunction<Tuple3<Integer, String, String>>) (value1, value2) -> new Tuple3<>(value1.f0 + value2.f0, value1.f1, null)).print();

aggregate

Built-in aggregation functions:

  • sum: sum of the values
  • min: minimum value
  • max: maximum value

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));

// Apply several aggregations in one step
dataSource.groupBy(1)               // group by the second field
  .aggregate(Aggregations.SUM, 0)   // sum the first field
  .and(Aggregations.MIN, 2)         // take the minimum of the third field
  .print();
// Chain aggregations: take the per-group maximum first, then sum those maxima
dataSource.groupBy(1).aggregate(Aggregations.MAX, 0).aggregate(Aggregations.SUM, 0).print();

distinct

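// Distinct over all fields
dataSource.distinct();
// Distinct over the fields at positions 0 and 1
dataSource.distinct(0, 1);
// The same, using field expressions
dataSource.distinct("f0", "f1");
// Distinct over a key computed by a KeySelector (here the second field)
dataSource.distinct(new KeySelector<Tuple3<Integer, String, String>, String>() {
    @Override
    public String getKey(Tuple3<Integer, String, String> value) throws Exception {
        return value.f1;
    }
});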

partition

//partitionByHash

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));

dataSource.partitionByHash(1).mapPartition(new MapPartitionFunction<Tuple3<Integer, String, String>, Long>() {
    @Override
    public void mapPartition(Iterable<Tuple3<Integer, String, String>> values, Collector<Long> out) throws Exception {
        long count =0;
        for (Tuple3<Integer, String, String> value : values) {
            count++;
        }
         out.collect(count);
    }
}).print();

//partitionByRange
dataSource.partitionByRange(1).mapPartition(new MapPartitionFunction<Tuple3<Integer, String, String>, Long>() {
    @Override
    public void mapPartition(Iterable<Tuple3<Integer, String, String>> values, Collector<Long> out) throws Exception {
        long count = 0;
        for (Tuple3<Integer, String, String> value : values) {
            count++;
        }
        out.collect(count);
    }
}).print();

//sortPartition
// Sort the data within each partition
dataSource.partitionByRange(1).sortPartition(1, Order.ASCENDING).mapPartition(new MapPartitionFunction<Tuple3<Integer, String, String>, Long>() {
    @Override
    public void mapPartition(Iterable<Tuple3<Integer, String, String>> values, Collector<Long> out) throws Exception {
        long count = 0;
        for (Tuple3<Integer, String, String> value : values) {
            count++;
        }
        out.collect(count);
    }
}).print();

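minBy/maxBy

Returns the tuple whose specified field holds the minimum or maximum value.

If several tuples share that minimum or maximum value, an arbitrary one of them is returned.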

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));

dataSource.maxBy(1).print();

dataSource.groupBy(1).minBy(0).print();
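
To make the "whole tuple" and "arbitrary on ties" points concrete, here is a minimal plain-Java sketch that does not use Flink at all. The Row class and the method names are hypothetical stand-ins for Tuple3 and for minBy/maxBy, and the sketch selects by the first field to keep the comparison numeric.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class MaxBySketch {
    /** Hypothetical stand-in for Tuple3<Integer, String, String>. */
    static final class Row {
        final int f0;
        final String f1;
        final String f2;

        Row(int f0, String f1, String f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + "," + f2 + ")";
        }
    }

    /** The same three records used in the snippets of this post. */
    static List<Row> sample() {
        return Arrays.asList(new Row(2, "a", "d"), new Row(2, "b", "e"), new Row(3, "b", "f"));
    }

    /** maxBy-style selection: returns a complete row whose f0 is the largest. */
    static Row maxByF0(List<Row> rows) {
        return Collections.max(rows, Comparator.comparingInt((Row r) -> r.f0));
    }

    /** minBy-style selection: returns a complete row whose f0 is the smallest. */
    static Row minByF0(List<Row> rows) {
        return Collections.min(rows, Comparator.comparingInt((Row r) -> r.f0));
    }

    public static void main(String[] args) {
        // Exactly one row has f0 == 3, so the result is unambiguous.
        System.out.println(maxByF0(sample()));
        // Two rows tie at f0 == 2; either of them would be a valid minBy result.
        System.out.println(minByF0(sample()));
    }
}
```

Callers should therefore rely only on the selected field of a minBy/maxBy result when ties are possible, not on the remaining fields.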

First-n

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Integer, String, String>> dataSource = executionEnvironment.fromCollection(Arrays.asList(Tuple3.of(2, "a", "d"), Tuple3.of(2, "b", "e"), Tuple3.of(3, "b", "f")));

dataSource.first(2).print();
// First element of each group, after sorting each group by the third field
dataSource.groupBy(0).sortGroup(2, Order.ASCENDING).first(1).print();

DataStream-Specific Transformation Operators

union

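Merges two or more streams into a new stream that contains all elements of all input streams.

All streams must have the same element type.

dataStream.union(otherStream1, otherStream2, ...)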

connect

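Connects two data streams. The two streams are merely placed into one ConnectedStreams object: each keeps its own element type, and the two remain independent of each other.

DataStream<Integer> oneStream =...
DataStream<String> twoStream=...
ConnectedStreams<Integer, String> connectedStream = oneStream.connect(twoStream);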

coMap/coFlatMap

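Converts ConnectedStreams into a DataStream. A CoMapFunction or CoFlatMapFunction supplies one method per input stream, and both methods produce the same output type.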
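
The idea of mapping two differently typed inputs into one output type can be sketched in plain Java without Flink. The coMap helper and its parameter names below are hypothetical; in a real job the elements of the two streams interleave in arrival order, whereas this sketch simply processes the first input and then the second.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class CoMapSketch {
    /**
     * Applies map1 to every element of the first input and map2 to every
     * element of the second input, collecting both into one output list.
     */
    static <A, B, R> List<R> coMap(List<A> first, List<B> second,
                                   Function<A, R> map1, Function<B, R> map2) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            out.add(map1.apply(a));
        }
        for (B b : second) {
            out.add(map2.apply(b));
        }
        return out;
    }

    /** Integers and strings go in; a single list of strings comes out. */
    static List<String> demo() {
        List<Integer> numbers = Arrays.asList(1, 2);
        List<String> words = Arrays.asList("x");
        Function<Integer, String> map1 = i -> "int:" + i;
        Function<String, String> map2 = s -> "str:" + s;
        return coMap(numbers, words, map1, map2);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```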

split/select

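split: splits one stream into several streams

select: picks one or more of the streams produced by split

Note that split/select were deprecated and then removed from the DataStream API in Flink 1.12; newer versions use side outputs for this purpose.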

keyBy

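Converts a DataStream into a KeyedStream.

This is a logical partitioning: all records with the same key end up in the same partition.

dataStream.keyBy(0);
dataStream.keyBy("name");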
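
The following plain-Java sketch illustrates what "logical partitioning by key" means: the target subtask is a pure function of the key, so equal keys always land together. It is deliberately simplified. Flink itself assigns keys to key groups using a murmur hash of the key's hashCode and then maps key groups to subtasks, so the actual subtask numbers will differ from the ones computed here. The class and method names are hypothetical.

```java
class KeyBySketch {
    /**
     * Simplified key routing: hash the key and take it modulo the parallelism.
     * floorMod keeps the result non-negative even for negative hash codes.
     */
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "c", "b"};
        for (String key : keys) {
            // Every occurrence of the same key prints the same subtask index.
            System.out.println(key + " -> subtask " + subtaskFor(key, 4));
        }
    }
}
```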

aggregate

// Aggregations on a keyed stream
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.max(0);
keyedStream.maxBy(0);

// Aggregations on a windowed stream
windowStream.sum(0);
windowStream.maxBy(0);

reduce

A rolling reduce on a keyed stream:

keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
    }
});

window

Converts a KeyedStream into a WindowedStream.

keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(2)));
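
How a two-second tumbling window decides which window an element belongs to can be sketched with simple arithmetic. This is a minimal illustration that assumes millisecond timestamps and a window offset of zero; the class and method names are hypothetical and not part of Flink.

```java
class TumblingWindowSketch {
    /** Start (inclusive) of the tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** End (exclusive) of the tumbling window that contains the timestamp. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 2000; // corresponds to Time.seconds(2)
        long[] timestamps = {0, 1999, 2000, 4500};
        for (long ts : timestamps) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because the windows do not overlap, every timestamp maps to exactly one window, and a timestamp that falls exactly on a boundary belongs to the window that starts there.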

windowAll

Converts a DataStream into an AllWindowedStream.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(2)));

window apply

Converts a WindowedStream or AllWindowedStream into a DataStream by applying a function to each window.

window reduce

keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(2))).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<>(value1.f0,value1.f1+value2.f1);
    }
});
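
As a rough mental model of what the window reduce above computes, the plain-Java sketch below sums values per key and per two-second window. It is only an approximation under stated assumptions: it ignores watermarks, late data, and incremental firing, and the Event class and the "key@windowStart" bucket naming are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

class WindowReduceSketch {
    /** Hypothetical event: a key, a value, and an event-time timestamp in milliseconds. */
    static final class Event {
        final String key;
        final int value;
        final long timestampMs;

        Event(String key, int value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Sums the values of all events that share a key and a tumbling window. */
    static Map<String, Integer> sumPerKeyAndWindow(Event[] events, long windowSizeMs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - Math.floorMod(e.timestampMs, windowSizeMs);
            String bucket = e.key + "@" + windowStart;
            // merge plays the role of the ReduceFunction: combine old and new value.
            sums.merge(bucket, e.value, Integer::sum);
        }
        return sums;
    }

    static Map<String, Integer> demo() {
        Event[] events = {
            new Event("a", 1, 500), new Event("a", 2, 1500),
            new Event("a", 3, 2500), new Event("b", 4, 100)
        };
        return sumPerKeyAndWindow(events, 2000);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```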

Physical Partitioning Operators

partitionCustom

dataStream.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }
}, (KeySelector<String, String>) value -> value);
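
To see what the partitioner above does with concrete keys, the same rule can be run in plain Java. This sketch only reproduces the key.length() % numPartitions rule from the snippet; the assign helper and the sample keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CustomPartitionSketch {
    /** The same rule as the Partitioner in the snippet above. */
    static int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    /** Groups the keys by the partition index the rule assigns to them. */
    static Map<Integer, List<String>> assign(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (String key : keys) {
            result.computeIfAbsent(partition(key, numPartitions), p -> new ArrayList<>()).add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        // Keys of even length go to partition 0, keys of odd length to partition 1.
        System.out.println(assign(Arrays.asList("a", "aa", "ccc", "b"), 2));
    }
}
```

A rule like this is only balanced if key lengths are evenly spread, which is worth checking before using a custom partitioner on real data.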

shuffle

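Partitions elements randomly according to a uniform distribution.

rebalance

Distributes elements round-robin, so that every partition receives an equal load.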
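
Round-robin distribution can be sketched in a few lines of plain Java. The sketch assumes the first element goes to partition 0, which is an arbitrary choice for illustration; the class and method names are hypothetical.

```java
import java.util.Arrays;

class RebalanceSketch {
    /** Hands out elements one partition at a time and counts how many each partition gets. */
    static int[] countsPerPartition(int elements, int partitions) {
        int[] counts = new int[partitions];
        int next = 0;
        for (int i = 0; i < elements; i++) {
            counts[next]++;
            next = (next + 1) % partitions; // wrap around after the last partition
        }
        return counts;
    }

    public static void main(String[] args) {
        // Ten elements over three partitions: loads differ by at most one.
        System.out.println(Arrays.toString(countsPerPartition(10, 3)));
    }
}
```

The partition loads never differ by more than one element, which is why round-robin is a common remedy for data skew.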

rescale

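Partitions elements round-robin to a subset of the downstream subtasks.

This can require only local data transfers instead of transfers over the network.

Which subset of downstream subtasks receives the elements of an upstream subtask depends on the parallelism of both operators. For example, if the upstream operator has parallelism 2 and the downstream operator has parallelism 6, one upstream subtask sends its data to three of the downstream subtasks, and the other upstream subtask sends its data to the remaining three.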
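
The 2-to-6 example can be turned into a small plain-Java calculation. This is a sketch under two assumptions that the text does not state: one parallelism is an exact multiple of the other, and each upstream subtask is wired to a contiguous block of downstream subtasks. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class RescaleSketch {
    /** Downstream subtask indices that one upstream subtask sends to. */
    static List<Integer> targets(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> result = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            if (downstreamParallelism % upstreamParallelism != 0) {
                throw new IllegalArgumentException("sketch only covers exact multiples");
            }
            // Fan out: each upstream subtask owns a block of downstream subtasks.
            int fanOut = downstreamParallelism / upstreamParallelism;
            for (int i = 0; i < fanOut; i++) {
                result.add(upstreamIndex * fanOut + i);
            }
        } else {
            if (upstreamParallelism % downstreamParallelism != 0) {
                throw new IllegalArgumentException("sketch only covers exact multiples");
            }
            // Fan in: several upstream subtasks share one downstream subtask.
            int fanIn = upstreamParallelism / downstreamParallelism;
            result.add(upstreamIndex / fanIn);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets(0, 2, 6)); // first upstream subtask
        System.out.println(targets(1, 2, 6)); // second upstream subtask
        System.out.println(targets(4, 6, 2)); // reverse case: 6 upstream, 2 downstream
    }
}
```

Within its own block, each upstream subtask would then cycle through the targets round-robin, as in rebalance.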

broadcast

Broadcasts every element to every partition.

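Operator Chaining

By default, operators are chained automatically where possible: chained operators run together in the same thread, and therefore in the same slot.

The chaining functions below can only be called directly after a DataStream transformation, because they refer to the preceding transformation.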

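Disabling chaining for the whole job

// disableOperatorChaining() is an instance method of the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();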

Starting a new chain

The two maps are chained together, and the filter is not chained to the first map.

stream.filter(...)
.map(....)
.startNewChain()
.map(....);

Disabling chaining for a single operator

stream.map(...)
.disableChaining();

Setting a slot sharing group

Operators that have the same slot sharing group are placed into the same slot.

stream.filter(...)
.slotSharingGroup("name");

Source: https://jiajun.xyz/2022/08/28/bigdata/11Flink/02flink_study2/04转换算子/
Author: Lambda
Published: August 28, 2022