04自定义组件

本文最后更新于 2021-08-05 11:42:59

自定义组件

1
2
3
4
5
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>

  • 打包后加到lib目录下
  • 配置时
    • 配置类全限定名 interceptor 要配置builder
    • 参数名配置拼接在别名后面 比如
1
2
3
a1.sources.r1.type=netcat      <-----myProp->type
a1.sources.r1.bind=hadoop102
a1.sources.r1.port=44444

source

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class MySource extends AbstractSource implements Configurable, PollableSource {
private String myProp;

@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");

// Process the myProp value (e.g. validation, convert to another type, ...)

// Store myProp for later retrieval by process() method
this.myProp = myProp;
}

@Override
public void start() {
// Initialize the connection to the external client
}

@Override
public void stop () {
// Disconnect from external client and do any additional cleanup
// (e.g. releasing resources or nulling-out field values) ..
}

@Override
public Status process() throws EventDeliveryException {
Status status = null;

try {
// This try clause includes whatever Channel/Event operations you want to do

// Receive new data
Event e = getSomeData();

// Store the Event into this Source's associated Channel(s)
getChannelProcessor().processEvent(e);

status = Status.READY;
} catch (Throwable t) {
// Log exception, handle individual exceptions as needed

status = Status.BACKOFF;

// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}
}

sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class MySink extends AbstractSink implements Configurable {
private String myProp;

@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");

// Process the myProp value (e.g. validation)

// Store myProp for later retrieval by process() method
this.myProp = myProp;
}

@Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
}

@Override
public void stop () {
// Disconnect from the external respository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
}

@Override
public Status process() throws EventDeliveryException {
Status status = null;

// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do

Event event = ch.take();

// Send the Event to the external repository.
// storeSomeData(e);

txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();

// Log exception, handle individual exceptions as needed

status = Status.BACKOFF;

// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
}
return status;
}
}

interceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class CustomInterceptor implements Interceptor {


//初始化
@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {

Map<String,String> map = event.getHeaders();
map.put("time",System.currentTimeMillis()+"");

return event;

}

@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}

@Override
public void close() {

}

public static class Builder implements Interceptor.Builder {

@Override
public Interceptor build() {
return new CustomInterceptor();
}

@Override
public void configure(Context context) {
}
}
}


04自定义组件
https://jiajun.xyz/2020/10/25/bigdata/04flume/04自定义组件/
作者
Lambda
发布于
2020年10月25日
更新于
2021年8月5日
许可协议