本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
管道步骤
SageMaker 管道由步骤组成。这些步骤定义了管道执行的操作以及使用属性的步骤之间的关系。
步骤类型
下面介绍了每个步骤类型的要求,并提供了该步骤的示例实施。这些不是功能性的实现,因为它们不提供所需的资源和投入。有关实现这些步骤的教程,请参阅创建和管理 SageMaker 管道.
亚马逊 SageMaker 模型构建管道支持以下步骤类型:
处理步骤
您可以使用处理步骤创建用于数据处理的处理作业。有关处理作业的更多信息,请参阅处理数据和评估模型.
处理步骤需要处理器、定义处理代码的 Python 脚本、用于处理的输出以及作业参数。以下示例显示了如何创建ProcessingStep定义。有关处理步骤要求的更多信息,请参阅SageMaker.workflow .steps.处理步骤
from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='0.20.0', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, inputs=[ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py" )
传递运行时参数
您可以使用将运行时参数传递给处理步骤get_run_argsSKLearnProcessor之外的压缩算法(例如PySparkProcessor和SparkJarProcessor.
以下示例显示如何从中传入运行时参数: PySpark 处理器到ProcessingStep.
from sagemaker.spark.processing import PySparkProcessor pyspark_processor = PySparkProcessor(framework_version='2.4', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput run_args = pyspark_processor.get_run_args( "preprocess.py", inputs=[ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], arguments=None )
from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=pyspark_processor, inputs=run_args.inputs, outputs=run_args.outputs, job_arguments=run_args.arguments, code=run_args.code )
训练步骤
您可以使用训练步骤创建训练作业来训练模型。有关训练作业的更多信息,请参阅使用 Amazon SageMaker 训练模型.
训练步骤需要估算器以及训练和验证数据输入。以下示例显示了如何创建TrainingStep定义。有关培训练步骤要求的更多信息,请参阅SageMaker.workflow .steps.训练步骤
from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep step_train = TrainingStep( name="TrainAbaloneModel", estimator=xgb_train, inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } )
优化步骤
您可以使用调整步骤创建超参数调整作业,也称为超参数优化 (HPO)。一个超参数调整作业运行多个训练作业,每个作业都生成一个模型版本。有关超参数优化的更多信息,请参阅使用 SageMaker 执行自动模型优化.
调整作业与 SageMaker 为管道进行试验,将培训工作作作为试验创造。有关更多信息,请参阅 实验集成。
调整步骤需要HyperparameterTunerwarm_start_config的参数HyperparameterTuner. 有关超参数调整和热启动的详细信息,请参阅运行热启动超参数优化作业.
您将get_top_model_s3_uri
亚马逊引入了调整步骤 SageMaker Python SDK v2.48.0 和亚马逊 SageMaker 工作室 v3.8.0。在使用调整步骤之前,必须更新 Studio,否则管道 DAG 不显示。要更新 Studio,请参阅关闭并更新 SageMaker 工作室.
以下示例显示了如何创建TuningStep定义。
from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep step_tuning = TuningStep( name = "HPTuning", tuner = HyperparameterTuner(...), inputs = TrainingInput(s3_data=input_path) )
获取最佳模型版本
以下示例说明如何使用get_top_model_s3_uri方法。最多可以根据以下条件排名排名前 50 的表演版本超参数调整 Jobbject. 这些区域有:top_k参数是版本的索引,其中top_k=0是性能最佳的版本top_k=49是性能最差的版本。
best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )
有关调整步骤要求的更多信息,请参阅SageMaker.workflow .steps.TuningStep
CreateModel 步骤
你使用CreateModel创建一个步骤 SageMaker 模型。有关 SageMaker 模型,请参阅使用 Amazon SageMaker 训练模型.
创建模型步骤需要模型工件和有关 SageMaker 创建模型时需要使用的实例类型。以下示例显示了如何创建CreateModel步骤定义。有关CreateModel步骤要求,请参阅SageMaker.workflow .steps.创建模型步骤
from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )
注册Model Step
你使用RegisterModel步骤注册SageMaker.Model。模型PipelineModel表示推理管道,这是由处理推理请求的容器的线性序列组成的模型。
有关如何注册模型的更多信息,请参阅使用模型注册表注册并部署模型. 有关RegisterModel步骤要求,请参阅SageMaker.workflow .step_Collections。注册模型
以下示例显示了如何创建RegisterModel注册 a 的步骤PipelineModel.
import time from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='0.20.0', role=role, sagemaker_session=sagemaker_session, py_version='py3') xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=sagemaker_session, role=role) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session) step_register = RegisterModel( name="AbaloneRegisterModel", model=pipeline_model, content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', )
如果model未提供,则注册模型步骤需要估计器,如以下示例所示。
from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )
转换步骤
您可以使用转换步骤进行批处理转换,以便对整个数据集运行推理。有关批量转换的更多信息,请参阅运行使用推理管道进行 Batch 转换.
转换步骤需要转换器和运行批处理转换的数据。以下示例显示了如何创建Transform步骤定义。有关Transform步骤要求,请参阅SageMaker.workflow .steps.TransStep
from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )
条件步骤
您可以使用条件步骤来评估步骤属性的条件,以评估下一步应在管道中执行哪些操作。
条件步骤需要条件列表,如果条件的计算结果为,则要运行的步骤列表true,以及条件计算结果为时要运行的步骤列表false. 以下示例显示了如何创建Condition步骤定义。有关Condition步骤要求,请参阅SageMaker.workflow。条件 _Step. 条件步骤
限制
-
SageMaker Pipeline 不支持使用嵌套条件步骤。您不能通过条件步骤作为另一个条件步骤的输入。
-
条件步骤不能在两个分支中使用相同的步骤。如果您在两个分支中都需要相同的步骤功能,请复制该步骤并为其指定不同的名称。
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ( ConditionStep, JsonGet ) cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )
回调步骤
你使用Callback步骤纳入其他流程和Amazon亚马逊不直接提供的服务纳入工作流程 SageMaker 模型构建管道。当您时Callback步骤运行时,将发生以下过程:
-
SageMaker Pipeline 会将消息发送到客户指定的 Amazon Simple Queue Service (Amazon SQS) 队列。邮件包含 SageMaker Pipeline-生成的令牌和客户提供的输入参数列表。发送消息后, SageMaker 管道等待客户的响应。
-
客户从 Amazon SQS 队列检索消息并启动其自定义流程。
-
流程完成后,客户会调用以下 API 之一并提交 SageMaker Pipeline — 生成的令牌:
-
发送 Pipeline 执行步骤成功,以及输出参数列表
-
发送 Pipeline 执行步骤失败,以及失败原因
-
-
API 调用导致 SageMaker 管道要么继续管道进程,要么使该流程失败。
有关Callback步骤要求,请参阅sagemaker.workflow .callback_Step. 回调步骤
CallbackAmazon 中引入了步骤 SageMaker Python SDK v2.45.0 和亚马逊 SageMaker 工作室 v3.6.2。您必须先更新 Studio 才能使用Callback步骤或管道 DAG 不显示。要更新 Studio,请参阅关闭并更新 SageMaker 工作室.
下面的示例演示了上述过程的实现。
from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
停止行为
管道过程不会停止Callback步骤正在运行。
当你打电话StopPipelineExecution在正在运行的管道进程中Callback步骤, SageMaker 管道向指定的 SQS 队列发送额外的 Amazon SQS 消息。SQS 消息的正文包含状态字段,设置为Stopping. 以下显示了示例 SQS 消息正文。
{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }
您应该向 Amazon SQS 消息使用者添加逻辑,以便在收到消息后采取任何必要的操作(例如,资源清理),然后致电SendPipelineExecutionStepSuccess要么SendPipelineExecutionStepFailure.
只有当 SageMaker 管道会收到其中一个调用,它会停止管道进程吗?
Lambda 步骤
您可以使用 Lambda 步骤来运行Amazon Lambdafunction. 您可以运行现有的 Lambda 函数,或者 SageMaker 可以创建和运行新的 Lambda 函数。对于显示如何在中使用 Lambda 步骤的笔记本 SageMaker 管道,请参阅sagemaker-管道-lambda Step.ipynb
亚马逊推出了 Lambda 步骤 SageMaker Python SDK v2.51.0 和亚马逊 SageMaker 工作室 v3.9.1。在使用 Lambda 步骤之前,必须更新 Studio,否则管道 DAG 不显示。要更新 Studio,请参阅关闭并更新 SageMaker 工作室.
SageMaker 提供sagemaker.lambda_helper.lambdaLambda具有以下签名。
Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )
这些区域有:sagemaker.workflow .lambda_step.lambdaSteplambda_func参数类型Lambda. 要调用现有的 Lambda 函数,唯一的要求是向以下条件提供函数的 Amazon Resource Name (ARN):function_arn. 如果您没有为提供值function_arn,您必须指定handler以及以下操作之一:
-
zipped_code_dir— 压缩的 Lambda 函数的路径s3_bucket— 其中 Amazon S3 存储桶zipped_code_dir是要上传 -
script— Lambda 函数脚本文件的路径
以下示例显示了如何创建Lambda调用现有 Lambda 函数的步骤定义。
from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )
以下示例显示了如何创建Lambda使用 Lambda 函数脚本创建和调用 Lambda 函数的步骤定义。
from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )
输入和输出
如果您的Lambda函数有输入或输出,这些也必须在Lambda步骤。
在定义Lambda步骤,inputs必须是键/值对的字典。的每个值inputs字典必须是原始类型(字符串、整数或浮点型)。不支持嵌套对象。如果保持未定义,则inputs值默认为None.
这些区域有:outputs值必须是键列表。这些键指的是在输出中定义的字典Lambdafunction. LIKEinputs,这些键必须是原始类型,并且不支持嵌套对象。
超时和停止行为
这些区域有:Lambda班级有timeout参数,指定 Lambda 函数可以运行的最长时间。默认值为 120 秒,最大值为 10 分钟。如果达到超时时 Lambda 函数正在运行,则 Lambda 步骤将失败;但是,Lambda 函数将继续运行。
在 Lambda 步骤运行期间无法停止管道进程,因为 Lambda 步骤调用的 Lambda 函数无法停止。如果您尝试在 Lambda 函数运行期间停止该进程,则管道将等待 Lambda 函数完成或直到达到超时(以先发生者为准),然后停止。如果 Lambda 函数完成,则管道进程状态为Stopped. 如果达到超时,则管道进程状态为Failed.
CLARIFY 检查步骤
您可以使用ClarifyCheck步骤对照以前的基线进行基线漂移检查,以便进行偏差分析和模型可解释性。通过这样做,你可以生成和注册基准对于漂移检查RegisterModel以及在模型构建阶段通过管道训练的模型。亚马逊可以使用这些漂移检查基准 SageMaker 模型监视器适用于模型终端节点,这样您就不需要执行基线分别提供建议。这些区域有:ClarifyCheckstep 还可以从模型注册表中提取用于漂移检查的基线。这些区域有:ClarifyCheckstep 利用了亚马逊 SageMaker 要澄清预建容器,提供一系列模型监控功能,包括针对给定基准的约束验证。有关更多信息,请参阅 。入门 SageMaker 澄清容器.
配置 ClarifyCheck 步
您可以配置ClarifyCheck步骤来每次在管道中使用以下检查类型时只执行其中一种检查类型。
检查数据偏差
检查模型偏差
模型可解释性检查
要实现此目的,需要将clarify_check_config具有以下检查类型值之一的参数:
DataBiasCheckConfigModelBiasCheckConfigModelExplainabilityCheckConfig
这些区域有:ClarifyCheckstep 启动一个处理作业,该作业运行 SageMaker 澄清预构建的容器并需要专用支票和处理作业的配置.ClarifyCheckConfig和CheckJobConfig是这些配置的帮助函数,它们与 SageMaker 澄清处理作业计算,以检查模型偏差、数据偏差或模型可解释性。有关更多信息,请参阅 运行 SageMaker 澄清处理工作以进行偏见分析和可解释性。
控制漂移检查的步骤行为
这些区域有:ClarifyCheckstep 需要以下两个布尔标志来控制其行为:
skip_check:此参数表示是否跳过与之前基线的漂移检查。如果设置为False,必须可用已配置的检查类型的先前基准。register_new_baseline:此参数表示是否可以通过 step 属性访问新计算的基线BaselineUsedForDriftCheckConstraints. 如果设置为False,配置的检查类型的先前基准也必须可用。可通过BaselineUsedForDriftCheckConstraints财产。
有关更多信息,请参阅 基线计算、漂移检测和生命周期 ClarifyCheck 和 QualityCheck 亚马逊中的步骤 SageMaker 模型构建管道。
使用基准
您也可以指定model_package_group_name找到现有的基线和ClarifyCheck一步拉DriftCheckBaselines在模型包组中最新批准的模型包上。或者,您可以通过supplied_baseline_constraints参数。如果指定了model_package_group_name和supplied_baseline_constraints,ClarifyCheck步骤使用由supplied_baseline_constraints参数。
有关使用的更多信息ClarifyCheck步骤要求,请参阅SageMaker.workflow .steps.Clarify 检查步骤ClarifyCheck单步进入 SageMaker 管道,请参阅sagemaker-Pipeline 模型监视器-澄清-steps.ipynb
例 创建ClarifyCheck数据偏差检查步骤
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']), label=0, dataset_type="text/csv", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json", model_package_group_name="MyModelPackageGroup" )
质量检查步骤
您可以使用QualityCheck步骤进行基准建议以及对管道中的数据质量或模型质量的先前基线进行偏移检查。通过这样做,你可以生成和注册基准对于漂移检查注册Model Step与模型构建阶段通过管道训练的模型一起步进行培训。Model Monitor 可以使用这些基线对模型端点进行漂移检查,这样您就不需要单独执行基线建议。这些区域有:QualityCheckstep 还可以从模型注册表中提取用于漂移检查的基线。这些区域有:QualityCheckstep 利用了亚马逊 SageMaker Model Monitor 预构建容器,其具有一系列模型监控功能,包括约束建议、统计数据生成和基准约束验证。有关更多信息,请参阅 亚马逊 SageMaker 模型监控器预构建容器。
配置 QualityCheck 步
您可以配置QualityCheck步骤来每次在管道中使用以下检查类型时只执行其中一种检查类型。
数据质量检查
检查模型质量
要实现此目的,需要将quality_check_config具有以下检查类型值之一的参数:
DataQualityCheckConfigModelQualityCheckConfig
这些区域有:QualityCheckstep 启动一个处理作业,该作业运行 Model Monitor 预构建的容器,并且需要为检查和处理作业进行专用配置。这些区域有:QualityCheckConfig和CheckJobConfig是这些配置的帮助函数,它们与模型监视器为模型质量或数据质量监控创建基线的方式保持一致。有关模型监视器基准建议的更多信息,请参阅创建基准和创建模型质量基准.
控制漂移检查的步骤行为
这些区域有:QualityCheckstep 需要以下两个布尔标志来控制其行为:
skip_check:此参数表示是否跳过与之前基线的漂移检查。如果设置为False,必须可用已配置的检查类型的先前基准。register_new_baseline:此参数表示是否可以通过步骤属性访问新计算的基线BaselineUsedForDriftCheckConstraints和BaselineUsedForDriftCheckStatistics. 如果设置为False,配置的检查类型的先前基准也必须可用。可通过访问这些BaselineUsedForDriftCheckConstraints和BaselineUsedForDriftCheckStatistics属性。
有关更多信息,请参阅 基线计算、漂移检测和生命周期 ClarifyCheck 和 QualityCheck 亚马逊中的步骤 SageMaker 模型构建管道。
使用基准
您可以直接通过supplied_baseline_statistics和supplied_baseline_constraints参数,或者你可以简单地指定model_package_group_name和QualityCheck一步拉DriftCheckBaselines在模型包组中最新批准的模型包上。当你指定model_package_group_name,supplied_baseline_constraints, 和supplied_baseline_statistics,QualityCheckstep 使用指定的基线supplied_baseline_constraints和supplied_baseline_statistics在的支票类型QualityCheck你正在运行的步骤。
有关使用的更多信息QualityCheck步骤要求,请参阅sagemaker.workflow .steps.质量检查步骤QualityCheck单步进入 SageMaker 管道,请参阅sagemaker-Pipeline 模型监视器-澄清-steps.ipynb
例 创建QualityCheck数据质量检查步骤
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json", model_package_group_name="MyModelPackageGroup" )
Amazon EMR 步骤
您可以使用 Amazon SageMaker 模型构建管道Amazon EMR步骤来处理EMR 步骤到运行中的 Amazon EMR 集群。有关更多信息,请参阅 。开始使用 Amazon EMR.
亚马逊 EMR 步骤需要EMRStepConfig具有将由 Amazon EMR 集群使用的 JAR 的 Amazon S3 位置以及要传递的任何参数以及 Amazon EMR 群集 ID。
Amazon EMR 步骤要求传递给管道的角色具有额外的权限。您应该附上Amazon托管策略:AmazonSageMakerPipelinesIntegrations转到管道角色,或者确保该角色包含该策略中的权限。
不支持 Amazon EMR(EKS)。
只能在处于以下某种状态的集群上运行 EMR 步骤:
STARTING、BOOTSTRAPPING、RUNNING,或者WAITING.您最多可拥有 256 个 EMR 步骤
PENDING在 EMR 群集上;提交超出该限制的 EMR 步骤会导致管道执行失败。您可以考虑使用管道步骤的重试策略.
例 创建 Amazon EMR 步骤定义,在 EMR 群集上启动新作业。
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="s3://path/to/jar/MyJar.jar", # required, S3 path to jar args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1YONHTCP3YZKC", # required step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )
失败步骤
你使用FailStep阻止亚马逊 SageMaker 在未达到所需条件或状态时执行模型构建管道,并将该管道的执行标记为失败。这些区域有:FailStep还允许您输入自定义错误消息,指示管道执行失败的原因。
当您时FailStep而其他管道步骤同时执行,则在所有并发步骤完成之前,管道才会终止。
使用的限制FailStep
您无法添加
FailStep到DependsOn其他步骤列表。有关更多信息,请参阅 步骤间的自定义依赖。其他步骤无法引用
FailStep. 这是总是是管道执行的最后一步。您无法重试以结尾的管道执行
FailStep.
您可以创建FailStep ErrorMessage采用静态文本字符串的形式。或者,您也可以使用管道参数一个加入
以下示例代码片段使用FailStep用一个ErrorMessage配置了管道参数和Joinoperation.
from sagemaker.workflow.fail_step import FailStep from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5) step_fail = FailStep( name="AbaloneMSEFail", error_message=Join( on=" ", values=["Execution failed due to MSE >", mse_threshold_param] ), )
步骤属性
这些区域有:properties属性用于在管道中的步骤之间添加数据依赖关系。然后将使用这些数据依赖关系 SageMaker 用于根据管道定义构造 DAG 的管道。这些属性可以作为占位符值引用,并在运行时解析。
这些区域有:properties属性 SageMaker Pipeline 步骤与返回的对象匹配Describe呼叫相应的 SageMaker 任务类型。对于每种作业类型,Describe调用将返回以下响应对象:
-
ProcessingStep–描述处理作业 -
TrainingStep–DescribeTrainingJob -
TransformStep–描述转换Job
步进并行度
当步骤不依赖于任何其他步骤时,它会在管道执行后立即运行。但是,parallel 行执行太多的管道步骤可以快速耗尽可用资源。使用控制管道执行的并发步骤数ParallelismConfiguration.
下面的示例使用ParallelismConfiguration将并发步长限制设置为 5。
pipeline.create( parallelism_config=ParallelismConfiguration(5), )
步骤之间的数据依赖
您可以通过指定步骤之间的数据关系来定义 DAG 的结构。要在步骤之间创建数据依赖关系,请将一个步骤的属性作为输入传递给管道中的另一个步骤。在提供输入的步骤完成运行之后,接收输入的步骤才会开始。
数据依赖关系使用 JsonPath 采用以下格式的符号。此格式遍历 JSON 属性文件,这意味着你可以追加尽可能多的<property>实例以访问文件中所需的嵌套属性。有关 JsonPath 符号,请参阅JSONPath 存储库
<step_name>.properties.<property>.<property>
以下内容介绍了如何使用指定 Amazon S3 存储桶。ProcessingOutputConfig处理步骤的属性。
step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
要创建数据依赖关系,请按如下方式将存储桶传递给训练步骤。
step_train = TrainingStep( name="CensusTrain", estimator=sklearn_train, inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri ) )
步骤间的自定义依赖
指定数据依赖关系时, SageMaker Pipeline 提供了步骤之间的数据连接。或者,一个步骤可以访问上一步中的数据,而无需直接使用 SageMaker 管道。在这种情况下,您可以创建一个自定义依赖项,告诉 SageMaker 在另一个步骤完成运行之前,管道不要开始一个步骤。您可以通过指定步骤来创建自定义依赖项DependsOn属性。
例如,下面定义了一个步骤C只有在两个步骤之后才开始A单步进入B完成跑步。
{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }
如果依赖项会创建循环依赖关系,则 SageMaker Pipeline 会抛出验证异常。
以下示例创建了一个训练步骤,该步骤在处理步骤完成运行后开始。
processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])
以下示例创建了一个训练步骤,该步骤在两个不同的处理步骤完成运行之前才开始。
processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])
下面提供了创建自定义依赖关系的另一种方法。
training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])
以下示例创建了一个训练步骤,该步骤接收来自一个处理步骤的输入,并等待其他处理步骤完成运行。
processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])
以下示例说明如何检索某个步骤的自定义依赖项的字符串列表。
custom_dependencies = training_step.depends_on
在步骤中使用自定义映像
您可以使用任何可用的 SageMaker深度学习容器映像
您也可以使用创建步骤 SageMaker Amazon S3 应用程序。一个 SageMaker Amazon S3 应用程序是一个 tar.gz 捆绑包,其中包含一个或多个可以在该捆绑包中运行的 Python 脚本。有关应用程序包捆绑的更多信息,请参阅直接从模型项目部署
您还可以将自己的容器与管道步骤一起使用。因为你无法在亚马逊内创建图片 SageMaker Studio,您必须先使用其他方法创建图片,然后才能与亚马逊一起使用 SageMaker 模型构建管道。
要在为管道创建步骤时使用自己的容器,请在估计器定义中包含图像 URI。有关将您的容器与 SageMaker 搭配使用的更多信息,请参阅在 SageMaker 中使用 Docker 容器.