在流计算场景下,数据会源源不断地流入Apache Flink系统,每一次数据进入Apache Flink系统都会触发计算。如果我们要进行计数聚合计算,是每个触发器计算都重新计算历史中的所有流入数据,还是每个计算都在前一个计算结果的基础上执行增量计算?答案是肯定的,Apache Flink根据上次的计算结果进行增量计算。那么问题来了:“最后的计算结果保存在哪里,可以保存在内存中吗?”答案是否定的,如果存储在内存中,如果某个计算节点由于网络、硬件等原因出现故障,最后的计算结果就会丢失。当节点恢复后,需要重新计算历史上的所有数据(可能是十天,也可能是几百天),所以为了避免这种灾难性的问题,Apache Flink会使用State来存储计算结果。本文将向您介绍Apache Flink State。
什么是状态
这个问题好像有点“弱智”?不管问题的答案是否明显,我还是想简单说一下Flink中的State是什么?状态是指流量计算过程中计算节点的中间计算结果或元数据属性。例如,在汇总过程中,中间汇总结果应记录在State中。比如以阿帕奇卡夫卡为数据源时,我们也要记录读取记录的偏移量。这些状态数据将在计算过程中保持(插入或更新)。所以Flink中的状态是与时间相关的Flink任务内部数据(计算数据和元数据属性)的快照。
你为什么需要国家?
与批处理计算相比,状态是流计算所独有的。批处理计算没有故障转移机制,它要么成功,要么重新计算。流量计算在大多数场景下是增量计算,数据是一个一个处理的(大多数场景下)。每个计算都是在最后一个计算结果之上处理的,所以这个机制必然会存储最后一个计算结果(生产模式应该是持久的)。此外,由于机器、网络、脏数据等引起的程序错误。重新启动作业时,有必要从成功的检查点(将在下一章中专门介绍)恢复状态。增量和故障转移机制都需要国家的支持。
状态存储实现
Flink中有三种状态存储实现,如下所示:
基于内存的heapstateback end-用于调试模式,不建议用于生产模式。
基于HDFS的fsstate back end-分布式文件持久化,每次读写都操作内存,还需要考虑OOM问题;
基于RocksDB的Rocksdbstatebackend本地文件+异步HDFS持久化;
状态存储模式
默认情况下,Apache Flink将状态存储在RocksDB+HDFS中。状态存储分为两个阶段,首先本地存储到RocksDB,然后异步同步到远程HDFS。这种设计不仅消除了Heapstateback的限制(内存大小、坏机等。),还减少了纯分布式存储的网络IO开销。
状态分类
keyed state——这里的key是我们在SQL语句中对应的groupby/PartitionBy中的字段,key的值是由group by/partition by字段组成的Row的字节数组。每个键都有自己的状态,键之间的状态是不可见的;
OperatorState将用于记录OperatorState-Flink内部源连接器实现中源数据读取的偏移量。
国家扩张时的再分配
Flink是一个大规模并行分布式系统,允许大规模有状态流处理。为了可扩展性,Flink作业在逻辑上分解为操作符图,每个操作符的执行在物理上分解为多个并行的操作符实例。从概念上讲,Flink中的每个并行操作符实例都是一个独立的任务,可以被其他连接到网络的机器调度到自己的机器上运行。
在Flink的DAG图中,只有边连接的节点有网络通信,即整个DAG在垂直方向有网络IO,水平方向有状态节点之间没有网络通信,如下图所示。该模型还确保每个操作符实例维护自己的状态,并将其保存在本地磁盘上(远程异步同步)。通过这种设计,任务的所有状态数据都是本地的,并且状态访问不需要任务之间的网络通信。避免这种流量对于像Flink这样的大规模并行分布式系统的可伸缩性至关重要。
我们知道Flink中有operator State和KeyedState,那么在扩容(增加并发)时如何分配状态呢?例如,外部源有五个分区,在Flink上从源的一个并发扩展为两个并发,中间有状态操作节点从两个并发扩展为三个并发,如下图所示:
在Flink中,不同类型的状态有不同的扩展方法。接下来我们就分别介绍一下。
运营商对容量扩展的处理
下面我们选择Flink中一个具体的连接器实现例子来介绍一下。以MetaQ为例。MetaQ在主题模式下订阅数据,每个主题会有N>0个分区。以上图为例。假设我们订阅的MetaQ的话题有5个分区,那么当我们的源从1并发调整为2并发时,状态如何恢复?
还原状态的方式必然与源中OperatorState的存储结构有关。我们先来看看MetaQSource的实现是如何存储状态的。首先,MetaQSource实现了ListCheckpointed,其中T是Tuple2。我们来看看ListCheckpointed接口的内部定义,如下所示:
publicinterface 友优资源网;ListCheckpointed{ListsnapshotState(longvar1,longvar3)throwsException;voidrestoreState(Listvar1)throwsException;}我们发现snapshotState方法的返回值是一个列表,t是Tuple2,即snapshotState方法返回一个列表。这个类型表明状态的存储是一个包含分区和偏移量信息的列表,InputSplit表示一个分区,Long表示当前分区读取的偏移量。InputSplit有如下方法:
publicinterfaceInputSplitextendsSerializable{ 友优资源网;intgetSplitNumber();}也就是说,InputSplit可以理解为一个分区索引。有了这个数据结构,我们就可以看到上图中的例子是如何工作的。当Source的并行度为1时,所有分区数据都在同一个线程中读取,所有分区的状态也保持在同一个状态。状态存储信息的格式如下: