08复杂事件处理

复杂事件处理

复杂事件处理库(Complex Event Processing),可以在无限时间流中检测出特定的事件模型

每个复杂的模式序列包括多个简单的模式,比如,寻找拥有相同属性事件序列的模式。从现在开始,我们把这些简单的模式称作模式, 把我们在数据流中最终寻找的复杂模式序列称作模式序列,你可以把模式序列看作是这样的模式构成的图, 这些模式基于用户指定的条件从一个转换到另外一个,比如 event.getName().equals("end")。 一个匹配是输入事件的一个序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。

每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。

模式的名字不能包含字符":"

单个模式

一个模式可以是一个单例或者循环模式。单例模式只接受一个事件,循环模式可以接受多个事件。 在模式匹配表达式中,模式"a b+ c? d"(或者"a",后面跟着一个或者多个"b",再往后可选择的跟着一个"c",最后跟着一个"d"), ac?,和 d都是单例模式,b+是一个循环模式。默认情况下,模式都是单例的,你可以通过使用量词把它们转换成循环模式。 每个模式可以有一个或者多个条件来决定它接受哪些事件。

量词

在FlinkCEP中,你可以通过这些方法指定循环模式:

pattern.oneOrMore(),指定期望一个给定事件出现一次或者多次的模式(例如前面提到的b+模式); pattern.times(#ofTimes),指定期望一个给定事件出现特定次数的模式,例如出现4次a

pattern.times(#fromTimes, #toTimes),指定期望一个给定事件出现次数在一个最小值和最大值中间的模式,比如出现2-4次a

你可以使用pattern.greedy()方法让循环模式变成贪心的,但现在还不能让模式组贪心。

你可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 期望出现4次
start.times(4);

// 期望出现0或者4次
start.times(4).optional();

// 期望出现2、3或者4次
start.times(2, 4);

// 期望出现2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).greedy();

// 期望出现0、2、3或者4次
start.times(2, 4).optional();

// 期望出现0、2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).optional().greedy();

// 期望出现1到多次
start.oneOrMore();

// 期望出现1到多次,并且尽可能的重复次数多
start.oneOrMore().greedy();

// 期望出现0到多次
start.oneOrMore().optional();

// 期望出现0到多次,并且尽可能的重复次数多
start.oneOrMore().optional().greedy();

// 期望出现2到多次
start.timesOrMore(2);

// 期望出现2到多次,并且尽可能的重复次数多
start.timesOrMore(2).greedy();

// 期望出现0、2或多次
start.timesOrMore(2).optional();

// 期望出现0、2或多次,并且尽可能的重复次数多
start.timesOrMore(2).optional().greedy();

条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如,它的value字段应该大于5,或者大于前面接受的事件的平均值。 指定判断事件属性的条件可以通过pattern.where()pattern.or()或者pattern.until()方法。 这些可以是IterativeCondition或者SimpleCondition

迭代条件: 这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
middle.oneOrMore()
.subtype(SubEvent.class)
.where(new IterativeCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
if (!value.getName().startsWith("foo")) {
return false;
}

double sum = value.getPrice();
for (Event event : ctx.getEventsForPattern("middle")) {
sum += event.getPrice();
}
return Double.compare(sum, 5.0) < 0;
}
});

调用ctx.getEventsForPattern(...)可以获得所有前面已经接受作为可能匹配的事件。 调用这个操作的代价可能很小也可能很大,所以在实现你的条件时,尽量少使用它。

简单条件:它决定是否接受一个事件只取决于事件自身的属性。

1
2
3
4
5
6
start.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return value.getName().startsWith("foo");
}
});

subType:可以通过pattern.subtype(subClass)方法限制接受的事件类型是初始事件的子类型。

1
2
3
4
5
6
start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value) {
return ...; // 一些判断条件
}
});

组合条件:你可以把subtype条件和其他的条件结合起来使用。这适用于任何条件,你可以通过依次调用where()来组合条件。 最终的结果是每个单一条件的结果的逻辑AND。如果想使用OR来组合条件,你可以像下面这样使用or()方法。

