03参数处理
参数处理
参数传递给函数
构造方法
/**
 * Passes a parameter to a user function through the function's constructor.
 *
 * <p>Reads text lines from a socket on {@code localhost:10086}, keeps the lines
 * whose integer value is greater than the threshold handed to {@link MyFilter},
 * and prints them.
 */
public class Parameter1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Socket source; start a server first with: nc -lk 10086
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 10086);
        SingleOutputStreamOperator<String> filter = dataStream.filter(new MyFilter(2));
        filter.print();
        // Run the job.
        env.execute("word limit");
    }

    /** Keeps only the lines that parse as an integer strictly greater than {@code min}. */
    private static class MyFilter implements FilterFunction<String> {

        /** Exclusive lower bound, fixed when the function is constructed. */
        private final int min;

        /**
         * @param min exclusive lower bound; a primitive so that a null threshold
         *            cannot be passed in and fail later on unboxing
         */
        public MyFilter(int min) {
            this.min = min;
        }

        /**
         * @param data one line read from the socket
         * @return {@code true} if the line is an integer greater than {@code min};
         *         {@code false} for null, blank or non-numeric lines
         */
        @Override
        public boolean filter(String data) throws Exception {
            if (data == null) {
                return false;
            }
            try {
                // Trim so surrounding whitespace (e.g. a trailing '\r') does not break parsing.
                return Integer.parseInt(data.trim()) > min;
            } catch (NumberFormatException ignored) {
                // A malformed line is dropped instead of failing the whole job.
                return false;
            }
        }
    }
}
ExecutionConfig
/**
 * Passes a parameter to a user function through the job's global parameters.
 *
 * <p>{@code main} stores the value under the key {@code "limit"} in a
 * {@link Configuration} and registers it on the {@code ExecutionConfig};
 * {@link MyFilter} reads it back in {@code open} before filtering.
 */
public class Parameter2 {

    /** Key under which the threshold is stored in the global job parameters. */
    private static final String LIMIT_KEY = "limit";

    /** Threshold used when no value is registered under {@link #LIMIT_KEY}. */
    private static final int DEFAULT_LIMIT = 1;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.setInteger(LIMIT_KEY, 2);
        env.getConfig().setGlobalJobParameters(configuration);
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 10086);
        SingleOutputStreamOperator<String> filter = dataStream.filter(new MyFilter());
        filter.print();
        // Run the job.
        env.execute("word limit");
    }

    /** Keeps only the lines that parse as an integer strictly greater than the configured limit. */
    private static class MyFilter extends RichFilterFunction<String> {

        /** Exclusive lower bound, resolved from the global job parameters in {@link #open}. */
        private int min;

        /** Resolves the threshold from the global job parameters before any record is filtered. */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ExecutionConfig.GlobalJobParameters globalJobParameters =
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            min = resolveLimit(globalJobParameters);
        }

        /**
         * @param data one line read from the socket
         * @return {@code true} if the line is an integer greater than the limit;
         *         {@code false} for null, blank or non-numeric lines
         */
        @Override
        public boolean filter(String data) throws Exception {
            if (data == null) {
                return false;
            }
            try {
                // Trim so surrounding whitespace (e.g. a trailing '\r') does not break parsing.
                return Integer.parseInt(data.trim()) > min;
            } catch (NumberFormatException ignored) {
                // A malformed line is dropped instead of failing the whole job.
                return false;
            }
        }

        /**
         * Reads the limit without assuming the registered parameters are a {@link Configuration}.
         *
         * @param params the job's global parameters; may be null or another implementation
         * @return the configured limit, or {@link #DEFAULT_LIMIT} when none is available
         */
        private static int resolveLimit(ExecutionConfig.GlobalJobParameters params) {
            if (params instanceof Configuration) {
                return ((Configuration) params)
                        .getInteger(ConfigOptions.key(LIMIT_KEY).intType().defaultValue(DEFAULT_LIMIT));
            }
            if (params != null) {
                // Other implementations expose their entries as strings via toMap().
                String raw = params.toMap().get(LIMIT_KEY);
                if (raw != null) {
                    return Integer.parseInt(raw.trim());
                }
            }
            return DEFAULT_LIMIT;
        }
    }
}
读取参数
ParameterTool 和 Configuration 一样继承自 ExecutionConfig.GlobalJobParameters,因此也可以通过 env.getConfig().setGlobalJobParameters(parameterTool) 将其注册为全局作业参数。
/**
 * Shows the different sources a {@code ParameterTool} can be built from:
 * command-line arguments, a map, a properties file and the JVM system properties.
 */
public class Parameter3 {

    public static void main(String[] args) throws IOException {
        // Read from the command-line arguments.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Read from a map. A plain HashMap is used instead of double-brace
        // initialization, which would create a needless anonymous subclass.
        HashMap<String, String> entries = new HashMap<>();
        entries.put("a", "a");
        ParameterTool parameterTool1 = ParameterTool.fromMap(entries);

        // Read from a properties file.
        ParameterTool parameterTool2 = ParameterTool.fromPropertiesFile("xxxx.properties");

        // Read from the JVM system properties.
        ParameterTool parameterTool3 = ParameterTool.fromSystemProperties();

        // Use a parameter: look up key "a", passing "a" as the default value.
        parameterTool1.get("a", "a");
    }
}
03参数处理
https://jiajun.xyz/2022/08/28/bigdata/11Flink/02flink_study2/03参数处理/