使用 API 安排 Amazon Sag

使用 API 安排 Amazon SageMaker 笔记本作业并管理多步骤工作流

关键要点

  • Amazon SageMaker 增加了支持通过 API 调用程序化运行笔记本作业。
  • 新功能使数据科学家能够轻松创建多步骤的机器学习工作流。
  • 使用 SageMaker Pipelines 集成笔记本作业,并在 SageMaker Studio 中管理和可视化它们。

作者:Anchit Gupta、Ram Vegiraju 和 Edward Sun,发表于 2023 年 11 月 29 日, | | |


Amazon SageMaker Studio 提供了一个完全托管的解决方案,让数据科学家可以交互式地构建、训练和部署机器学习(ML)模型。通过新的 Amazon SageMaker 笔记本作业功能,数据科学家可以在 SageMaker Studio 中轻松地按需或按计划运行笔记本。此外,借助 Amazon SageMaker Pipelines 中提供的 API,您可以通过代码程序化地运行笔记本作业,并创建包含多个依赖笔记本的多步骤 ML工作流。

SageMaker Pipelines 是一个原生的工作流编排工具,用于构建 ML 管道,充分利用 SageMaker 的直接集成。每个 SageMaker管道由多个组成,对应于单个任务,例如使用 进行处理、训练或数据处理。SageMaker 笔记本作业现在作为 SageMaker 管道中的内置步骤类型提供,您可以使用 轻松运行这些笔记本作业步骤。此外,您可以将多个依赖的笔记本结合在一起,创建有向无环图(DAG)形式的工作流,从而在 SageMaker Studio中运行这些笔记本作业或 DAG,同步进行管理和可视化。

数据科学家目前使用 SageMaker Studio 互动开发 Jupyter 笔记本,然后使用 SageMaker笔记本作业按计划运行这些笔记本。这些作业可以立即运行或按规则定期执行,而无需数据工作者将代码重构为 Python 模块。常见用例包括:

  • 在后台运行长时间的笔记本
  • 定期进行模型推理,生成报告
  • 从准备小样本数据集扩展到处理 PB 级别的大数据
  • 在适当时间重新训练和部署模型
  • 为模型质量或数据漂移监控安排作业
  • 探索参数空间以优化模型

尽管这一功能使得数据工作者能够轻松自动化独立的笔记本,但 ML工作流常常由多个笔记本组成,每个笔记本执行特定任务并存在复杂依赖。例如,一个监控模型数据漂移的笔记本应该有一个预步骤来处理新数据,并在检测到显著漂移时进行模型刷新和培训。此外,数据科学家可能希望定期触发这个整体工作流,以基于新数据更新模型。为了帮助您轻松自动化笔记本并创建此类复杂工作流,SageMaker笔记本作业现在作为 SageMaker Pipelines 中的步骤可用。本文展示了如何通过几行代码解决以下用例:

  • 程序化地立即或按计划运行独立的笔记本
  • 创建多步骤笔记本工作流的 DAG,用于持续集成和持续交付(CI/CD),并通过 SageMaker Studio 用户界面进行管理

解决方案概述

以下图表展示了我们的解决方案架构。您可以使用 SageMaker Python SDK 运行单个笔记本作业或工作流。此功能创建一个 SageMaker训练作业来运行笔记本。

删除)

在接下来的部分中,我们将通过一个示例 ML 用例演示如何创建笔记本作业工作流,传递不同笔记本步骤之间的参数,安排工作流并通过 SageMakerStudio 进行监控。

在我们的 ML问题示例中,构建的是情感分析模型,这是一种文本分类任务。情感分析的常见应用包括社交媒体监控、客户支持管理和客户反馈分析。我们使用的示例数据集是斯坦福情感树库(SST2)数据集,包含电影评论和一个整数(0或 1),表示评论的积极或消极情感。

以下是与 SST2 数据集对应的 data.csv 文件示例,显示了其前两列中的值。请注意,该文件不应包含任何标题。

列 1列 2
0隐藏来自父母的新的分泌物
0仅包含艰涩的笑话,没有机智
1令人爱不释手,传达出对人性的深刻美好
0对保持不变感到彻底满意
0作者引用了最糟糕“书呆子复仇”陈词滥调
0这是太悲惨了,不值得碰这样的肤浅主题
1证明这样好莱坞大片的导演仍然可以制作出情感深重的小型个性电影

