01firstDemo

firstDemo

BatchDemo

package priv.king.chapter1;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchDemo {

    public static void main(String[] args) throws Exception {

        // Obtain the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build a data stream from in-memory elements and compute a word count
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements("Flink batch demo", "batch demo", "demo")
                .flatMap(new Splitter())
                .keyBy(x -> x.f0)
                .sum(1);
        // Set the parallelism and print the execution plan as JSON
        env.setParallelism(2);
        System.out.println(env.getExecutionPlan());
        // Print the results to the console
        dataStream.print();
        // Submit the job. Flink builds the pipeline lazily, so nothing runs until execute() is called
        // (a batch-execution variant of this demo is sketched after the class)
        env.execute("WordCount");


    }

    // FlatMapFunction that splits each sentence into (word, 1) tuples
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
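
Despite the class name, the pipeline above runs on the streaming runtime, so it prints an incremental result for every incoming record, e.g. (demo,1), (demo,2), (demo,3) for the key demo. Since Flink 1.12 a bounded job like this can also be forced into batch execution mode, which emits only the final count per key. A minimal sketch, assuming Flink 1.12+; the class name BatchModeDemo is made up for illustration and reuses the Splitter defined above:

package priv.king.chapter1;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Force batch execution for the bounded fromElements source (Flink 1.12+):
        // each key is emitted once with its final count instead of once per record
        env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);

        env.fromElements("Flink batch demo", "batch demo", "demo")
                .flatMap(new BatchDemo.Splitter()) // reuse the Splitter from BatchDemo above
                .keyBy(x -> x.f0)
                .sum(1)
                .print();

        env.execute("WordCount batch mode");
    }
}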

StreamDemo

package priv.king.chapter1;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;

public class StreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Socket source; feed it with: nc -lk 10086
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 10086);

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = dataStream
                // With a lambda the generic types are erased, so the FlatMapFunction cast and the
                // explicit .returns(...) call are required; Flink cannot infer the output type on its own
                // (an anonymous-class variant that avoids this is sketched after this example)
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (in, out) -> Arrays.stream(in.split(" ")).forEach(x -> out.collect(new Tuple2<>(x, 1))))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(x -> x.f0)
                .timeWindow(Time.seconds(10))
                .sum(1);
        // Sink: print the windowed counts to the console
        res.print();
        // Submit the job
        env.execute("word count");
    }
}
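
The cast to FlatMapFunction plus the .returns(...) call above is only needed because the lambda's generic types are erased at compile time. A sketch of the same pipeline with an anonymous FlatMapFunction, from which Flink can extract Tuple2<String, Integer> on its own, so no .returns call is required (assumes org.apache.flink.util.Collector is also imported):

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = dataStream
                // The anonymous class keeps the generic signature, so the output type is inferred
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
                        for (String word : sentence.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(x -> x.f0)
                .timeWindow(Time.seconds(10))
                .sum(1);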

TableDemo

package priv.king.chapter1;


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class TableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner() // use the Blink planner
                .inStreamingMode() // streaming mode (the default)
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 10086);

        Table table = tEnv.fromDataStream(dataStream, $("word"));


        Table res = table.where($("word").like("%a%"));

        // Convert back to a DataStream; toAppendStream is fine for a pure filter (see the note after this example)
        DataStream<Row> resStream = tEnv.toAppendStream(res, Row.class);

        resStream.print("tb");

        env.execute();

    }
}
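
toAppendStream works here because a where() filter only ever appends rows. A query that updates earlier results, such as a per-word count, has to be converted with toRetractStream instead. A minimal sketch against the same tEnv and table (the grouped query and variable names are illustrative; assumes org.apache.flink.api.java.tuple.Tuple2 is imported):

        // Grouped count: previously emitted rows get updated, so toAppendStream would fail here
        Table counts = table.groupBy($("word"))
                .select($("word"), $("word").count().as("cnt"));

        // Each element is (true, row) for an insert/update and (false, row) for a retraction
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(counts, Row.class);
        retractStream.print("cnt");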

SqlDemo

package priv.king.chapter1;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// the $() expression helper has to be imported manually as a static import
import static org.apache.flink.table.api.Expressions.$;

public class SqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 10086);

        Table table = tEnv.fromDataStream(dataStream, $("word"));

        // Concatenating the Table works because toString() registers it under a generated name
        // (an explicit-view variant is sketched after this example)
        Table table1 = tEnv.sqlQuery("select * from " + table + " where word like '%a%'");

        tEnv.toAppendStream(table1, Row.class).print();

        env.execute();
    }
}
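
Concatenating the Table object into the SQL string works because Table.toString() registers the table under a generated unique name in its TableEnvironment and returns that name. Registering an explicit view is more readable; a small sketch against the same tEnv and table (the view name words is made up):

        // Register the Table under an explicit name instead of relying on toString()
        tEnv.createTemporaryView("words", table);

        Table filtered = tEnv.sqlQuery("select word from words where word like '%a%'");
        tEnv.toAppendStream(filtered, Row.class).print();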
