在事务中读取和写入数据湖 - Amazon Lake Formation
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在事务中读取和写入数据湖

Amazon Lake Formation读取和写入 Amazon S3 对象的写入 Amazon S3 对象以及创建和更新数据目录中的表元数据时,支持 ACID(原子、一致、隔离和持久)事务支持 ACID(原子、一致、隔离和持久)事务支持 ACID(原子、一致 事务保持受管理的表清单的完整性 (交易数据操作) 以及其他表元数据,例如架构 (事务元数据操作)。以下是针对受管表进行事务的典型用例:

  • ETL 将传递到新表— 在此用例中,您可能有Amazon Glue提取、转换和加载 (ETL) 任务,用于启动事务,从数据源读取,写入数据湖中已注册 Amazon S3 位置的数据接收器,并在数据目录中为数据接收器创建受管控表。如果 ETL 脚本在某个时候检测到故障,则该脚本可以取消事务,结果会发生以下情况:

    • 受管辖的表已从目录中删除。

    • 如果脚本调用了Lake FormationDeleteObjectsOnCancel在将每个新对象写入 Amazon S3 之前,API 操作,然后 Lake Formation 还会删除事务中写入到 Amazon S3 的所有对象。有关更多信息,请参阅 回滚Amazon S3 写入

  • 表更新— 假设对于现有的受管控表,您的 ETL 任务会启动事务,将新对象写入 Amazon S3 并使用更新表清单UpdateTableObjectsAPI 操作。如果脚本检测到故障,它可以取消事务,结果会发生以下情况:

    • 表清单将恢复到事务开始之前的状态。

    • 如果脚本调用了Lake FormationDeleteObjectsOnCancelAPI 操作在将每个新对象写入 Amazon S3 之前,Lake Formation 还会删除事务中写入到 Amazon S3 的所有对象。

  • 架构更新— 对于 Amazon S3 中具有流式数据接收器的现有受管控表,如果流式处理 ETL 任务确定数据中还有其他表列,则它可以在事务中更新表架构。如果出现故障,任务可以取消事务,在这种情况下,表架构将恢复到事务开始之前的状态。

  • 时间旅行查询— Lake Formation 维护多个版本(快照)表元数据,因为数据湖中的数据会发生变化。即使架构已更改,您也可以时光倒流并查询数据。

有关受管控表的更多信息,请参阅Lake Formation 中的受管桌子.

受管表中的提交流程

对受管控表的修改必须在事务的上下文中进行。如果您的 ETL 任务在未明确提供事务 ID 的情况下对受管表执行操作,则 Lake Formation 会自动启动事务并在操作结束时提交(或取消)该事务。这被称为单语句交易。

对于具有写入操作的交易,调用CommitTransaction会将交易移至COMMIT_IN_PROGRESS状态。内部后台进程负责将事务中的更改应用到受管控表中,然后再将事务移至受管控表COMMITTEDstatus。因此,在调用后立即执行读取操作CommitTransaction可能反映也可能不反映写入操作的结果。为了确定性地读取写入操作的结果,客户应等到交易状态更改为COMMITTED. 这可以通过以下任一调用来检查CommitTransaction要么DescribeTransactionAPI 操作。单语句事务的读取操作也演示了相同的行为。

回滚Amazon S3 写入

当交易被取消时,可以是自动取消的,也可以是通过调用CancelTransaction,未经您的许可,Lake Formation 绝不会删除写入 Amazon S3 的数据。要授予 Lake Formation 回滚交易期间写入的权限,你的代码必须调用DeleteObjectsOnCancel API 操作,其中列出了取消交易后可以删除的 Amazon S3 对象。建议你打电话DeleteObjectsOnCancel在写作之前。

这些区域有:Amazon GlueETL 库函数write_dynamic_frame.from_catalog()包括自动拨打电话的选项DeleteObjectsOnCancel在写入之前。在下面的示例中,callDeleteObjectsOnCancel选项包含在additional_options参数。因为价值False传递到read_only的参数start_transaction、事务不是只读事务。

transactionId = glueContext.start_transaction(False) try: datasink0 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database="MyDatabase", table_name="MyGovernedTable", additional_options={ "partitionKeys":["key1", "key2"], "transactionId":transactionId, "callDeleteObjectsOnCancel":"true" } ) glueContext.commit_transaction(transactionId) except: glueContext.cancel_transaction(transactionId)