12状态一致性
本文最后更新于 2022-08-28 11:30:56
状态一致性
状态一致性实际上是”正确性级别”的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确
状态一致性级别
AT_MOST_ONCE(最多一次),当任务故障时最简单做法是什么都不干,既不恢复丢失状态,也不重播丢失数据。At-most-once语义的含义是最多处理一次事件。
AT_LEAST_ONCE(至少一次),在大多数真实应用场景,我们希望不丢失数据。这种类型的保障称为at-least-once,意思是所有的事件都得到了处理,而一些事件还可能被处理多次。
EXACTLY_ONCE(精确一次),恰好处理一次是最严格的的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。

端到端(end-to-end)
在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。
- 内部保证:依赖checkpoint
- source 端:需要外部源可重设数据的读取位置
- sink 端:需要保证从故障恢复时,数据不会重复写入外部系统。 而对于sink端,又有两种具体的实现方式:
- 幂等(Idempotent)写入:所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
- 事务性(Transactional)写入:需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。
对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。Flink DataStream API 提供了GenericWriteAheadSink 模板类和 TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。
预写日志
把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统;
简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定;
瑕疵:
- sink系统没说它支持事务,有可能出现一部分写进去了一部分没写进去(如果算失败,再写一次就写了两次了);
- checkpoint做完了sink才去真正写入(但其实得等sink都写完checkpoint才能生效,所以WAL这个机制jobmanager确定它写完还不算真正写完,还得有一个外部系统已经确认完成的checkpoint)
两段提交
对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里;
然后将这些数据写入外部sink系统,但不提交他们 – 这时只是预提交;
当它收到checkpoint完成时的通知,它才正式提交事务,实现结果的真正写入;
这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统,Flink提供了TwoPhaseCommitSinkFunction接口。
对sink系统要求
- 外部sink系统必须事务支持,或者sink任务必须能够模拟外部系统上的事务;
- 在checkpoint的间隔期间里,必须能够开启一个事务,并接受数据写入;
- 在收到checkpoint完成的通知之前,事务必须是“等待提交”的状态,在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失;
- sink任务必须能够在进程失败后恢复事务;
- 提交事务必须是幂等操作;
两段提交流程:
Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。

当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。

每个算子会对当前的状态做个快照,保存到状态后端;
对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。

每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。
sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。(以barrier为界之前的数
据属于上一个事务,之后的数据属于下一个新的事务);

当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。
当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。

所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。