检查点 - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

检查点

检查点是 Flink 用来确保应用程序状态容错的机制。该机制允许 Flink 在作业失败时恢复操作员状态,并为应用程序提供与无故障执行相同的语义。借助 Kinesis Data Analytics,应用程序的状态存储在 RocksDB 中,RocksDB 是一种嵌入式键/值存储,可将其工作状态保留在磁盘上。使用检查点时,状态也会上传到 Amazon S3,因此即使磁盘丢失,也可以使用检查点来恢复应用程序状态。

有关更多信息,请参阅 。状态快照如何工作?.

检查点阶段

对于 Flink 中的检查点操作员子任务,主要有 5 个阶段:

  • 等待 [启动延迟] — Flink 使用插入到流中的检查点屏障,因此此阶段的时间是操作员等待检查点屏障到达的时间。

  • 对齐 [对齐Durat] — 在这个阶段,子任务已经达到了一个障碍,但它正在等待来自其他输入流的屏障。

  • 同步检查点 [Sync] — 此阶段是指子任务实际拍摄操作员状态的快照,并阻止子任务上的所有其他活动。

  • Async 检查点 [异步Duration] — 此阶段的大部分时间是将状态上传到 Amazon S3 的子任务。在此阶段,子任务不再被阻止,可以处理记录。

  • 确认 — 这通常是一个短暂的阶段,只是子任务向 JobManager 并执行任何提交消息(例如使用Kafka接收器)。

每个阶段(确认除外)都映射到 Flink WebUI 提供的检查点持续时间度量,这有助于隔离检查点长的原因。

要查看检查点上每个可用指标的精确定义,请转到历史选项卡.

正在调查

在调查检查点持续时间较长时,要确定的最重要的事情是检查点的瓶颈,即哪个操作员和子任务在检查点上花费的时间最长,以及该子任务的哪个阶段需要较长的时间。这可以通过作业检查点任务下的 Flink WebUI 来确定。Flink 的 Web 界面提供了有助于调查检查点问题的数据和信息。有关完整的细分,请参阅监控检查点.

首先要看的是端到端持续时间Job 图中的每个操作员,以确定哪个操作员需要很长时间才能进行检查并值得进一步调查。根据 Flink 文档,持续时间的定义是:

从触发时间戳到最后一次确认的持续时间(如果尚未收到确认,则为 n/a)。完整检查点的端到端持续时间由确认该检查点的最后一个子任务决定。此时间通常大于单个子任务实际检查点状态所需的时间。

检查点的其他持续时间也提供了有关时间花在何处的更精细的信息。

如果Sync为高,则表示快照期间发生了什么事。在此阶段snapshotState()为实现 SnapshotState 接口的类调用;这可以是用户代码,因此线程转储可用于调查此问题。

长整型异步Duration这表明在将状态上传到 Amazon S3 上花费了大量时间。如果状态很大,或者有大量的状态文件正在上传,则可能会出现这种情况。如果是这样的话,那么值得研究一下应用程序是如何使用状态的,并确保在可能的情况下使用 Flink 原生数据结构(使用键控状态)。Kinesis Data Analytics 配置 Flink 的方式可以最大限度地减少 Amazon S3 调用的次数,以确保调用时间不会太长。下面是一个操作员检查点统计信息的示例。它表明异步Duration与前面的操作员检查点统计数据相比相对较长。


                    调查检查点

这些区域有:启动延迟较高将表明大部分时间都花在等待检查站屏障到达操作员身上。这表明应用程序需要一段时间来处理记录,这意味着屏障正在缓慢地流过作业图。如果Job 压力过大或操作员经常忙碌,通常会出现这种情况。下面是一个示例 JobGraph second KeyedProcess 运算符正忙。


                    调查检查点

你可以使用 Flink Flame Graphs 或 TaskManager 线程转储。一旦确定了瓶颈,就可以使用 Flame-Graphs 或线程转储对其进行进一步调查。

线程转储

