03参数处理

参数处理

参数传递给函数

构造方法

/**
 * Demonstrates handing a parameter to a user function through its constructor.
 *
 * <p>Reads text lines from a socket on {@code localhost:10086} (for example one opened with
 * {@code nc -lk 10086}), keeps the lines accepted by {@link MyFilter} and prints them.
 */
public class Parameter1 {

    /**
     * Builds the socket -> filter -> print pipeline and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // socketStream   nc -lk 10086
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 10086);

        // The threshold (2) is passed to the filter through its constructor.
        SingleOutputStreamOperator<String> filter = dataStream.filter(new MyFilter(2));
        filter.print();

        // Run the job.
        env.execute("word limit");

    }

    /** Keeps only the records whose integer value is strictly greater than a fixed threshold. */
    private static class MyFilter implements FilterFunction<String> {
        /** Exclusive lower bound; set once in the constructor. */
        private final int min;

        /**
         * @param min exclusive lower bound; taken as a primitive so a {@code null} can never be
         *     silently auto-unboxed into a NullPointerException
         */
        public MyFilter(int min) {
            this.min = min;
        }

        /**
         * @param data one input line, expected to be a decimal integer
         * @return {@code true} if the parsed value is greater than the threshold
         * @throws NumberFormatException if {@code data} is not a parsable integer
         */
        @Override
        public boolean filter(String data) throws Exception {
            return Integer.parseInt(data) > min;
        }
    }
}

ExecutionConfig

/**
 * Demonstrates handing a parameter to a user function through the global job parameters
 * registered on the {@code ExecutionConfig}.
 *
 * <p>Reads text lines from a socket on {@code localhost:10086}, keeps the lines accepted by
 * {@link MyFilter} and prints them.
 */
public class Parameter2 {
    /** Key under which the threshold is registered in the global job parameters. */
    private static final String LIMIT_KEY = "limit";

    /** Threshold used when nothing is registered under {@link #LIMIT_KEY}. */
    private static final int DEFAULT_LIMIT = 1;

    /**
     * Registers the threshold as a global job parameter, builds the pipeline and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.setInteger(LIMIT_KEY, 2);

        env.getConfig().setGlobalJobParameters(configuration);

        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 10086);
        SingleOutputStreamOperator<String> filter = dataStream.filter(new MyFilter());
        filter.print();

        // Run the job.
        env.execute("word limit");
    }

    /**
     * Keeps only the records whose integer value is strictly greater than the threshold read
     * from the global job parameters in {@link #open(Configuration)}.
     */
    private static class MyFilter extends RichFilterFunction<String> {
        /** Exclusive lower bound; assigned in {@link #open(Configuration)}. */
        private int min = DEFAULT_LIMIT;

        /**
         * Looks up the threshold in the global job parameters, falling back to
         * {@link #DEFAULT_LIMIT} when the key is absent.
         *
         * @param parameters configuration handed to the function; only forwarded to the superclass
         * @throws NumberFormatException if the registered value is not a parsable integer
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ExecutionConfig.GlobalJobParameters globalJobParameters =
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

            // Read through the base-class toMap() view rather than downcasting to Configuration:
            // the declared return type is only GlobalJobParameters, so the cast depends on an
            // implementation detail of whichever object the runtime hands back.
            String configured =
                    globalJobParameters == null ? null : globalJobParameters.toMap().get(LIMIT_KEY);

            min = configured == null ? DEFAULT_LIMIT : Integer.parseInt(configured);
        }

        /**
         * @param data one input line, expected to be a decimal integer
         * @return {@code true} if the parsed value is greater than the threshold
         * @throws NumberFormatException if {@code data} is not a parsable integer
         */
        @Override
        public boolean filter(String data) throws Exception {
            return Integer.parseInt(data) > min;
        }
    }
}

读取参数

ParameterTool 与 Configuration 一样继承自 GlobalJobParameters，因此也可以通过 env.getConfig().setGlobalJobParameters(parameterTool) 注册为全局作业参数。

/** Shows the different sources a {@code ParameterTool} can be created from. */
public class Parameter3 {
    /**
     * Creates one {@code ParameterTool} per supported source and looks up a single key.
     *
     * @param args command-line arguments, parsed by {@code ParameterTool.fromArgs}
     * @throws IOException declared for {@code ParameterTool.fromPropertiesFile}
     */
    public static void main(String[] args) throws IOException {
        // From the command-line arguments.
        ParameterTool fromArgs = ParameterTool.fromArgs(args);

        // From a map.
        HashMap<String, String> entries = new HashMap<>();
        entries.put("a", "a");
        ParameterTool fromMap = ParameterTool.fromMap(entries);

        // From a properties file.
        ParameterTool fromFile = ParameterTool.fromPropertiesFile("xxxx.properties");

        // From the JVM system properties.
        ParameterTool fromSystem = ParameterTool.fromSystemProperties();

        // Using a parameter: look up key "a"; the second argument is the fallback value.
        fromMap.get("a", "a");
    }
}

03参数处理
https://jiajun.xyz/2022/08/28/bigdata/11Flink/02flink_study2/03参数处理/
作者
Lambda
发布于
2022年8月28日
许可协议