在这个 ML 示例中,我们需要执行几个任务:

  1. 进行特征工程,将数据集准备为模型可以理解的格式。
  2. 特征工程完成后,运行使用 Transformers 的训练步骤。
  3. 设置批量推理,与微调后的模型一起预测新评论的情感。
  4. 设置数据监控步骤,定期监控新数据的质量漂移,必要时重新训练模型权重。

通过将笔记本作业作为 SageMaker管道中的一个步骤引入,我们可以组织这一包含三个独特步骤的工作流。工作流的每一步都在不同的笔记本中开发,然后转换为独立的笔记本作业步骤并连结为管道:

  • 预处理 – 从 (Amazon S3)下载公共 SST2 数据集,并为后续步骤创建 CSV 文件以供执行。SST2 数据集是一个具有两个标签(0 和 1)以及可分类文本的文本分类数据集。
  • 训练 – 获取调整好的 CSV 文件,使用 BERT 进行文本分类的微调,利用 Transformers 库。我们在这一步骤中使用一个测试数据准备笔记本,作为微调和批量推理步骤的依赖项。微调完成后,这个笔记本将通过运行魔法准备供微调模型进行样本推理的测试数据集。
  • 转换与监控 – 执行批量推理,并通过模型监控设置数据质量,以取得基线数据集的建议。

删除)

运行笔记本

此解决方案的示例代码可在 上找到。

创建 SageMaker 笔记本作业步骤与创建其他 SageMaker 管道步骤类似。在这个笔记本示例中,我们使用 SageMaker Python SDK组织工作流。要在 SageMaker Pipelines 中创建笔记本步骤,可以定义以下参数:

  • 输入笔记本 – 当前步骤将要协调的笔记本名称。您可以传递本地输入笔记本的路径。可选地,如果此笔记本运行其他笔记本,可以将其传递到 AdditionalDependencies 参数。
  • 镜像 URI – 笔记本作业步骤后面的 Docker 镜像。这可以是 SageMaker 已提供的预定义镜像,或者是您定义并推送到 (Amazon ECR)的自定义镜像。有关支持的镜像,请参见本文末尾的注意事项部分。
  • 内核名称 – 在 SageMaker Studio 中使用的内核名称。此内核规范在您提供的镜像中注册。
  • 实例类型(可选) – 笔记本作业中定义并将运行的 (Amazon EC2)实例类型。
  • 参数(可选) – 允许传递到笔记本中的参数。这些参数可以以键值对的形式定义。此外,这些参数可以在不同的笔记本作业运行或管道运行之间修改。

我们的示例包含五个笔记本:

  • nb-job-pipeline.ipynb – 这是我们的主笔记本,在其中定义我们的管道和工作流。
  • preprocess.ipynb – 这是工作流中的第一步笔记本,包含从公共 AWS 数据集提取并创建 CSV 文件的代码。
  • training.ipynb – 这是工作流中的第二步笔记本,包含独立进行训练和微调的代码,使用前一步的 CSV 文件。此步骤还依赖于 prepare-test-set.ipynb 笔记本,以便从中获取测试数据集进行样本推理。
  • prepare-test-set.ipynb – 该笔记本创建一个训练笔记本在第二步管道中使用的测试数据集,并用于微调模型的样本推理。
  • transform-monitor.ipynb – 这是工作流中的第三步笔记本,使用基本 BERT 模型运行 SageMaker 批量转换作业,同时设置模型监控的数据质量。

接下来,我们将展示主笔记本 nb-job- pipeline.ipynb,它将所有子笔记本整合成一个管道,并运行端到端工作流。请注意,尽管以下示例仅运行一次笔记本,但您还可以安排管道重复执行该笔记本。有关详细说明,请参考 。

在我们第一个笔记本作业步骤中,我们传递默认 S3桶的参数,可以利用该桶保存其他管道步骤的可用工件。在第一个笔记本(preprocess.ipynb)中,我们下载 AWS 公共 SST2训练数据集,并创建相应的训练 CSV 文件,上传到 S3 桶中。如以下代码所示:


# 参数

print(default_s3_bucket)

!aws s3 cp s3://sagemaker-sample-files/datasets/text/SST2/sst2.trainsst2.train

# 为了加快执行,只读取前 500 行

with open('sst2.train', 'r') as f: lines = f.readlines()[:500]

data = [] for line in lines: label, text = line.strip().split(' ', 1)
data.append((int(label), text))

