DynamicFrame 类
Apache Spark 中的主要抽象之一是 SparkSQL DataFrame,它类似于在 R 和 Pandas 中找到的 DataFrame 构造。DataFrame 类似于表格,支持函数风格(map/reduce/filter/等)操作和 SQL 操作(select、project、aggregate)。
尽管 DataFrames 功能强大且运用广泛,但在提取、转换和加载 (ETL) 操作方面存在局限性。最明显的是,它们需要指定了架构后才能加载任何数据。SparkSQL 通过对数据进行两次扫描解决了这一问题 – 第一次为了推断架构,第二次为了加载数据。然而,此推断仍有局限性,且不能解决数据混乱的实际问题。例如,同样的字段在不同记录中可能属于不同类型。对此,Apache Spark 通常没有好的办法,只能使用原始字段文本将类型报告为 string。这或许不正确,同时您可能希望更精细地控制解决架构差异的方式。对于大型数据集而言,每扫描一次源数据都会付出高昂代价。
为了解决这些局限性,Amazon Glue 推出了 DynamicFrame。DynamicFrame 类似于 DataFrame,不同之处在于每个记录都是自描述的,因此初始并不需要架构。相反,Amazon Glue 会在需要时实时计算一个架构,并使用选择(或联合)类型显式编码架构不一致之处。您可以解决这些不一致之处,以使您的数据集兼容需要固定架构的数据存储。
同样,一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。
在解决任何架构不一致问题后,就可以在 DynamicFrames 和 DataFrames 之间来回转换。
– 构造 –
__init__
__init__(jdf, glue_ctx, name)
-
jdf– 对 Java 虚拟机 (JVM) 中数据帧的引用。 -
glue_ctx– 一个 GlueContext 类 对象。 -
name– 可选的名称字符串,默认为空。
fromDF
fromDF(dataframe, glue_ctx, name)
通过将 DataFrame 字段转换为 DynamicRecord 字段将 DataFrame 转换为 DynamicFrame。返回新的 DynamicFrame。
一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。
-
dataframe– 要转换的 Apache Spark SQLDataFrame(必需)。 -
glue_ctx– 为此转换指定上下文的 GlueContext 类 对象 (必需)。 -
name– 生成的DynamicFrame的名称 (必需)。
toDF
toDF(options)
通过将 DynamicRecords 转换为 DataFrame 字段将 DynamicFrame 转换为 Apache Spark DataFrame。返回新的 DataFrame。
一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。
-
options– 一个选项列表。如果您选择Project和Cast操作类型,则指定目标类型。示例包括以下内容。>>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])
– 信息 –
计数
count( ) – 返回底层 DataFrame 中的行数。
Schema
schema( ) – 返回此 DynamicFrame 的架构,如果此架构不可用,则返回底层 DataFrame 的架构。
printSchema
printSchema( ) – 打印底层 DataFrame 的架构。
Show
show(num_rows) – 打印底层 DataFrame 中的行数。
Repartition
repartition(numPartitions) – 返回具有 numPartitions 个分区的新 DynamicFrame。
Coalesce
coalesce(numPartitions) – 返回具有 numPartitions 个分区的新 DynamicFrame。
– 转换 –
apply_mapping
apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
将声明映射应用于此 DynamicFrame 并返回已应用那些映射的新的 DynamicFrame。
-
mappings– 映射元组列表,每个元组包括:(源列,源类型,目标列,目标类型)。必需。 -
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
drop_fields
drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
调用 FlatMap 类 转换以从 DynamicFrame 中删除字段。返回指定字段已删除的新的 DynamicFrame。
-
paths– 字符串列表,每个字符串包含要删除的字段节点的完整路径。 -
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
筛选条件
filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
返回通过选择输入 DynamicFrame 内满足指定谓词函数 f 的所有 DynamicRecords 而构建的新的 DynamicFrame
-
f– 应用到DynamicFrame的谓词函数。该函数必须采用DynamicRecord作为参数,如果DynamicRecord满足筛选要求,则返回 True,否则返回 False (必需)。一个
DynamicRecord表示DynamicFrame中的一条逻辑记录。它类似于 SparkDataFrame中的一行,只不过它是自描述的,可用于不符合固定架构的数据。 -
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
有关如何使用 filter 转换的示例,请参阅 Filter 类。
Join
join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
执行与另一个 DynamicFrame 的等式联接并返回生成的 DynamicFrame。
-
paths1– 此帧中要联接的键列表。 -
paths2– 另一帧中要联接的键列表。 -
frame2– 要联接的另一个DynamicFrame。 -
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
映射
map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
返回由于将指定映射函数应用到原始 DynamicFrame 中的所有记录而产生的新的 DynamicFrame。
-
f– 应用到DynamicFrame中所有记录的映射函数。该函数必须采用DynamicRecord作为参数并返回一个新的DynamicRecord(必需)。一个
DynamicRecord表示DynamicFrame中的一条逻辑记录。它类似于 Apache SparkDataFrame中的一行,只不过它是自描述的,可用于不符合固定架构的数据。 transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。info– 与转换中的错误关联的字符串 (可选)。stageThreshold– 在转换出错之前可能在其中发生的最大错误数 (可选;默认值为零)。totalThreshold– 在处理出错之前可能全面发生的最大错误数 (可选;默认值为零)。
有关如何使用 map 转换的示例,请参阅 Map 类。
mergeDynamicFrame
mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)
基于指定主键的将此 DynamicFrame 与暂存 DynamicFrame 合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 Amazon Glue 中的源中的记录。
stage_dynamic_frame– 要合并的暂存DynamicFrame。primary_keys– 要匹配源和暂存动态帧中的记录的主键字段列表。transformation_ctx– 用于检索有关当前转换的元数据的唯一字符串(可选)。options– 为此转换提供其他信息的 JSON 名称-值对的字符串。当前未使用此参数。info– 一个String。要与此转换中的错误关联的任何字符串。stageThreshold– 一个Long。给定转换中处理需要排除的错误的数目。totalThreshold– 一个Long。此转换中处理需要排除的错误的总数。
返回通过将此 DynamicFrame 与暂存 DynamicFrame 合并获取的新 DynamicFrame。
在这些情况下,返回的 DynamicFrame 将包含记录 A:
如果
A在源帧和暂存帧中都存在,则返回暂存帧中的A。如果
A在源表中,且A.primaryKeys不在stagingDynamicFrame中(这意味着,未在暂存表中更新A)。
源帧和暂存帧不需要具有相同的架构。
merged_frame = source_frame.mergeDynamicFrame(stage_frame, ["id"]) Relationalize
relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
通过生成由取消嵌套嵌套列和透视数组列而生成的帧列表来关系化 DynamicFrame。可使用在取消嵌套阶段生成的联接键将透视数组列联接到根表。
root_table_name– 根表的名称。staging_path– 要将 CSV 格式的透视表分区存储到的路径(可选)。从该路径读取透视表。options– 可选参数的词典。-
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
rename_field
rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
重命名此 DynamicFrame 中的一个字段并返回包含该重命名字段的新的 DynamicFrame。
-
oldName– 要重命名的节点的完整路径。如果旧名称中包含点,则
RenameField将不起作用,除非使用反引号 (`) 将其引起来。例如,要将this.old.name替换为thisNewName,应按如下方式调用 rename_field。newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName") -
newName– 新名称,作为完整路径。 -
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
resolveChoice
resolveChoice(specs = None, choice = "" , database = None , table_name = None ,
transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id =
None)
解析此 DynamicFrame 内的一个选择类型并返回新的 DynamicFrame。
-
specs– 要解析的特定歧义列表,每个歧义均采用元组形式:(field_path, action)。可通过两种方式使用
resolveChoice. 第一种是使用specs参数指定一系列特定字段以及如何解析它们。resolveChoice的另一种模式是使用choice参数为所有ChoiceTypes指定单个解析方法。specs的值被指定为由(field_path, action)对组成的元组。field_path值标识特定歧义元素,action值标识相应解析。可能的操作如下:-
cast:– 尝试将所有值转换为指定的类型。例如:typecast:int。 -
make_cols– 将每个不同的类型转换为名为的列。通过展平数据来解析潜在的歧义。例如,如果columnName_typecolumnA是int或string,则解析就是在结果DynamicFrame中生成名为columnA_int和columnA_string的两个列。 -
make_struct– 使用struct表示数据,解析潜在的歧义。例如,如果某个列中的数据是int或string,则使用make_struct操作会在结果DynamicFrame中生成结构列,而每个结构都同时包含int和string。 -
project:– 将所有数据投影到可能的数据类型之一,解析潜在的歧义。例如,如果某个列中的数据是typeint或string,则使用project:string操作会在结果DynamicFrame中生成一个列,其中所有int值都已转换为字符串。
如果
field_path识别到数组,则在数组名称后放置一个空的方括号可避免歧义。例如,假设您正在使用结构如下的数据:"myList": [ { "price": 100.00 }, { "price": "$100.00" } ]可以通过将
field_path设置为"myList[].price"、将action设置为"cast:double"来选择价格的数字而非字符串版本。注意 只能使用
specs和choice参数之一。如果specs参数不为None,则choice参数必须为空字符串。反过来,如果choice不为空字符串,则specs参数必须为None。 -
choice– 为所有ChoiceTypes指定单个解析方法。这可以在执行前不知道ChoiceTypes的完整列表的情况下使用。除了之前为specs列出的操作外,此参数还支持以下操作:-
match_catalog– 尝试将每个ChoiceType转换为指定数据目录表中的对应类型。
-
-
database– 与match_catalog操作一起使用的数据目录数据库。 -
table_name– 与match_catalog操作一起使用的数据目录表。 -
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
catalog_id– 正在访问的数据目录的目录 ID(数据目录的账户 ID)。如果设置为None(默认值),它使用调用账户的目录 ID。
df1 = df.resolveChoice(choice = "make_cols")
df2 = df.resolveChoice(specs = [("myList[].price", "make_struct"), ("columnA", "cast:double")])
select_fields
select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
返回包含选定字段的新的 DynamicFrame。
-
paths– 字符串列表,每个字符串就是一个到您要选择的顶层节点的路径。 -
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
Spigot
spigot(path, options={})
在转换期间将示例记录写入指定目标位置,并返回包含一个额外写入步骤的输入 DynamicFrame。
-
path– 要向其中写入内容的目标位置的路径 (必需)。 -
options– 指定选项的键值对(可选)。"topk"选项指定应写入第一条k记录。"prob"选项指定选择任何给定记录的可能性(以十进制数字形式表示),用于选择要写入的记录。 transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。
split_fields
split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
返回一个新的 DynamicFrameCollection,其中包含两个 DynamicFrames:第一个包含所有已拆分的节点,第二个包含其余节点。
-
paths– 字符串列表,每个字符串是到一个您要拆分为新的DynamicFrame的节点的完整路径。 -
name1– 拆分的DynamicFrame的名称字符串。 -
name2– 指定节点拆分后留存的DynamicFrame的名称字符串。 -
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
split_rows
将 DynamicFrame 中的一个或多个行拆分到新的 DynamicFrame 中。
split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
返回一个新的 DynamicFrameCollection,其中包含两个 DynamicFrames:第一个包含所有已拆分的行,第二个包含其余行。
-
comparison_dict– 一个词典,其中的键是到一个列的路径,值是另一个词典,用于将比较器映射到与该列值所比较的值。例如,{"age": {">": 10, "<": 20}}拆分其年龄列中的值大于 10 但小于 20 的行。 -
name1– 拆分的DynamicFrame的名称字符串。 -
name2– 指定节点拆分后留存的DynamicFrame的名称字符串。 -
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
Unbox
unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)
取消装箱 DynamicFrame 中的一个字符串字段并返回包含已取消装箱 DynamicRecords 的新的 DynamicFrame。
一个 DynamicRecord 表示 DynamicFrame 中的一条逻辑记录。它类似于 Apache Spark DataFrame 中的一行,只不过它是自描述的,可用于不符合固定架构的数据。
-
path– 要取消装箱的字符串节点的完整路径。 format– 格式规范(可选)。这用于 Amazon Simple Storage Service(Amazon S3)或支持多种格式的 Amazon Glue 连接。有关支持的格式,请参阅 Amazon Glue 中的 ETL 输入和输出的数据格式选项。-
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
options– 下列一个或多个:separator– 包含分隔符字符的字符串。escaper– 包含转义字符的字符串。skipFirst– 指示是否跳过第一个实例的布尔值。withSchema– 包含架构的字符串;必须使用StructType.json( )调用。withHeader– 指示是否包括标头的布尔值。
例如:unbox("a.b.c", "csv", separator="|")
Unnest
取消嵌套 DynamicFrame 中的嵌套对象,使其成为顶级对象,并返回新的取消嵌套的 DynamicFrame。
unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
取消嵌套 DynamicFrame 中的嵌套对象,使其成为顶级对象,并返回新的取消嵌套的 DynamicFrame。
-
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
例如:unnest( )
unnest_ddb_json
解除 DynamicFrame 中的嵌套列,其具体位于 DynamoDB JSON 结构中,并返回一个新的非嵌套 DynamicFrame。属于结构类型数组的列将不会被解除嵌套。请注意,这是一种特定类型的非嵌套转换,其行为与常规 unnest 转换不同,并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息,请参阅 DynamoDB JSON。
unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
-
transformation_ctx– 用于标识状态信息的唯一字符串 (可选)。 -
info– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
例如,使用 DynamoDB JSON 结构读取导出的架构可能如下所示:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
unnest_ddb_json() 转换会将此转换为:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
以下代码示例演示了如何使用 Amazon Glue DynamoDB 导出连接器、调用 DynamoDB JSON 解除嵌套命令,以及打印分区数量:
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()
Write
write(connection_type, connection_options, format, format_options, accumulator_size)
从此 DynamicFrame 的 GlueContext 类 获得指定连接类型的 DataSink(object) 并将其用于格式化和写入此 DynamicFrame 的内容。返回按指定进行格式化和写入的新的 DynamicFrame。
-
connection_type– 要使用的连接类型。有效值包括s3、mysql、postgresql、redshift、sqlserver和oracle。 -
connection_options– 要使用的连接选项(可选)。对于connection_type的s3,将会定义 Amazon S3 路径。connection_options = {"path": "s3://aws-glue-target/temp"}对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。
connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password","dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"} format– 格式规范(可选)。这用于 Amazon Simple Storage Service(Amazon S3)或支持多种格式的 Amazon Glue 连接。有关支持的格式,请参阅 Amazon Glue 中的 ETL 输入和输出的数据格式选项。format_options– 指定格式的格式选项。有关支持的格式,请参阅 Amazon Glue 中的 ETL 输入和输出的数据格式选项。accumulator_size– 要使用的可累积大小 (可选)。
– 错误 –
assertErrorThreshold
assertErrorThreshold( ) – 创建此 DynamicFrame 的转换中的错误的资产。从底层 DataFrame 返回 Exception。
errorsAsDynamicFrame
errorsAsDynamicFrame( ) – 返回其中包含嵌套的错误记录的 DynamicFrame。
errorsCount
errorsCount( ) – 返回 DynamicFrame 中的错误总数。
stageErrorsCount
stageErrorsCount – 返回生成此 DynamicFrame 的过程中发生的错误数。