本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在事务中读取和写入数据湖
Amazon Lake Formation读取和写入 Amazon S3 对象的写入 Amazon S3 对象以及创建和更新数据目录中的表元数据时,支持 ACID(原子、一致、隔离和持久)事务支持 ACID(原子、一致、隔离和持久)事务支持 ACID(原子、一致 事务保持受管理的表清单的完整性 (交易数据操作) 以及其他表元数据,例如架构 (事务元数据操作)。以下是针对受管表进行事务的典型用例:
-
ETL 将传递到新表— 在此用例中,您可能有Amazon Glue提取、转换和加载 (ETL) 任务,用于启动事务,从数据源读取,写入数据湖中已注册 Amazon S3 位置的数据接收器,并在数据目录中为数据接收器创建受管控表。如果 ETL 脚本在某个时候检测到故障,则该脚本可以取消事务,结果会发生以下情况:
-
受管辖的表已从目录中删除。
-
如果脚本调用了Lake Formation
DeleteObjectsOnCancel在将每个新对象写入 Amazon S3 之前,API 操作,然后 Lake Formation 还会删除事务中写入到 Amazon S3 的所有对象。有关更多信息,请参阅 回滚Amazon S3 写入。
-
-
表更新— 假设对于现有的受管控表,您的 ETL 任务会启动事务,将新对象写入 Amazon S3 并使用更新表清单
UpdateTableObjectsAPI 操作。如果脚本检测到故障,它可以取消事务,结果会发生以下情况:-
表清单将恢复到事务开始之前的状态。
-
如果脚本调用了Lake Formation
DeleteObjectsOnCancelAPI 操作在将每个新对象写入 Amazon S3 之前,Lake Formation 还会删除事务中写入到 Amazon S3 的所有对象。
-
-
架构更新— 对于 Amazon S3 中具有流式数据接收器的现有受管控表,如果流式处理 ETL 任务确定数据中还有其他表列,则它可以在事务中更新表架构。如果出现故障,任务可以取消事务,在这种情况下,表架构将恢复到事务开始之前的状态。
-
时间旅行查询— Lake Formation 维护多个版本(快照)表元数据,因为数据湖中的数据会发生变化。即使架构已更改,您也可以时光倒流并查询数据。
有关受管控表的更多信息,请参阅Lake Formation 中的受管桌子.
受管表中的提交流程
对受管控表的修改必须在事务的上下文中进行。如果您的 ETL 任务在未明确提供事务 ID 的情况下对受管表执行操作,则 Lake Formation 会自动启动事务并在操作结束时提交(或取消)该事务。这被称为单语句交易。
对于具有写入操作的交易,调用CommitTransaction会将交易移至COMMIT_IN_PROGRESS状态。内部后台进程负责将事务中的更改应用到受管控表中,然后再将事务移至受管控表COMMITTEDstatus。因此,在调用后立即执行读取操作CommitTransaction可能反映也可能不反映写入操作的结果。为了确定性地读取写入操作的结果,客户应等到交易状态更改为COMMITTED. 这可以通过以下任一调用来检查CommitTransaction要么DescribeTransactionAPI 操作。单语句事务的读取操作也演示了相同的行为。
回滚Amazon S3 写入
当交易被取消时,可以是自动取消的,也可以是通过调用CancelTransaction,未经您的许可,Lake Formation 绝不会删除写入 Amazon S3 的数据。要授予 Lake Formation 回滚交易期间写入的权限,你的代码必须调用DeleteObjectsOnCancel API 操作,其中列出了取消交易后可以删除的 Amazon S3 对象。建议你打电话DeleteObjectsOnCancel在写作之前。
这些区域有:Amazon GlueETL 库函数write_dynamic_frame.from_catalog()包括自动拨打电话的选项DeleteObjectsOnCancel在写入之前。在下面的示例中,callDeleteObjectsOnCancel选项包含在additional_options参数。因为价值False传递到read_only的参数start_transaction、事务不是只读事务。
transactionId = glueContext.start_transaction(False) try: datasink0 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database="MyDatabase", table_name="MyGovernedTable", additional_options={ "partitionKeys":["key1", "key2"], "transactionId":transactionId, "callDeleteObjectsOnCancel":"true" } ) glueContext.commit_transaction(transactionId) except: glueContext.cancel_transaction(transactionId)