1
2
3
4
5
6
7
8
9
10
11
pattern.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ...; // 一些判断条件
}
}).or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ...; // 一些判断条件
}
});

停止条件:如果使用循环模式(oneOrMore()oneOrMore().optional()),你可以指定一个停止条件,例如,接受事件的值大于5直到值的和小于50。

为了更好的理解它,看下面的例子。给定

  • 模式如"(a+ until b)" (一个或者更多的"a"直到"b")
  • 到来的事件序列"a1" "c" "a2" "b" "a3"
  • 输出结果会是: {a1 a2} {a1} {a2} {a3}.

你可以看到{a1 a2 a3}{a2 a3}由于停止条件没有被输出。

模式操作 描述
where(condition) 为当前模式定义一个条件。为了匹配这个模式,一个事件必须满足某些条件。 多个连续的where()语句取与组成判断条件:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // 一些判断条件 } });
or(condition) 增加一个新的判断,和当前的判断取或。一个事件只要满足至少一个判断条件就匹配到模式:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // 一些判断条件 } }).or(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // 替代条件 } });
until(condition) 为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。只适用于和oneOrMore()同时使用。NOTE: 在基于事件的条件中,它可用于清理对应模式的状态。java pattern.oneOrMore().until(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // 替代条件 } });
subtype(subClass) 为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式:java pattern.subtype(SubEvent.class);
oneOrMore() 指定模式期望匹配到的事件至少出现一次。.默认(在子事件间)使用松散的内部连续性。NOTE: 推荐使用until()或者within()来清理状态。java pattern.oneOrMore();
timesOrMore(#times) 指定模式期望匹配到的事件至少出现**#times**次。.默认(在子事件间)使用松散的内部连续性。 java pattern.timesOrMore(2); ```
times(#ofTimes) 指定模式期望匹配到的事件正好出现的次数。默认(在子事件间)使用松散的内部连续性。java pattern.times(2); ```
times(#fromTimes, #toTimes) 指定模式期望匹配到的事件出现次数在**#fromTimes#toTimes**之间。默认(在子事件间)使用松散的内部连续性。java pattern.times(2, 4); ```
optional() 指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。java pattern.oneOrMore().optional();
greedy() 指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。java pattern.oneOrMore().greedy();

组合模式

可以增加更多的模式到模式序列中并指定它们之间所需的连续条件。FlinkCEP支持事件之间如下形式的连续策略:

  1. 严格连续: 期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
  2. 松散连续: 忽略匹配的事件之间的不匹配的事件。
  3. 不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。

可以使用下面的方法来指定模式之间的连续策略:

  1. next(),指定严格连续
  2. followedBy(),指定松散连续
  3. followedByAny(),指定不确定的松散连续。
  4. notNext(),如果不想后面直接连着一个特定事件
  5. notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方。

模式序列不能以notFollowedBy()结尾。

一个 NOT 模式前面不能是可选的模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Pattern<Event, ?> start = Pattern.<Event>begin("start");
// 严格连续
Pattern<Event, ?> strict = start.next("middle").where(...);

// 松散连续
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);

// 不确定的松散连续
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);

// 严格连续的NOT模式
Pattern<Event, ?> strictNot = start.notNext("not").where(...);

// 松散连续的NOT模式
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上,而不确定的松散连接情况下,有着同样起始的多个匹配会被输出。 举例来说,模式"a b",给定事件序列"a","c","b1","b2",会产生如下的结果:

  1. "a""b"之间严格连续: {} (没有匹配),"a"之后的"c"导致"a"被丢弃。
  2. "a""b"之间松散连续: {a b1},松散连续会”跳过不匹配的事件直到匹配上的事件”。
  3. "a""b"之间不确定的松散连续: {a b1}, {a b2},这是最常见的情况。

within

可以通过pattern.within()方法指定一个模式应该在10秒内发生。

一个模式序列只能有一个时间限制。如果限制了多个时间在不同的单个模式上,会使用最小的那个时间限制。

1
next.within(Time.seconds(10));

连续性

连续性会被运用在被接受进入模式的事件之间。 用这个例子来说明上面所说的连续性,一个模式序列"a b+ c""a"后面跟着一个或者多个(不确定连续的)"b",然后跟着一个"c") 输入为"a","b1","d1","b2","d2","b3","c",输出结果如下:

  1. 严格连续: {a b1 c}, {a b2 c}, {a b3 c} - 没有相邻的 "b"
  2. 松散连续: {a b1 c}{a b1 b2 c}{a b1 b2 b3 c}{a b2 c}{a b2 b3 c}{a b3 c} - "d"都被忽略了。
  3. 不确定松散连续: {a b1 c}{a b1 b2 c}{a b1 b3 c}{a b1 b2 b3 c}{a b2 c}{a b2 b3 c}{a b3 c} - 注意{a b1 b3 c},这是因为"b"之间是不确定松散连续产生的。

对于循环模式(例如oneOrMore()times())),默认是松散连续。如果想使用严格连续,你需要使用consecutive()方法明确指定, 如果想使用不确定松散连续,你可以使用allowCombinations()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().consecutive()////////////////////////
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().allowCombinations() ///////////////////////
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});

