03参数处理

参数处理

参数传递给函数

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Demonstrates handing a parameter to a user function through its constructor.
 *
 * <p>Reads text lines from a socket on {@code localhost:10086} (for example one opened with
 * {@code nc -lk 10086}), keeps the lines whose integer value exceeds a threshold, and prints them.
 */
public class Parameter1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Socket text source; start a listener first, e.g. `nc -lk 10086`.
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 10086);

        // The threshold is passed to the function through its constructor.
        SingleOutputStreamOperator<String> filter = dataStream.filter(new MyFilter(2));
        filter.print();

        // Run the job.
        env.execute("word limit");
    }

    /** Keeps records whose integer value is strictly greater than {@code min}. */
    private static class MyFilter implements FilterFunction<String> {

        /** Exclusive lower bound, fixed at construction time. */
        private final int min;

        /**
         * @param min exclusive lower bound; declared as a primitive so a {@code null} threshold
         *     cannot reach the constructor and fail while being unboxed
         */
        public MyFilter(int min) {
            this.min = min;
        }

        /**
         * @param data one input record, expected to hold a decimal integer
         * @return {@code true} when the parsed value is greater than {@code min}
         * @throws NumberFormatException if {@code data} cannot be parsed as an integer
         */
        @Override
        public boolean filter(String data) throws Exception {
            return Integer.parseInt(data) > min;
        }
    }
}

ExecutionConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Demonstrates handing a parameter to a rich function through the job-wide
 * {@code ExecutionConfig} global job parameters.
 *
 * <p>Reads text lines from a socket on {@code localhost:10086}, keeps the lines whose integer
 * value exceeds the configured limit, and prints them.
 */
public class Parameter2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register the limit as a global job parameter so functions can look it up in open().
        Configuration configuration = new Configuration();
        configuration.setInteger(MyFilter.LIMIT_KEY, 2);
        env.getConfig().setGlobalJobParameters(configuration);

        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 10086);
        SingleOutputStreamOperator<String> filter = dataStream.filter(new MyFilter());
        filter.print();

        // Run the job.
        env.execute("word limit");
    }

    /** Keeps records whose integer value is strictly greater than the configured limit. */
    private static class MyFilter extends RichFilterFunction<String> {

        /** Key under which the limit is stored in the global job parameters. */
        private static final String LIMIT_KEY = "limit";

        /** Limit used when the key is not present in the global job parameters. */
        private static final int DEFAULT_LIMIT = 1;

        /** Exclusive lower bound, resolved in {@link #open(Configuration)}. */
        private int min;

        /**
         * Resolves the limit from the global job parameters.
         *
         * <p>The value is read through {@code GlobalJobParameters.toMap()} rather than by casting
         * to {@code Configuration}: the API only promises a {@code GlobalJobParameters}, so the
         * cast can fail with a {@code ClassCastException} when the runtime supplies another
         * implementation (NOTE(review): behavior differs between Flink versions - confirm against
         * the version in use).
         *
         * @param parameters configuration supplied by the runtime; forwarded to the superclass
         * @throws NumberFormatException if the stored limit is not a parsable integer
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ExecutionConfig.GlobalJobParameters globalJobParameters =
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

            // Guard against no parameters having been registered at all.
            String configured =
                    globalJobParameters == null
                            ? null
                            : globalJobParameters.toMap().get(LIMIT_KEY);

            min = configured == null ? DEFAULT_LIMIT : Integer.parseInt(configured.trim());
        }

        /**
         * @param data one input record, expected to hold a decimal integer
         * @return {@code true} when the parsed value is greater than the configured limit
         * @throws NumberFormatException if {@code data} cannot be parsed as an integer
         */
        @Override
        public boolean filter(String data) throws Exception {
            return Integer.parseInt(data) > min;
        }
    }
}

读取参数

ParameterTool 与 Configuration 一样继承自 ExecutionConfig.GlobalJobParameters，因此同样可以通过 env.getConfig().setGlobalJobParameters(parameterTool) 注册为全局作业参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Demonstrates the different sources a {@code ParameterTool} can be built from, and how a value
 * is read back with a default.
 */
public class Parameter3 {

    /**
     * @param args command-line arguments, parsed by {@code ParameterTool.fromArgs}
     * @throws IOException declared for the properties-file lookup
     */
    public static void main(String[] args) throws IOException {
        // From command-line arguments.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // From a map. A plain HashMap is used instead of double-brace initialization, which
        // would create an anonymous HashMap subclass just to run one put().
        HashMap<String, String> values = new HashMap<>();
        values.put("a", "a");
        ParameterTool parameterTool1 = ParameterTool.fromMap(values);

        // From a properties file.
        ParameterTool parameterTool2 = ParameterTool.fromPropertiesFile("xxxx.properties");

        // From JVM system properties.
        ParameterTool parameterTool3 = ParameterTool.fromSystemProperties();

        // Read a value, falling back to the second argument when the key is absent.
        parameterTool1.get("a", "a");
    }
}

03参数处理
https://jiajun.xyz/2022/08/28/bigdata/11Flink/02flink_study2/03参数处理/
作者
Lambda
发布于
2022年8月28日
许可协议