stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply() <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
window生命周期
简而言之,当第一个应该属于该窗口的元素(没有偏移量的情况下,每个窗口默认都对齐到整点,比如一个小时大小的窗口,开始时间就是0分0秒,也就是窗口的起始时间戳能够被窗口大小整除)到达时,就会创建一个窗口;当时间(event time 或 processing time)超过它的结束时间戳加上用户指定的允许延迟(allowed lateness)时,该窗口将被完全删除。Flink保证只删除基于时间的窗口,而不删除其他类型的窗口,例如全局窗口(global windows)。例如,使用基于事件时间(event time)的窗口策略,每5分钟创建一个不重叠的滚动(tumbling)窗口,并允许1分钟的延迟:当第一个时间戳落在12:00到12:05这个区间内的元素到达时,Flink会为该区间创建一个新窗口;当水印(watermark)越过12:06这个时间戳时,Flink会删除该窗口。
此外,每个窗口将有一个触发器(Trigger)和一个函数(ProcessWindowFunction, ReduceFunction,或AggregateFunction)附加到它上。该函数将包含应用于窗口内容的计算,而Trigger指定了窗口被认为可以应用该函数的条件。触发策略可能类似于“when the number of elements in the window is more than 4”,或者“when the watermark passes the end of the window”。
// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
/**
 * Function evaluated over the contents of a window, with access to a `Context`
 * that exposes window metadata and state accessors.
 *
 * @tparam IN  type of the elements handed to `process` through `elements`
 * @tparam OUT type of the elements emitted through the collector `out`
 * @tparam KEY type of the key handed to `process`
 * @tparam W   type of the window returned by `Context.window`; a subtype of `Window`
 */
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key      The key for which this window is evaluated.
   * @param context  The context in which the window is being evaluated.
   * @param elements The elements in the window being evaluated.
   * @param out      A collector for emitting elements.
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  // The result type is spelled out: the original relied on procedure syntax,
  // which also means Unit but is deprecated in Scala 2.13 and dropped in Scala 3.
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT]): Unit

  /** The context holding window metadata. */
  abstract class Context {

    /** Returns the window that is being evaluated. */
    def window: W

    /** Returns the current processing time. */
    def currentProcessingTime: Long

    /** Returns the current event-time watermark. */
    def currentWatermark: Long

    /** State accessor for per-key and per-window state. */
    def windowState: KeyedStateStore

    /**
     * State accessor for per-key global state.
     *
     * Author's note (translated): can be used to obtain the state of other
     * windows within the same period. NOTE(review): confirm the exact scope
     * against the Flink documentation.
     */
    def globalState: KeyedStateStore
  }
}
////////////////////////////////////////////////////////////////////
val input: DataStream[(String, Long)] = ...
/** * The accumulator is used to keep a running sum and a count. The [getResult] method * computes the average. */ classAverageAggregateextendsAggregateFunction[(String, Long), (Long, Long), Double] { overridedefcreateAccumulator() = (0L, 0L)
/** * Creates a trigger that fires once the number of elements in a pane reaches the given count. * * @param maxCount The count of elements at which to fire. * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ publicstatic <W extendsWindow> CountTrigger<W> of(long maxCount) { returnnewCountTrigger<>(maxCount); }
/** * Creates a new purging trigger from the given {@code Trigger}. * * @param nestedTrigger The trigger that is wrapped by this purging trigger */ publicstatic <T, W extendsWindow> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) { returnnewPurgingTrigger<>(nestedTrigger); }
/**
 * Returns the value of the {@code nestedTrigger} field.
 *
 * <p>Annotated {@code @VisibleForTesting}, i.e. the accessor is exposed for tests.
 *
 * @return the {@code nestedTrigger} held by this instance
 */
@VisibleForTesting
public Trigger<T, W> getNestedTrigger() {
    return this.nestedTrigger;
}
} // closes the enclosing class, whose header lies outside this excerpt
/**
 * Gives the evictor the chance to drop elements; invoked before the windowing function.
 *
 * @param elements the elements currently in the pane
 * @param size the current number of elements in the pane
 * @param window the {@link Window}
 * @param evictorContext the context for the Evictor
 */
void evictBefore(
        Iterable<TimestampedValue<T>> elements,
        int size,
        W window,
        EvictorContext evictorContext);
/**
 * Gives the evictor the chance to drop elements; invoked after the windowing function.
 *
 * @param elements the elements currently in the pane
 * @param size the current number of elements in the pane
 * @param window the {@link Window}
 * @param evictorContext the context for the Evictor
 */
void evictAfter(
        Iterable<TimestampedValue<T>> elements,
        int size,
        W window,
        EvictorContext evictorContext);