使用 Athena 查询 Apache Hudi 数据集 - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

使用 Athena 查询 Apache Hudi 数据集

Apache Hudi是一个开源数据管理框架,可简化增量递增数据的处理。记录级别的插入、更新、加入和删除操作的处理更加精细,从而降低了开销。Upsert 指的是将记录插入到现有数据集中(如果它们不存在)或对数据集进行更新(如果它们存在)的功能。

Hudi 处理数据插入和更新事件,而不会创建许多可能导致分析性能问题的小文件。Apache Hudi 会自动跟踪更改并合并文件,以便它们保持最佳大小。这样就不需要构建自定义解决方案来监控许多小文件并将其重写为大文件以减少其数量。

Hudi 数据集适用于以下使用案例:

Hudi 管理的数据集使用开放存储格式存储在 S3 中。目前,Athena 可以读取压缩的 Hudi 数据集,但不能写 Hudi 数据。Athena 使用 Apache Hudi 版本 0.8.0,可能会产生更改。有关此 Hudi 版本的更多信息,请参阅 Apache 网站上的 0.8.0 版本(文档)

Hudi 数据集表类型

Hudi 数据集可以采用以下类型之一:

  • 写入时复制 (CoW) – 数据以列状格式存储 (Parquet),并且每次更新都会在写入过程中创建一个新版本的文件。

  • 读取时合并 (MOR) – 数据使用列式 (Parquet) 和基于行 (Avro) 的格式的组合进行存储。更新记录到基于行的 delta 文件中,并根据需要进行压缩以创建新版本的列式文件。

对于 CoW 数据集,每次更新记录时,包含该记录的文件都会使用更新后的值进行重写。对于 MoR 数据集,每次进行更新时,Hudi 仅写入已更改记录对应的行。MoR 更适合写入或更改繁重而读取量较少的工作负载。CoW 更适合更改频率较低但读取量繁重的工作负载。

Hudi 提供三个查询类型用于访问数据:

  • 快照查询 – 该查询用于查看截至给定提交或压缩操作时表的最新快照。对于 MOR 表,快照查询通过在查询时合并最新文件切片的基本文件和增量文件来显示表的最新状态。

  • 增量查询 – 查询只能看到自给定提交/压缩以来,写入表的新数据。这有效地提供了更改流以启用增量数据管道。

  • 读取优化查询 – 对于 MOR 表,查询将看到最新压缩的数据。对于 CoW 表,查询将看到最新提交的数据。

下表显示了适用于每种表类型的可用 Hudi 查询类型。

表类型 可用的 Hudi 查询类型
写入时复制 快照、增量
读取时合并 快照、增量、读取优化

目前,Athena 支持快照查询和读取优化查询,但不支持增量查询。在 MOR 表上,所有在读取优化查询中显示的 数据均已压缩。这提供了良好的性能,但不包括最新的增量提交。快照查询包含最新的数据,但会但产生一些计算开销,这使得这些查询的性能降低。

有关在表和查询类型之间权衡的更多信息,请参阅 Apache Hudi 文档中的表和查询类型

Hudi 术语变更:视图现在为查询

从发行版 0.5.1 开始,Apache Hudi 改变了一些术语。以前的视图在较新的版本中称为查询。下表总结了旧新术语之间的变更。

旧术语 新术语

CoW:读取优化视图

MoR:实时视图

快照查询

增量视图 增量查询
MoR 读取优化视图 读取优化的查询

引导启动操作中的表

从 Apache Hudi 版本 0.6.0 开始,引导启动操作功能可为现有的 Parquet 数据集提供更好的性能。引导操作只能生成元数据,使数据集保持原位,而不是重写数据集。

您可以使用 Athena 从引导启动操作中查询表,就像基于 Amazon S3 中数据的其他表一样。在您的 CREATE TABLE 语句中,在 LOCATION 子句指定 Hudi 表路径。

