DataStream API支持不同的运行时执行模式,您可以根据您的用例要求和作业特性进行选择。
有 DataStream API 的“经典”执行行为,我们称之为STREAMING
执行模式。这应该用于需要连续增量处理且预计会无限期在线的无界作业。
此外,还有一种我们称之为BATCH
执行模式的批处理执行模式。这种模式以更像MapReduce等批处理框架的方式执行作业。应该用于有已知固定输入且不连续运行的有界作业。
Apache Flink的统一流和批处理方法意味着,在有界输入上执行的DataStream应用程序将产生相同的最终结果,无论配置的执行模式如何。重要的是要注意这里的最终意味着什么:在“STREAMING”模式下执行的作业可能会产生增量更新(类似数据库中的upserts),而“BATCH”作业只会在最后产生一个最终结果。如果正确解释,最终结果将是相同的,但达到目标的方式可能会有所不同。
通过启用BATCH
执行,我们允许 Flink 应用额外的优化,这些优化只有在我们知道输入是有界的时候才能实现。例如,可以使用不同的连接/聚合策略,以及不同的洗牌实现,从而实现更高效的任务调度和故障恢复行为。我们将在下面详细介绍执行行为的一些细节。
何时可以/应该使用批处理执行模式?#
BATCH
执行模式只能用于有界的作业/Flink程序。有界性是数据源的一个属性,告诉我们在执行之前是否已知来自该源的所有输入,或者是否会出现新数据,可能是无限的。反过来,如果作业的所有源都是有界的,则该作业是有界的,否则是无界的。
STREAMING
执行模式,另一方面,可用于有界和无界作业。
一般来说,当程序是有界的时候,你应该使用BATCH
执行模式,因为这样更有效率。当程序是无界的时候,你必须使用STREAMING
执行模式,因为只有这种模式足够通用,能够处理连续的数据流。
一个明显的异常情况是当您想要使用有界作业来引导一些作业状态,然后希望在无界作业中使用该状态。例如,通过使用STREAMING
模式运行有界作业,获取一个保存点,然后在无界作业上恢复该保存点。这是一个非常特定的用例,可能很快就会过时,因为我们允许将保存点作为BATCH
执行作业的附加输出之一。
在使用STREAMING
模式运行有界作业的另一种情况是编写针对最终将使用无界源运行的代码的测试。在这些情况下,对于测试来说,使用有界源可能更自然。
配置批处理执行模式#
执行模式可以通过 execution.runtime-mode
设置进行配置。有三个可能的值:
STREAMING
: 经典的DataStream执行模式(默认值)BATCH
: 在DataStream API上进行批处理式执行AUTOMATIC
: 让系统根据数据源的有界性来决定
这可以通过bin/flink run ...
的命令行参数进行配置,也可以在创建/配置StreamExecutionEnvironment
时以编程方式进行配置。
通过命令行配置执行模式的方法如下:
$ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
这个示例展示了如何在代码中配置执行模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
我们建议用户不要在程序中设置运行模式,而是在提交应用程序时使用命令行进行设置。保持应用程序代码的无配置性能够提供更大的灵活性,因为同一应用程序可以在任何执行模式下运行。
执行行为#
任务调度和网络洗牌#
Flink 作业由不同操作组成,这些操作在数据流图中连接在一起。系统决定如何安排这些操作在不同进程/机器(TaskManagers)上的执行,并决定数据如何在它们之间洗牌(发送)。
多个操作/操作符可以使用称为chaining的功能链接在一起。Flink 将一组一个或多个(链接的)操作符视为调度的单元,称为 task。通常使用术语 subtask 来指代在多个 TaskManagers 上并行运行的任务的各个实例,但在这里我们只使用术语 task。
任务调度和网络洗牌在BATCH
和STREAMING
执行模式下有不同的工作方式。这主要是因为我们知道在BATCH
执行模式下我们的输入数据是有界的,这使得 Flink 能够使用更高效的数据结构和算法。
我们将使用这个例子来解释任务调度和网络传输方面的差异:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements(...);
source.name("source").map(...).name("map1").map(...).name("map2").rebalance().map(...).name("map3").map(...).name("map4").keyBy((value) -> value).map(...).name("map5").map(...).name("map6").sinkTo(...).name("sink");
操作之间存在一对一连接模式的操作,例如map()
、flatMap()
或filter()
,可以直接将数据传递给下一个操作,这使得这些操作可以链接在一起。这意味着 Flink 通常不会在它们之间插入网络洗牌。
keyBy()
或 rebalance()
等操作需要在不同的任务并行实例之间进行数据洗牌。这会引发网络洗牌。
对于上面的示例,Flink会将操作分组在一起,形成如下的任务:
- 任务1:
source
,map1
, 和map2
- 任务2:
map3
,map4
- 任务3:
map5
,map6
, 和sink
在任务1和2之间,以及任务2和3之间进行了网络重排。这是该作业的可视化表示:
流式执行模式#
在STREAMING
执行模式下,所有任务都需要始终在线/运行。这使得 Flink 能够立即通过整个流水线处理新记录,这对于连续和低延迟的流处理是必要的。这也意味着分配给作业的 TaskManagers 需要有足够的资源同时运行所有任务。
网络洗牌是_流水线化_的,这意味着记录会立即发送到下游任务,在网络层上会有一些缓冲。再次强调,这是必需的,因为在处理连续数据流时,不存在数据可以在任务(或任务流水线)之间实现的自然点(在时间上)。这与BATCH
执行模式形成对比,其中可以实现中间结果,如下所述。
批处理执行模式#
在BATCH
执行模式下,作业的任务可以分成阶段,可以一个接一个地执行。我们之所以能够这样做,是因为输入是有界的,因此 Flink 可以在继续下一个阶段之前完全处理管道的一个阶段。在上述示例中,作业将有三个阶段,对应于由洗牌障碍分隔的三个任务。
与上文中对STREAMING
模式的解释不同,分阶段处理要求Flink将任务的中间结果实体化到一些非短暂存储中,使得下游任务可以在上游任务已经下线后读取这些结果。这会增加处理的延迟,但带来其他有趣的特性。首先,这使得Flink能够在发生故障时回溯到最新可用的结果,而不是重新启动整个作业。另一个副作用是BATCH
作业可以在更少的资源上执行(就可用于TaskManagers的插槽而言),因为系统可以按顺序一个接一个地执行任务。
TaskManagers 将至少保留中间结果,直到下游任务消耗完它们。(从技术上讲,它们将被保留,直到消耗的 流水线区域 生成其输出。)之后,它们将根据空间允许的情况保留,以便在发生故障时允许前述回溯到较早的结果。
状态后端 / 状态#
在STREAMING
模式下,Flink 使用StateBackend来控制状态存储和检查点工作的方式。
在BATCH
模式下,配置的状态后端将被忽略。相反,按键对操作的输入进行分组(使用排序),然后依次处理一个键的所有记录。这样可以同时保留同一时间仅一个键的状态。在转移到下一个键时,给定键的状态将被丢弃。
查看FLIP-140以获取有关此事的背景信息。
处理顺序#
在操作符或用户定义函数(UDFs)中处理记录的顺序可能会在BATCH
和STREAMING
执行之间有所不同。
在STREAMING
模式下,用户定义的函数不应该对传入记录的顺序做任何假设。数据一到达就会被处理。
在BATCH
执行模式下,有一些操作在Flink中保证顺序。这种顺序可能是特定任务调度、网络洗牌和状态后端(见上文)的副作用,也可能是系统的有意选择。
我们可以区分三种一般类型的输入:
- 广播输入: 广播流中的输入(另请参阅Broadcast State)
- 常规输入: 既不是广播流也不是键控的输入
- 键控输入: 来自
KeyedStream
的输入
函数或运算符,消耗多个输入类型时,将按以下顺序处理它们:
- 广播输入首先处理
- 常规输入其次处理
- 键控输入最后处理
对于从多个常规或广播输入中消费的函数(例如 CoProcessFunction
),Flink 有权以任何顺序从该类型的任何输入中处理数据。
对于从多个键控输入中消费的函数,比如 KeyedCoProcessFunction
,Flink 在继续下一个键之前会处理来自所有键控输入的单个键的所有记录。
事件时间 / 水印#
在支持event time方面,Flink的流式运行时基于悲观假设,即事件可能无序到达,即时间戳为t
的事件可能在时间戳为t+1
的事件之后到达。由于这个原因,系统无法确定在未来不会再出现时间戳为t < T
的元素。为了在使系统实用的同时摊销这种无序性对最终结果的影响,在STREAMING
模式下,Flink使用一种称为Watermarks的启发式方法。时间戳为T
的水印表示不会再有时间戳为t < T
的元素到达。
在BATCH
模式下,输入数据集是预先知道的,因此不需要启发式算法,至少可以按时间戳对元素进行排序,以便按时间顺序处理。对于熟悉流处理的读者来说,在BATCH
中我们可以假设存在“完美水印”。
根据上述,在BATCH
模式下,我们只需要在输入的末尾与每个键相关联的MAX_WATERMARK
,或者在输入流没有键的情况下在输入的末尾。根据这个方案,所有注册的定时器将在时间结束时触发,并且用户定义的WatermarkAssigners
或WatermarkGenerators
将被忽略。尽管如此,指定WatermarkStrategy
仍然很重要,因为它的TimestampAssigner
仍然会被用来为记录分配时间戳。
处理时间#
处理时间是记录在特定实例被处理时机器上的挂钟时间。根据这个定义,我们可以看到基于处理时间的计算结果是不可复制的。这是因为同一记录处理两次会有两个不同的时间戳。
尽管如上所述,在STREAMING
模式中使用处理时间可能会很有用。原因在于流水线通常会实时摄取它们的无界输入,因此事件时间和处理时间之间存在相关性。此外,由于上述原因,在STREAMING
模式中,事件时间中的1h
通常几乎等于处理时间或挂钟时间中的1h
。因此,使用处理时间可以用于提前(不完整)触发,从而提供有关预期结果的提示。
在批处理世界中,输入数据集是静态且提前已知的,因此在“批处理”模式中,我们允许用户请求当前处理时间并注册处理时间计时器,但是,与事件时间一样,所有计时器都将在输入结束时触发。
从概念上讲,我们可以想象在执行作业期间处理时间不会推进,我们可以快进到整个输入被处理完的“时间尽头”。
失败恢复#
在STREAMING
执行模式下,Flink使用检查点进行故障恢复。查看检查点文档以获取关于此内容以及如何配置的实用文档。还有一个更简介的关于通过状态快照实现容错的部分,以更高层次解释这些概念。
故障恢复的检查点特性之一是,Flink 在发生故障时将从检查点重新启动所有正在运行的任务。这可能比我们在“BATCH”模式下需要做的更昂贵(如下所述),这也是您应该在作业允许的情况下使用“BATCH”执行模式的原因之一。
在BATCH
执行模式下,Flink 将尝试回溯到之前的处理阶段,其中仍然有中间结果可用。可能只需要重新启动失败的任务(或它们在图中的前置任务),这可以提高处理效率和作业的整体处理时间,相比于从检查点重新启动所有任务。
重要考虑事项#
与经典的STREAMING
执行模式相比,在BATCH
模式下,有些功能可能无法按预期工作。有些功能的工作方式会略有不同,而另一些则不受支持。
BATCH模式下的行为更改:
- “Rolling” 操作,如 reduce() 或 sum(),在
STREAMING
模式下,对每个到达的新记录发出增量更新。在BATCH
模式下,这些操作不是“rolling”。它们只发出最终结果。
在批处理模式下不支持:
自定义运算符应谨慎实现,否则可能会表现不当。有关更多详细信息,请参阅下面的附加说明。
检查点#
如上所述,批处理程序的故障恢复不使用检查点。
重要的是要记住,由于没有检查点,某些功能,如 CheckpointListener 以及因此,Kafka 的 EXACTLY_ONCE 模式或 File Sink
的 OnCheckpointRollingPolicy 将无法工作。如果您需要一个在BATCH
模式下工作的事务性接收器,请确保它使用了在 FLIP-143 中提出的统一接收器 API。
你仍然可以使用所有状态原语,只是用于故障恢复的机制会有所不同。
编写自定义运算符#
注意: 自定义操作符是 Apache Flink 的高级使用模式。对于大多数用例,请考虑使用(键控)处理函数。
在编写自定义运算符时,重要的是记住为BATCH
执行模式做出的假设。否则,一个在STREAMING
模式下正常工作的运算符,在BATCH
模式下可能会产生错误的结果。运算符从不限定于特定的键,这意味着它们会看到BATCH
处理的某些属性,Flink试图利用这些属性。
首先,您不应该在操作符内缓存最后一次的水印。在BATCH
模式下,我们逐个处理记录。因此,水印将在每个键之间从MAX_VALUE
切换到MIN_VALUE
。您不应该假设水印在操作符中总是递增的。出于同样的原因,定时器将按键顺序先触发,然后在每个键内按时间戳顺序触发。此外,不支持手动更改键的操作。