本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Apache Beam 应用程序的检查点失败
如果您的 Beam 应用程序配置了shutdownSourcesAfterIdleMs
症状
转到 Kinesis Data Analytics 应用程序 CloudWatch 记录并检查是否记录了以下日志消息。以下日志消息表明,由于某些任务已完成,检查点无法触发。
{ "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)", "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator", "message": "Failed to trigger checkpoint for jobyour job IDsince some tasks of jobyour job IDhas been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.", "threadName": "Checkpoint Timer", "applicationARN":your application ARN, "applicationVersionId": "5", "messageSchemaVersion": "1", "messageType": "INFO" }
这也可以在 Flink 仪表板上找到,其中一些任务已进入 “已完成” 状态,现在无法进行检查点操作。
原因
shutdownSourcesAfterIdleMs 是一个 Beam 配置变量,用于关闭已在配置的毫秒时间内处于空闲状态的源。一旦源被关闭,便无法再进行检查点操作。这可能导致检查点失败
任务进入 “已完成” 状态的原因之一是 shutdownSourcesAfterIdleMs 设置为 0ms,这表示空闲的任务将立即关闭。
解决方案
要防止任务立即进入 “已完成” 状态,请将 shutdownSourcesAfterIdleMs 改为 long.max_VALUE。可通过两种方式执行此任务:
-
选项 1:如果在 Kinesis Data Analytics 应用程序配置页面中设置了光束配置,则可以添加新的键值对进行设置 shutdpwnSourcesAfteridleMs 如下所示:
-
选项 2:如果在 JAR 文件中设置了光束配置,则可以设置 shutdownSourcesAfterIdleMs 如下所示:
FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam Options object options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); // attach specified options to Beam pipeline