有关在 Amazon EMR 中使用引导启动操作创建 Hudi 表的更多信息,请参阅 Amazon 大数据博客文章:New features from Apache Hudi available in Amazon EMR(在 Amazon EMR 中可用的 Apache Hudi 新功能)。

注意事项和限制

创建 Hudi 表

本节提供了 Athena 中针对 Hudi 数据的分区表和非分区表的 CREATE TABLE 语句的示例。

如果您有已在 Amazon Glue 中创建的 Hudi 表,您可以直接在 Athena 中查询它们。当您在 Athena 中创建分区 Hudi 表时,您必须运行 ALTER TABLE ADD PARTITION 以加载 Hudi 数据,然后再查询这些数据。

写入时复制(CoW)创建表示例

未分区 CoW 表

以下示例在 Athena 中创建了一个未分区的 CoW 表。

CREATE EXTERNAL TABLE `non_partition_cow`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/non_partition_cow'

分区 CoW 表

以下示例在 Athena 中创建了一个已分区的 CoW 表。

CREATE EXTERNAL TABLE `partition_cow`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/partition_cow'

以下 ALTER TABLE ADD PARTITION 示例将两个分区添加到了示例 partition_cow 表。

ALTER TABLE partition_cow ADD PARTITION (event_type = 'one') LOCATION 's3://bucket/folder/partition_cow/one/' PARTITION (event_type = 'two') LOCATION 's3://bucket/folder/partition_cow/two/'

读取时合并(MoR)创建表示例

Hudi 在元数据仓中为 MoR 创建了两个表:一个用于快照查询的表,一个用于读取优化查询的表。两个表都可查询。在 0.5.1 之前的 Hudi 版本中,用于读优化查询的表具有您在创建表时指定的名称。从 Hudi 版本 0.5.1 开始,表名将默认以 _ro 为后缀。用于快照查询的表的名称是指定的名称,其中附加了 _rt

未分区的读取时合并(MoR)表

以下示例在 Athena 中创建了一个未分区的 MoR 表用于读取优化查询。请注意,读取优化查询使用输入格式 HoodieParquetInputFormat

CREATE EXTERNAL TABLE `nonpartition_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/nonpartition_mor'

以下示例在 Athena 中创建了一个未分区的 MoR 表用于快照查询。对于快照查询,请使用输入格式 HoodieParquetRealtimeInputFormat

CREATE EXTERNAL TABLE `nonpartition_mor_rt`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/nonpartition_mor'

已分区的读取时合并(MoR)表

以下示例在 Athena 中创建了一个已分区的 MoR 表用于读取优化查询。

CREATE EXTERNAL TABLE `partition_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/partition_mor'

以下 ALTER TABLE ADD PARTITION 示例将两个分区添加到了示例 partition_mor 表。

ALTER TABLE partition_mor ADD PARTITION (event_type = 'one') LOCATION 's3://bucket/folder/partition_mor/one/' PARTITION (event_type = 'two') LOCATION 's3://bucket/folder/partition_mor/two/'

以下示例在 Athena 中创建了一个已分区的 MoR 表用于快照查询。

CREATE EXTERNAL TABLE `partition_mor_rt`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/partition_mor'

同样,以下 ALTER TABLE ADD PARTITION 示例将两个分区添加到了示例 partition_mor_rt 表。

ALTER TABLE partition_mor_rt ADD PARTITION (event_type = 'one') LOCATION 's3://bucket/folder/partition_mor/one/' PARTITION (event_type = 'two') LOCATION 's3://bucket/folder/partition_mor/two/'

另请参阅

要了解如何使用 Amazon Glue 自定义连接器和 Amazon Glue 2.0 任务来创建可以使用 Athena 查询的 Apache Hudi 表,请参阅 Amazon 大数据博客文章:Writing to Apache Hudi tables using Amazon Glue custom connector(使用 Amazon Glue 自定义连接器写入 Apache Hudi 表)。