df = pd.DataFrame(data, columns=['label', 'text']) df.to_csv("train.csv",
index=False) # 创建 smaller 数据集的 CSV 文件 !aws s3 cp "train.csv"
{default_s3_bucket} ```

我们可以将此笔记本转换为 `NotebookJobStep`,并在主笔记本中如下执行:

```python

# 提供 S3 桶以存储工件

nb_job_params = {"default_s3_bucket": notebook_artifacts}

preprocess_nb_step = NotebookJobStep( name=preprocess_step_name,
description=preprocess_description, notebook_job_name=preprocess_job_name,
image_uri=image_uri, kernel_name=kernel_name, display_name=display_name,
role=role, input_notebook=preprocess_notebook, instance_type="ml.m5.4xlarge",
parameters=nb_job_params, ) ```

现在我们有了一个示例 CSV 文件,接下来可以在训练笔记本中开始训练模型。训练笔记本获取相同的 S3 桶参数并从中提取训练数据集。然后,我们利用
Transformers 的训练器对象执行微调,以下是代码片段:

```python from transformers import TrainingArguments, Trainer training_args =
TrainingArguments(output_dir="test_trainer", evaluation_strategy="epoch")

trainer = Trainer( model=model, args=training_args,
train_dataset=small_train_dataset, eval_dataset=small_eval_dataset,
compute_metrics=compute_metrics, )

trainer.train() ```

微调完成后,我们希望进行一些批量推理,以查看模型的表现。这通过同一本地路径中的单独笔记本(`prepare-test-
set.ipynb`)实现,该笔记本创建了用于推理的测试数据集。我们可以在训练笔记本中通过以下魔法单元运行额外的笔记本:

`python %run 'prepare-test-set.ipynb'`

我们在第二个笔记本作业步骤中的 `AdditionalDependencies` 参数中定义这一额外的笔记本依赖:

`python train_nb_step = NotebookJobStep( name=training_step_name,
description=training_description, notebook_job_name=training_job_name,
input_notebook=training_notebook,
additional_dependencies=[test_data_prep_notebook], image_uri=image_uri,
kernel_name=kernel_name, display_name=display_name,
instance_type="ml.m5.12xlarge", role=role, parameters=nb_job_params, )`

我们还必须通过如下使用 `add_depends_on` API 调用来指定训练笔记本作业步骤(步骤 2)依赖于预处理笔记本作业步骤(步骤 1):

`python train_nb_step.add_depends_on(或 步骤不同。我们此步骤的笔记本将执行这些相同的
API,但会以笔记本作业步骤的形式跟踪。此步骤依赖于先前定义的训练作业步骤,因此我们也需要用 `depends_on` 标志捕捉该依赖。

`python batch_monitor_step = NotebookJobStep( name=batch_monitor_step_name,
description=batch_monitor_description,
notebook_job_name=batch_monitor_job_name,
input_notebook=batch_monitor_notebook, image_uri=image_uri,
kernel_name=kernel_name, display_name=display_name,
instance_type="ml.m5.12xlarge", role=role, parameters=nb_job_params, )
batch_monitor_step.add_depends_on([train_nb_step])`

在定义工作流的各个步骤后,我们可以创建并运行端到端管道:

```python

# 创建管道

pipeline = Pipeline( name=pipeline_name, steps=[preprocess_nb_step,
train_nb_step, batch_monitor_step], )

# 执行管道

pipeline.create(session.get_execution_role()) execution =
pipeline.start(parameters={}) execution.wait(delay=30, max_attempts=60)
execution_steps = execution.list_steps() print(execution_steps) ```

## 监控管道运行

您可以通过 SageMaker Pipelines DAG 跟踪和监控笔记本步骤运行的情况,如下图所示。

删除)

此外,您还可以选择在笔记本作业仪表板上监控单个笔记本的运行,并通过 SageMaker Studio UI 切换已创建的输出文件。当在 SageMakerStudio 之外使用此功能时,可以通过使用标签定义可以跟踪笔记本作业仪表板运行状态的用户。有关要包含的标签的更多详细信息,请参见 。

删除)

在本示例中,我们将生成的笔记本作业输出到您本地路径下名为 `outputs` 的目录,如下图所示,这里可以查看输入笔记本及其定义的任何参数。

![输出结果](https://d2908

Leave a Reply

Required fields are marked *