参数处理
参数传递给函数
构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class Parameter1 {

    // Example: hand a parameter to a user function through its constructor.

    /**
     * Builds a job that reads text from a socket on localhost:10086, keeps the
     * records accepted by {@code MyFilter(2)}, prints them, and runs the job
     * under the name "word limit".
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 10086)
                .filter(new MyFilter(2))
                .print();

        env.execute("word limit");
    }

    /** Filter that accepts only records whose integer value is strictly greater than {@code min}. */
    private static class MyFilter implements FilterFunction<String> {

        /** Exclusive lower bound, fixed when the filter is constructed. */
        private final int min;

        public MyFilter(Integer min) {
            this.min = min;
        }

        /**
         * @param data a record expected to contain a decimal integer
         * @return {@code true} when the parsed value exceeds {@code min}
         * @throws Exception a {@link NumberFormatException} propagates if {@code data} is not an integer
         */
        @Override
        public boolean filter(String data) throws Exception {
            return min < Integer.parseInt(data);
        }
    }
}
|
ExecutionConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class Parameter2 {

    // Example: hand a parameter to a rich function through the global job
    // parameters registered on the ExecutionConfig.

    /**
     * Registers a {@link Configuration} holding {@code limit = 2} as the global job
     * parameters, then filters text read from a socket on localhost:10086 with
     * {@code MyFilter} and prints the surviving records.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Configuration jobParameters = new Configuration();
        jobParameters.setInteger("limit", 2);
        env.getConfig().setGlobalJobParameters(jobParameters);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 10086);
        SingleOutputStreamOperator<String> aboveLimit = lines.filter(new MyFilter());
        aboveLimit.print();

        env.execute("word limit");
    }

    /** Filter whose threshold is looked up from the global job parameters in {@code open}. */
    private static class MyFilter extends RichFilterFunction<String> {

        /** Exclusive lower bound; assigned in {@link #open(Configuration)}. */
        private int min;

        /**
         * Reads the "limit" entry (default 1 when absent) from the global job
         * parameters, which are cast to {@link Configuration}, and stores it as
         * the threshold.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Configuration jobParameters =
                    (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            min = jobParameters.getInteger(ConfigOptions.key("limit").intType().defaultValue(1));
        }

        /**
         * @param data a record expected to contain a decimal integer
         * @return {@code true} when the parsed value exceeds the configured limit
         */
        @Override
        public boolean filter(String data) throws Exception {
            return min < Integer.parseInt(data);
        }
    }
}
|
读取参数
ParameterTool 和 Configuration 一样继承自 GlobalJobParameters,因此可以通过 env.getConfig().setGlobalJobParameters(parameterTool) 将其注册为全局作业参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class Parameter3 {

    /**
     * Shows the factory methods that build a {@code ParameterTool} from different
     * sources, then performs one lookup whose result is discarded.
     *
     * @param args command-line arguments, passed to {@code ParameterTool.fromArgs}
     * @throws IOException declared for {@code fromPropertiesFile("xxxx.properties")}
     */
    public static void main(String[] args) throws IOException {
        // From the command-line arguments.
        ParameterTool fromArgs = ParameterTool.fromArgs(args);

        // From an in-memory map containing the single entry a -> a.
        HashMap<String, String> entries = new HashMap<String, String>();
        entries.put("a", "a");
        ParameterTool fromMap = ParameterTool.fromMap(entries);

        // From a properties file.
        ParameterTool fromFile = ParameterTool.fromPropertiesFile("xxxx.properties");

        // From the JVM system properties.
        ParameterTool fromSystem = ParameterTool.fromSystemProperties();

        // Look up key "a"; the second argument is presumably the fallback value.
        fromMap.get("a", "a");
    }
}
|