线程转储是另一种调试工具,其级别略低于火焰图。线程转储输出所有线程在某个时间点的执行状态。Flink 接受一个 JVM 线程转储,这是 Flink 进程中所有线程的执行状态。线程的状态由线程的堆栈跟踪以及一些其他信息表示。火焰图实际上是使用快速连续拍摄的多个堆栈轨迹构建的。该图是由这些轨迹构成的可视化项,可以轻松识别常见的代码路径。

"KeyedProcess (1/3)#0" prio=5 Id=1423 RUNNABLE at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:154) at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>>19) at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14) at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ...

上面是从 Flink UI 中获取的单个线程的线程转储片段。第一行包含有关此线程的一些常规信息,包括:

  • ThreadKeyedProcess (1/3) #0

  • 线程的优先级prio=5

  • 一个唯一的话题编号Id=1423

  • 线程状态可运行

线程的名称通常提供有关线程一般用途的信息。操作员线程可以通过其名称来标识,因为操作员线程与运算符具有相同的名称,以及它与哪个子任务相关的指示,例如KeyedProcess (1/3) #0线程来自KeyedProcess运算符和来自第 1 个(共 3 个)子任务。

线程可以处于以下某种状态:

  • NEW — 线程已创建但尚未处理

  • 可运行 — 线程在 CPU 上执行

  • BLOCKED — 线程正在等待另一个线程释放其锁定

  • WAITING — 线程正在等待wait()join(),或者park()方法

  • TIMED_WAITING — 线程正在使用休眠、等待、加入或暂留方法等待,但等待时间最长。

注意

在 Flink 1.13 中,线程转储中单个堆栈跟踪的最大深度限制为 8。

注意

线程转储应该是调试 Flink 应用程序中性能问题的最后手段,因为线程转储很难阅读,需要采集多个样本并进行手动分析。如果可能的话,最好使用火焰图。

在 Flink 中,可以通过选择任务管理器选项,在 Flink UI 的左侧导航栏上选择一个特定的任务管理器,然后导航到线程转储选项卡。线程转储可以下载,复制到你最喜欢的文本编辑器(或线程转储分析器),或者直接在 Flink Web UI 的文本视图中进行分析(但是,最后一个选项可能有点笨拙。

要确定使用哪个任务管理器进行线程转储TaskManagers选项卡可以在选择特定运算符时使用。这表明操作员在操作员的不同子任务上运行,并且可以在不同的任务管理器上运行。


                        使用线程转储

转储将由多个堆栈跟踪组成。但是,在调查垃圾场时,与操作员有关的问题是最重要的。这些很容易找到,因为操作员线程与运算符具有相同的名称,并且指示它与哪个子任务相关。例如,以下堆栈跟踪来自KeyedProcess运算符和是第一个子任务。

"KeyedProcess (1/3)#0" prio=5 Id=595 RUNNABLE at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155) at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:19) at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14) at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ...

如果有多个具有相同名称的运算符,这可能会变得混乱,但是我们可以命名运算符来解决这个问题。例如:

.... .process(new ExpensiveFunction).name("Expensive function")

火焰图

火焰图是一种有用的调试工具,它可视化目标代码的堆栈跟踪,从而可以识别最频繁的代码路径。它们是通过对堆栈轨迹进行多次采样而创建的。火焰图的x轴显示不同的堆栈轮廓,而y轴显示堆叠深度和堆栈追踪中的调用。火焰图中的单个矩形表示堆叠框架,框架的宽度显示它在堆栈中出现的频率。有关火焰图及其使用方法的更多信息,请参阅火焰图.

在 Flink 中,可以通过 Web UI 访问操作员的火焰图,方法是选择一个操作员,然后选择FlameGraph选项卡。一旦采集了足够的样本,火焰图就会显示出来。以下为 FlameGraph (对于 ) ProcessFunction 这花了很多时间去检查站。


                    使用火焰图

这是一个非常简单的火焰图,它显示所有的 CPU 时间都花在 foreach 外观中processElement的 ExpensiveFunction 运算符。您还可以获得行号,以帮助确定代码执行的执行位置。