FLIP-134: DataStream API 的批处理执行 - Apache Flink

内容

状态

动机

正如FLIP-131中所描述的,我们的目标是废弃 DataSet API,转而支持 DataStream API 和 Table API。用户应该能够使用 DataStream API 编写一个程序,能够高效地在有界和无界输入数据上执行。要理解我们的意思,我们需要详细解释一下。

如果数据源会持续产生数据且永不关闭,我们称之为无界数据源。另一方面,有界数据源只会读取有限数量的数据并最终关闭。包含无界数据源的 Flink 作业/程序将是无界的,而仅包含有界数据源的作业将是有界的,最终会完成。传统上,处理系统要么针对有界执行进行优化,要么针对无界执行进行优化,它们要么是批处理器,要么是流处理器。原因在于框架可以根据计算的性质使用不同的运行时算法和数据结构。流处理器针对连续/增量计算进行了优化,可以快速提供结果(延迟低),而批处理器则针对快速处理整个批次的数据进行了优化,而由单个事件/记录引起的更新则不会以低延迟提供。Flink 可用于批处理和流处理,但用户需要使用 DataSet API 进行前者,使用 DataStream API 进行后者。

用户可以使用DataStream API编写有界程序,但目前运行时不会知道程序是有界的,也不会利用这一点来“决定”程序应该如何执行。这包括操作调度方式的选择以及数据在操作之间如何洗牌。

建议更改

我们建议为DataStream API 添加一种新的批处理执行模式,使用户能够编写有界程序,并以高效的方式执行,充分利用程序的有界特性。运行时执行语义应与DataSet API 中等效程序的执行方式相当。当前的执行模式应被追溯地称为流式执行模式。

DataStream API 的一些操作和语义在批处理执行中效果不佳。我们将在下文讨论这些问题,并提出我们在使用 DataStream API 进行批处理执行时需要的语义和行为更改。接下来,我们还将讨论提案的细节,并在最后总结提议的语义。

BATCH vs. STREAMING 和 BOUNDED vs. UNBOUNDED

在这份提案中,我们有时使用术语 BATCH 和 STREAMING,有时使用 BOUNDED 和 UNBOUNDED。这些术语看起来可能大多是可以互换的,但我们认为它们之间存在微妙的区别,并且用于稍有不同的目的。在 API/SDK 中,我们应该使用术语 BOUNDED 和 UNBOUNDED,一个数据源可以是有界的或无界的,一个 Flink 作业可以是有界的或无界的。对于运行时行为,我们建议使用 BATCH 和 STREAMING。这样做的主要原因是,对于具有不同执行语义的有界程序来说,以不同的执行语义执行有界程序是有意义的,即如果用户希望获得 DataStream API 在此提案之前的执行语义,则可以在 STREAMING 模式下执行有界程序。

配置新的执行模式

我们建议引入一个名为execution.runtime-mode的新设置,它有三个可能的值:

  • BATCH: 选择新的批处理运行模式。仅当所有源都是有界的时才有效。
  • STREAMING: 选择流处理运行模式。这是DataStream API当前展示的执行行为,我们在此FLIP生效后将会称之为流处理模式。
  • AUTOMATIC: 根据作业/程序中的源选择执行模式。如果所有源都是有界的,则选择BATCH,否则选择STREAMING。

理想情况下,默认设置应该是自动的,但我们认为在 Flink 2.0 之前,默认应该是流式处理,这样我们就不会破坏现有的设置:DataStream API 中已经存在的一些数据源是有界的,例如 fromCollection() 或将 FileProcessingMode 设置为 PROCESS_ONCE 的 readFile(),因此将 AUTOMATIC 作为默认值会为它们开启批处理执行,可能会产生令人惊讶的行为。

新设置将可以通过配置进行配置,但我们还建议引入编程 API:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeMode.BATCH);

BATCH 模式和 STREAMING 模式中的任务调度和洗牌模式

Flink 中应用程序的用户代码被翻译成类似于这里所示的任务图。Flink 的运行时将根据配置的 ScheduleMode 不同地调度这些任务。另一个重要的决定是 数据交换模式。这控制着数据在不同任务之间的“洗牌”方式,这对于通过 keyBy() 连接相连的操作非常重要。

在流式处理模式下,所有任务都应在数据流经管道之前进行调度。记录会随着到来而被处理,并且每个任务的输出会被积极地推送到下游的下一个任务,几乎没有缓冲。像Checkpointing这样的机制是基于所有任务在作业的整个生命周期中都是可用的这一事实来提供容错性。在内部,调度模式将是EAGER,洗牌模式将是PIPELINED。

在批处理模式下,任务图被分割为独立的区域,这些区域要么独立运行,例如“尴尬并行”任务,要么通过阻塞洗牌进行通信,例如包含 keyBy() 的管道。每个区域都会从头到尾处理其(有界的)输入并实现其输出。该输出的生命周期与生成它的任务的生命周期无关。这样可以实现更灵活的调度,其中任务可以被释放,释放的资源可以分配给下一个就绪的任务。此外,如果一个任务失败,其前置任务不一定需要重新执行,因为它们的输出已经被实现。更多详细信息请参见FLIP-119

上述每种调度算法都在特定环境中具有优势,但每种算法都对容错性和其他运行主题(如保存点的可用性)有其自己的影响。

我们还希望使洗牌模式可配置,因为可能存在用户希望批处理执行模式,但仍希望数据以流水线方式交换的情况。为此,我们提出了一个新的配置选项 execution.shuffle-mode。我们不会在StreamExecutionEnvironment或ExecutionConfig中公开专用的设置器

示例用法:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeMode.BATCH); Configuration conf = new Configuration(); conf.setString("execution.shuffle-mode", "ALL_EDGES_BLOCKING"); env.configure(conf);

在批处理执行模式中的处理时间支持

时间概念在流处理中至关重要。流处理中的时间有两种类型,处理时间和事件时间。简而言之,处理时间是记录被处理时机器上的挂钟时间,而事件时间是通常嵌入在记录本身中的时间戳,指示例如交易发生的时间或传感器采取测量的时间。有关更多详细信息,请参阅相应的文档页面

根据处理时间的定义,我们可以看到基于处理时间的计算结果是 不可复制 的。这是因为同一条记录处理两次会有两个不同的时间戳。

尽管如上所述,在流处理中,使用处理时间可能是有用的。原因在于流水线通常会以“实时”方式摄取它们的无界输入,并且预计作业将运行很长一段时间。举个例子,想象一种计算,它在1小时(事件时间)窗口中计算特定类型的传入记录。在这个例子中,用户可能需要等待约1小时才能获得窗口的结果,因为数据以实时方式摄取(按照数据到达的速度)。在这种情况下,为了获得计数器趋势的早期(不完整)指示(它是在增加还是在减少),用户可能希望在处理时间中每5分钟使用“早期触发”。

实质上,流式计算的“实时”特性导致系统响应性与墙钟/处理时间之间存在相关性,即我们需要等待多久才能获得1小时窗口计算结果。这种相关性在批处理世界中不存在,因为输入数据集是静态的且提前已知。

总结
本文介绍了 Apache Flink 计划废弃 DataSet API,转而推荐使用 DataStream API 和 Table API。提出了在 DataStream API 中添加 BATCH 执行模式的建议,以便用户编写高效的有界程序。还讨论了任务调度、数据交换模式、处理时间支持等方面的改进和配置。建议引入新的配置选项 execution.runtime-mode 和 execution.shuffle-mode,以及 pipeline.processing-time.allow 和 pipeline.processing-time.end-of-input 两个新的处理时间配置选项。