模式组

定义一个模式序列作为beginfollowedByfollowedByAnynext的条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Pattern<Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);

// 严格连续
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// 松散连续
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// 不确定松散连续
Pattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
模式操作 描述
begin(#name) 定义一个开始的模式:java Pattern start = Pattern.begin("start");
begin(#pattern_sequence) 定义一个开始的模式:java Pattern start = Pattern.begin( Pattern.begin("start").where(...).followedBy("middle").where(...) );
next(#name) 增加一个新的模式。匹配的事件必须是直接跟在前面匹配到的事件后面(严格连续):java Pattern next = start.next("middle");
next(#pattern_sequence) 增加一个新的模式。匹配的事件序列必须是直接跟在前面匹配到的事件后面(严格连续):java Pattern next = start.next( Pattern.begin("start").where(...).followedBy("middle").where(...) );
followedBy(#name) 增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间(松散连续):java Pattern followedBy = start.followedBy("middle");
followedBy(#pattern_sequence) 增加一个新的模式。可以有其他事件出现在匹配的事件序列和之前匹配到的事件中间(松散连续):java Pattern followedBy = start.followedBy( Pattern.begin("start").where(...).followedBy("middle").where(...) );
followedByAny(#name) 增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间, 每个可选的匹配事件都会作为可选的匹配结果输出(不确定的松散连续):java Pattern followedByAny = start.followedByAny("middle");
followedByAny(#pattern_sequence) 增加一个新的模式。可以有其他事件出现在匹配的事件序列和之前匹配到的事件中间, 每个可选的匹配事件序列都会作为可选的匹配结果输出(不确定的松散连续):java Pattern followedByAny = start.followedByAny( Pattern.begin("start").where(...).followedBy("middle").where(...) );
notNext() 增加一个新的否定模式。匹配的(否定)事件必须直接跟在前面匹配到的事件之后(严格连续)来丢弃这些部分匹配:java Pattern notNext = start.notNext("not");
notFollowedBy() 增加一个新的否定模式。即使有其他事件在匹配的(否定)事件和之前匹配的事件之间发生, 部分匹配的事件序列也会被丢弃(松散连续):java Pattern notFollowedBy = start.notFollowedBy("not");
within(time) 定义匹配模式的事件序列出现的最大时间间隔。如果未完成的事件序列超过了这个事件,就会被丢弃:java pattern.within(Time.seconds(10));

跳过策略

对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:

  • NO_SKIP: 每个成功的匹配都会被输出。
  • SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。
  • SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
  • SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
  • SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。

给定一个模式b+ c和一个数据流b1 b2 b3 c,不同跳过策略之间的不同如下:

跳过策略 结果 描述
NO_SKIP b1 b2 b3 c
b2 b3 c
b3 c
找到匹配b1 b2 b3 c之后,不会丢弃任何结果。
SKIP_TO_NEXT b1 b2 b3 c
b2 b3 c
b3 c
找到匹配b1 b2 b3 c之后,不会丢弃任何结果,因为没有以b1开始的其他匹配。
SKIP_PAST_LAST_EVENT b1 b2 b3 c 找到匹配b1 b2 b3 c之后,会丢弃其他所有的部分匹配。
SKIP_TO_FIRST[b] b1 b2 b3 c
b2 b3 c
b3 c
找到匹配b1 b2 b3 c之后,会尝试丢弃所有在b1之前开始的部分匹配,但没有这样的匹配,所以没有任何匹配被丢弃。
SKIP_TO_LAST[b] b1 b2 b3 c
b3 c
找到匹配b1 b2 b3 c之后,会尝试丢弃所有在b3之前开始的部分匹配,有一个这样的b2 b3 c被丢弃。

(a | b | c) (b | c) c+.greedy d,输入:a b c1 c2 c3 d,结果将会是:

跳过策略 结果 描述
NO_SKIP a b c1 c2 c3 d
b c1 c2 c3 d
c1 c2 c3 d
找到匹配a b c1 c2 c3 d之后,不会丢弃任何结果。
SKIP_TO_FIRST[c*] a b c1 c2 c3 d
c1 c2 c3 d
找到匹配a b c1 c2 c3 d之后,会丢弃所有在c1之前开始的部分匹配,有一个这样的b c1 c2 c3 d被丢弃。

模式:a b+,输入:a b1 b2 b3,结果将会是:

跳过策略 结果 描述
NO_SKIP a b1
a b1 b2
a b1 b2 b3
找到匹配a b1之后,不会丢弃任何结果。
SKIP_TO_NEXT a b1 找到匹配a b1之后,会丢弃所有以a开始的部分匹配。这意味着不会产生a b1 b2a b1 b2 b3了。

想指定要使用的跳过策略,只需要调用下面的方法创建AfterMatchSkipStrategy

方法 描述
AfterMatchSkipStrategy.noSkip() 创建NO_SKIP策略
AfterMatchSkipStrategy.skipToNext() 创建SKIP_TO_NEXT策略
AfterMatchSkipStrategy.skipPastLastEvent() 创建SKIP_PAST_LAST_EVENT策略
AfterMatchSkipStrategy.skipToFirst(patternName) 创建引用模式名称为patternNameSKIP_TO_FIRST策略
AfterMatchSkipStrategy.skipToLast(patternName) 创建引用模式名称为patternNameSKIP_TO_LAST策略

应用到流

1
2
3
4
5
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = ...;
EventComparator<Event> comparator = ...; // 可选的

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

按照Map<String, List<IN>>的格式接收一个匹配,映射的键是你的模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()times())时, 对一个模式会有不止一个事件被接受。

1
2
3
4
5
6
7
8
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
@Override
public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
IN startEvent = match.get("start").get(0);
IN endEvent = match.get("end").get(0);
out.collect(OUT(startEvent, endEvent));
}
}

时间

CEP中,事件的处理顺序很重要。在使用事件时间时,为了保证事件按照正确的顺序被处理,一个事件到来后会先被放到一个缓冲区中, 在缓冲区里事件都按照时间戳从小到大排序,当水位线到达后,缓冲区中所有小于水位线的事件被处理。这意味着水位线之间的数据都按照时间戳被顺序处理。

为了保证跨水位线的事件按照事件时间处理,Flink CEP库假定水位线一定是正确的,并且把时间戳小于最新水位线的事件看作是晚到的。 晚到的事件不会被处理。你也可以指定一个侧输出标志来收集比最新水位线晚到的事件,你可以这样做:

1
2
3
4
5
6
7
8
9
10
11
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream
.sideOutputLateData(lateDataOutputTag)
.select(
new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);

08复杂事件处理
https://jiajun.xyz/2022/08/28/bigdata/11Flink/02flink_study2/08复杂事件处理/
作者
Lambda
发布于
2022年8月28日
许可协议