import os
import json
import logging

import boto3
import botocore.config
import pandas as pd
import sagemaker
from botocore.exceptions import ClientError

config = botocore.config.Config(user_agent_extra='dlai-pds/c2/w3')

# low-level service client of the boto3 session
sm = boto3.client(service_name='sagemaker',
                  config=config)

sm_runtime = boto3.client('sagemaker-runtime',
                          config=config)

sess = sagemaker.Session(sagemaker_client=sm,
                         sagemaker_runtime_client=sm_runtime)

bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name
1 Introduction
In earlier articles we introduced AWS cloud services for data science and showed how they can help with different stages of the data science and machine learning workflow.
In this project we will look at the deploy and manage phase of the workflow using Amazon SageMaker Pipelines, which draws on all of the previous phases.
In particular we will do the following:
- Define and run a pipeline using a directed acyclic graph (DAG) with specific pipeline parameters and model hyper-parameters
- Define a processing step that cleans, balances, transforms, and splits our dataset into train, validation, and test datasets
- Define a training step that trains a model using the train and validation datasets
- Define a processing step that evaluates the trained model’s performance on the test dataset
- Define a register model step that creates a model package from the trained model
- Define a conditional step that checks the model’s performance and conditionally registers the model for deployment
We will prepare the raw Women’s Clothing Reviews dataset and use it to train a deep learning BERT-based natural language processing (NLP) model. The model will classify customer reviews into positive (1), neutral (0) and negative (-1) sentiment.
2 What is MLOps?
MLOps stands for Machine Learning Operations - but what does that mean?
MLOps builds on DevOps practices that encompass people, process, and technology. However, MLOps also includes considerations and practices that are unique to machine learning workloads. All of these practices aim to deliver machine learning workloads to production quickly while maintaining high quality and consistency and ensuring end-to-end traceability.
It’s important to consider that the machine learning development life cycle differs from the software development life cycle in several ways.
First, the model development life cycle is difficult to plan for from a project management perspective. It typically includes longer experimentation cycles than you would see in a standard agile software development process. The development of machine learning models also includes data tasks like feature engineering and data preparation, so you have data processing code and new inputs and artifacts to version, as well as additional pipeline tasks. When you start to automate the machine learning workflow, the inputs and artifacts generated across these tasks result in multiple disparate pipelines with dependencies that can be more challenging to stitch together than a typical software development workflow.
Second, some models exist by themselves, where you might be manually submitting prediction requests and getting responses through a batch process, or even within your notebook on an ad hoc basis. This is especially true in research environments. In many cases, however, a model is only a small part of an overall solution that incorporates machine learning. While the model is still a key component of that solution, other components usually need to be built or integrated. As an example, consider our product review use case and a model that predicts the sentiment class of a product review. The model itself can classify the sentiment of a review, but you also need to consider how that prediction will actually be used and potentially integrated into other existing applications. This may require additional tasks, like creating a REST API as a common interface for other applications to integrate with your model, or even building applications that respond to those reviews. This could mean creating automation that initiates back-end processes so that customer support engineers can quickly react and respond to negative reviews.
Third, multiple personas typically span the machine learning development life cycle, and all of them are needed to build, deploy, integrate, and operate a machine learning workload. This can create challenges, as these personas often have competing priorities and needs. There may also be skill gaps in building and operating machine learning workloads. For example, a data scientist may not have a traditional IT background. While they may be very comfortable creating a model that meets the performance objectives identified for a particular machine learning use case, they may not know how to host that model so that it can be consumed by other applications or systems. In this case, a deployment engineer may need to be engaged to help build out the infrastructure and resources needed to host and operate that model.
Also, you might need to integrate that hosted model with another application. In this case, you’re likely to depend on a software engineer to perform that integration. If there isn’t a cross-functional team with the same project goals in place, competing priorities and skill gaps across these personas make it really difficult to provide that path to production for your model.
Finally, many teams have processes in place to support regulatory or internal corporate requirements. This means that when you create your machine learning pipeline, you sometimes also need to ensure that traditional practices, such as change management, can be included in the steps of your pipeline. For example, your pipeline might automatically open a change ticket any time a new model gets deployed to production, or it might require a manual approval before your model can be deployed to production. All of these processes may need to be incorporated into your machine learning pipeline.
MLOps aims to provide the most efficient path to production by reducing manual hand-offs between the steps in your workflow, increasing automation within those steps in your workflow, and then going a step further to orchestrate the steps across your workflow.
3 AWS Pipelines Terminology
This project focuses on the following features of Amazon SageMaker Pipelines:
- Pipelines - a directed acyclic graph (DAG) of steps and conditions to orchestrate SageMaker jobs and resource creation
- Processing job steps - a simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model explainability
- Training job steps - an iterative process that teaches a model to make predictions on new data by presenting examples from a training dataset
- Conditional step execution - provides conditional execution of branches in a pipeline
- Registering models - registers a model in a model registry to create deployable models in Amazon SageMaker
- Parameterized pipeline executions - allows pipeline executions to vary by supplied parameters
- Model endpoint - hosts the model as a REST endpoint to serve predictions from new data
4 Creating a BERT Pipeline
The pipeline that we will create follows a typical machine learning application pattern of pre-processing, training, evaluation, and model registration.
In the processing step, we will perform feature engineering to transform the review_body text into BERT embeddings using the pre-trained BERT model, and split the dataset into train, validation and test files. The transformed dataset is stored in a feature store. To optimize for TensorFlow training, the transformed dataset files are saved using the TFRecord format in Amazon S3.
In the training step, we will fine-tune the BERT model to the customer reviews dataset and add a new classification layer to predict the sentiment for a given review_body.
In the evaluation step, we will take the trained model and a test dataset as input, and produce a JSON file containing classification evaluation metrics.
In the condition step, we will register the trained model if the accuracy of the model, as determined by our evaluation step, exceeds a given threshold value.
First, let’s install the required modules.
Let’s set up the pipeline name.
import time

timestamp = int(time.time())

pipeline_name = 'BERT-pipeline-{}'.format(timestamp)
5 Configure the dataset and processing step
5.1 Configure S3 path for raw input data
The raw dataset is in a public S3 bucket. Let’s start by specifying its S3 location:
raw_input_data_s3_uri = 's3://dlai-practical-data-science/data/raw/'
print(raw_input_data_s3_uri)
s3://dlai-practical-data-science/data/raw/
List the files in the S3 bucket (in this case it will be just one file):
!aws s3 ls $raw_input_data_s3_uri
2021-04-30 02:21:06 8457214 womens_clothing_ecommerce_reviews.csv
5.2 Configure processing step
For the pipeline workflow we will need to create workflow parameters of a specific type: integer, string, or float.
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
Now set the parameters for the processing step.
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.c5.2xlarge"
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.90,
)

validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.05,
)

test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.05,
)

balance_dataset = ParameterString(
    name="BalanceDataset",
    default_value="True",
)

max_seq_length = ParameterInteger(
    name="MaxSeqLength",
    default_value=128,
)

feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value="reviews-feature-store-" + str(timestamp),
)

feature_group_name = ParameterString(
    name="FeatureGroupName",
    default_value="reviews-feature-group-" + str(timestamp)
)

input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)
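The three split parameters are meant to partition the dataset, so their defaults (0.90, 0.05, 0.05) sum to 1.0. The script that consumes them, src/prepare_data.py, is not shown in this article, so the following is only a minimal stdlib sketch of how such percentages could be turned into train, validation and test partitions, assuming the rows are already shuffled. The helper name split_by_percentages is hypothetical.

```python
import math
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def split_by_percentages(
    rows: Sequence[T],
    train_pct: float,
    validation_pct: float,
    test_pct: float,
) -> Tuple[List[T], List[T], List[T]]:
    """Hypothetical helper: split already-shuffled rows into train/validation/test."""
    # Float sums such as 0.90 + 0.05 + 0.05 may not be exactly 1.0, so compare approximately.
    if not math.isclose(train_pct + validation_pct + test_pct, 1.0):
        raise ValueError("split percentages must sum to 1.0")

    n = len(rows)
    train_end = int(n * train_pct)
    validation_end = train_end + int(n * validation_pct)

    # Everything after the validation slice goes to test, so no row is lost to rounding.
    return (
        list(rows[:train_end]),
        list(rows[train_end:validation_end]),
        list(rows[validation_end:]),
    )


train, validation, test = split_by_percentages(list(range(100)), 0.90, 0.05, 0.05)
print(len(train), len(validation), len(test))
```

Giving the remainder to the last partition is one simple design choice; a real preparation script might instead stratify by sentiment class, especially when the dataset is balanced first.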
Set up a scikit-learn-based processor, passing the SageMaker execution role, processing instance type, and instance count.
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},
)
Now we will use the processor instance to construct a ProcessingStep, along with the input and output channels and the code that will run when the pipeline is executed. This is very similar to a processor instance’s run method, for those familiar with the existing Python SDK.
Note the "sentiment-train", "sentiment-validation" and "sentiment-test" named channels specified in the output configuration for the processing job. Step Properties like these can be used in subsequent steps and resolve to their runtime values at execution. We will use them in this way when defining the training step.
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_inputs = [
    ProcessingInput(
        input_name='raw-input-data',
        source=input_data,
        destination='/opt/ml/processing/input/data/',
        s3_data_distribution_type='ShardedByS3Key'
    )
]

processing_outputs = [
    ProcessingOutput(output_name='sentiment-train',
                     source='/opt/ml/processing/output/sentiment/train',
                     s3_upload_mode='EndOfJob'),
    ProcessingOutput(output_name='sentiment-validation',
                     source='/opt/ml/processing/output/sentiment/validation',
                     s3_upload_mode='EndOfJob'),
    ProcessingOutput(output_name='sentiment-test',
                     source='/opt/ml/processing/output/sentiment/test',
                     s3_upload_mode='EndOfJob')
]

processing_step = ProcessingStep(
    name='Processing',
    code='src/prepare_data.py',
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    # Note: .default_value bakes the default values into the job definition, so
    # overriding these parameters when starting an execution does not change them.
    job_arguments=['--train-split-percentage', str(train_split_percentage.default_value),
                   '--validation-split-percentage', str(validation_split_percentage.default_value),
                   '--test-split-percentage', str(test_split_percentage.default_value),
                   '--balance-dataset', str(balance_dataset.default_value),
                   '--max-seq-length', str(max_seq_length.default_value),
                   '--feature-store-offline-prefix', str(feature_store_offline_prefix.default_value),
                   '--feature-group-name', str(feature_group_name.default_value)
                   ]
)

print(processing_step)
ProcessingStep(name='Processing', step_type=<StepTypeEnum.PROCESSING: 'Processing'>)
We can now access the properties of the processing job as an object through processing_step.properties. To print out and explore its attributes, use the __dict__ attribute.
# print out the list of the processing job properties
print(json.dumps(
    processing_step.properties.__dict__,
    indent=4, sort_keys=True, default=str
))
{
"AppSpecification": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5298a10>",
"AutoMLJobArn": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431d10>",
"CreationTime": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431950>",
"Environment": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5298690>",
"ExitMessage": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431d50>",
"ExperimentConfig": "<sagemaker.workflow.properties.Properties object at 0x7fcdf52a0b10>",
"FailureReason": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431750>",
"LastModifiedTime": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431a10>",
"MonitoringScheduleArn": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431190>",
"NetworkConfig": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5298b90>",
"ProcessingEndTime": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431610>",
"ProcessingInputs": "<sagemaker.workflow.properties.PropertiesList object at 0x7fcdf5298350>",
"ProcessingJobArn": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431410>",
"ProcessingJobName": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5298590>",
"ProcessingJobStatus": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431310>",
"ProcessingOutputConfig": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5298510>",
"ProcessingResources": "<sagemaker.workflow.properties.Properties object at 0x7fcdf52984d0>",
"ProcessingStartTime": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431ed0>",
"RoleArn": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5298650>",
"StoppingCondition": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5298a50>",
"TrainingJobArn": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5431bd0>",
"_path": "Steps.Processing",
"_shape_name": "DescribeProcessingJobResponse"
}
Pull the channel sentiment-train from the output configuration of the processing job. Print out the attributes of the resulting object:
print(json.dumps(
    processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].__dict__,
    indent=4, sort_keys=True, default=str
))
{
"AppManaged": "<sagemaker.workflow.properties.Properties object at 0x7fcdf543c490>",
"FeatureStoreOutput": "<sagemaker.workflow.properties.Properties object at 0x7fcdf54d4510>",
"OutputName": "<sagemaker.workflow.properties.Properties object at 0x7fcdf5384650>",
"S3Output": "<sagemaker.workflow.properties.Properties object at 0x7fcdf53845d0>",
"_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train']",
"_shape_name": "ProcessingOutput"
}
Now we can pull and print out attributes of the S3 output path related to the sentiment-train output channel:
print(json.dumps(
    processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri.__dict__,
    indent=4, sort_keys=True, default=str
))
{
"__str__": "S3Uri",
"_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri",
"_shape_name": "S3Uri"
}
Let’s pull and print out attributes of the S3 output path object related to the sentiment-test output channel.
print(json.dumps(
    processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri.__dict__,
    indent=4, sort_keys=True, default=str
))
{
"__str__": "S3Uri",
"_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri",
"_shape_name": "S3Uri"
}
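The _path values printed above are placeholders: at definition time a Properties object only records where a value will come from, and SageMaker Pipelines substitutes the real value when the step runs. The following toy sketch, which is not SageMaker's actual implementation, resolves such a path string against a made-up, DescribeProcessingJob-style response dictionary to illustrate the idea. Both resolve_property_path and the sample bucket and URIs are hypothetical.

```python
import re
from typing import Any, Dict

# Each match is either a plain attribute name (e.g. S3Output) or a quoted key (e.g. ['sentiment-train']).
_TOKEN = re.compile(r"([A-Za-z_][A-Za-z0-9_]*)|\['([^']*)'\]")


def resolve_property_path(path: str, step_responses: Dict[str, Any]) -> Any:
    """Toy resolver for paths of the form Steps.<StepName>.<Field>... ."""
    tokens = [name or key for name, key in _TOKEN.findall(path)]
    if len(tokens) < 2 or tokens[0] != "Steps":
        raise ValueError(f"unsupported path: {path}")

    current: Any = step_responses[tokens[1]]
    for token in tokens[2:]:
        if isinstance(current, list):
            # Output lists are addressed by name, e.g. Outputs['sentiment-train'].
            matches = [item for item in current if item.get("OutputName") == token]
            if not matches:
                raise KeyError(token)
            current = matches[0]
        else:
            current = current[token]
    return current


# Hypothetical runtime response recorded for the step named "Processing".
responses = {
    "Processing": {
        "ProcessingOutputConfig": {
            "Outputs": [
                {"OutputName": "sentiment-train",
                 "S3Output": {"S3Uri": "s3://example-bucket/processing/output/sentiment-train"}},
                {"OutputName": "sentiment-test",
                 "S3Output": {"S3Uri": "s3://example-bucket/processing/output/sentiment-test"}},
            ]
        }
    }
}

path = "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri"
print(resolve_property_path(path, responses))
# s3://example-bucket/processing/output/sentiment-train
```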
These objects can be passed into the next steps of the workflow. We can also inspect the arguments of the processing step through its arguments property, which returns a dictionary.
processing_step.arguments.keys()
dict_keys(['ProcessingResources', 'AppSpecification', 'RoleArn', 'ProcessingInputs', 'ProcessingOutputConfig', 'Environment'])
Let’s pull and review processing inputs from the arguments of the processing step:
processing_step.arguments['ProcessingInputs']
[{'InputName': 'raw-input-data',
'AppManaged': False,
'S3Input': {'S3Uri': ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://dlai-practical-data-science/data/raw/'),
'LocalPath': '/opt/ml/processing/input/data/',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3DataDistributionType': 'ShardedByS3Key',
'S3CompressionType': 'None'}},
{'InputName': 'code',
'AppManaged': False,
'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-32-22-918/input/code/prepare_data.py',
'LocalPath': '/opt/ml/processing/input/code',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3DataDistributionType': 'FullyReplicated',
'S3CompressionType': 'None'}}]
Let’s now pull and review configuration of the processing outputs from the arguments of the processing step.
processing_step.arguments['ProcessingOutputConfig']
{'Outputs': [{'OutputName': 'sentiment-train',
'AppManaged': False,
'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-32-20-378/output/sentiment-train',
'LocalPath': '/opt/ml/processing/output/sentiment/train',
'S3UploadMode': 'EndOfJob'}},
{'OutputName': 'sentiment-validation',
'AppManaged': False,
'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-32-20-378/output/sentiment-validation',
'LocalPath': '/opt/ml/processing/output/sentiment/validation',
'S3UploadMode': 'EndOfJob'}},
{'OutputName': 'sentiment-test',
'AppManaged': False,
'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-32-20-378/output/sentiment-test',
'LocalPath': '/opt/ml/processing/output/sentiment/test',
'S3UploadMode': 'EndOfJob'}}]}
6 Configure training step
6.1 Define parameters
Set up the training parameters for the workflow.
freeze_bert_layer = ParameterString(
    name="FreezeBertLayer",
    default_value="False",
)

epochs = ParameterInteger(
    name="Epochs",
    default_value=3
)

learning_rate = ParameterFloat(
    name="LearningRate",
    default_value=0.00001
)

train_batch_size = ParameterInteger(
    name="TrainBatchSize",
    default_value=64
)

train_steps_per_epoch = ParameterInteger(
    name="TrainStepsPerEpoch",
    default_value=50
)

validation_batch_size = ParameterInteger(
    name="ValidationBatchSize",
    default_value=64
)

validation_steps_per_epoch = ParameterInteger(
    name="ValidationStepsPerEpoch",
    default_value=50
)

seed = ParameterInteger(
    name="Seed",
    default_value=42
)

run_validation = ParameterString(
    name="RunValidation",
    default_value="True",
)

train_instance_count = ParameterInteger(
    name="TrainInstanceCount",
    default_value=1
)

train_instance_type = ParameterString(
    name="TrainInstanceType",
    default_value="ml.c5.9xlarge"
)

train_volume_size = ParameterInteger(
    name="TrainVolumeSize",
    default_value=256
)

input_mode = ParameterString(
    name="InputMode",
    default_value="File",
)
6.2 Configure hyper-parameters
Set up the dictionary that will be passed to the estimator's hyperparameters argument.
hyperparameters = {
    'max_seq_length': max_seq_length,
    'freeze_bert_layer': freeze_bert_layer,
    'epochs': epochs,
    'learning_rate': learning_rate,
    'train_batch_size': train_batch_size,
    'train_steps_per_epoch': train_steps_per_epoch,
    'validation_batch_size': validation_batch_size,
    'validation_steps_per_epoch': validation_steps_per_epoch,
    'seed': seed,
    'run_validation': run_validation
}
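With SageMaker's framework estimators (script mode), this dictionary reaches the entry-point script as command-line arguments of the form --name value. The train.py script itself is not shown in this article, so the following is only a sketch, assuming an argparse-based script, of how it might parse a few of these values. Because FreezeBertLayer and RunValidation are ParameterString values ("False", "True"), the script has to convert them to booleans itself; str_to_bool and parse_hyperparameters are hypothetical helpers, and their defaults simply mirror the pipeline defaults above.

```python
import argparse
from typing import List, Optional


def str_to_bool(value: str) -> bool:
    """Convert 'True'/'False'-style strings into real booleans."""
    normalized = value.strip().lower()
    if normalized in ("true", "1", "yes"):
        return True
    if normalized in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean string, got {value!r}")


def parse_hyperparameters(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--max_seq_length", type=int, default=128)
    # type=bool would turn the string "False" into True, hence the explicit converter.
    parser.add_argument("--freeze_bert_layer", type=str_to_bool, default=False)
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--learning_rate", type=float, default=0.00001)
    parser.add_argument("--train_batch_size", type=int, default=64)
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--run_validation", type=str_to_bool, default=True)
    # parse_known_args ignores arguments this sketch does not declare.
    args, _unknown = parser.parse_known_args(argv)
    return args


args = parse_hyperparameters(["--epochs", "3", "--learning_rate", "0.00001",
                              "--freeze_bert_layer", "False", "--run_validation", "True"])
print(args.epochs, args.learning_rate, args.freeze_bert_layer, args.run_validation)
# 3 1e-05 False True
```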
6.3 Configure model-evaluation metrics
Choose loss and accuracy as the evaluation metrics.
metric_definitions = [
    {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]
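SageMaker scans the training job's log output with each Regex and records the captured number as the metric value. The sketch below applies the same two patterns with Python's re module to a made-up log line, which is a quick way to check a pattern before launching a job. The log line format is an assumption about what train.py prints.

```python
import re

metric_definitions = [
    {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]

# Hypothetical log line; the real format depends on what train.py prints.
log_line = "[Epoch 1] train_loss: 0.9512 val_loss: 0.8123 val_acc: 0.6400"


def extract_metrics(line, definitions):
    """Return {metric name: float value} for every definition whose regex matches."""
    metrics = {}
    for definition in definitions:
        match = re.search(definition['Regex'], line)
        if match:
            metrics[definition['Name']] = float(match.group(1))
    return metrics


print(extract_metrics(log_line, metric_definitions))
# {'validation:loss': 0.8123, 'validation:accuracy': 0.64}
```

Note that the character class [0-9.]+ does not capture negative values or numbers printed in scientific notation (for example 1e-05), so the training script should print these metrics as plain decimals.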
6.4 Configure the PyTorchEstimator
Let’s configure an estimator and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves the model to model_dir so that it can be hosted later.
from sagemaker.pytorch import PyTorch as PyTorchEstimator

estimator = PyTorchEstimator(
    entry_point='train.py',
    source_dir='src',
    role=role,
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    volume_size=train_volume_size,
    py_version='py3',
    framework_version='1.6.0',
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
    input_mode=input_mode
)
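Inside the training container, SageMaker exposes the input channels and the model directory through environment variables such as SM_CHANNEL_TRAIN, SM_CHANNEL_VALIDATION and SM_MODEL_DIR (by default /opt/ml/input/data/<channel> and /opt/ml/model). The sketch below shows how a script like train.py might locate them; the helper name and the fallback paths are assumptions for illustration, since the real train.py is not shown here.

```python
import os
from typing import Dict, Mapping, Optional


def training_paths(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: resolve channel and model directories from SageMaker-style env vars."""
    env = os.environ if env is None else env
    return {
        # Channel names ('train', 'validation') are the keys of the inputs dict given to the TrainingStep.
        "train": env.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"),
        "validation": env.get("SM_CHANNEL_VALIDATION", "/opt/ml/input/data/validation"),
        # Files saved here are packaged as model.tar.gz and uploaded to S3 after training.
        "model_dir": env.get("SM_MODEL_DIR", "/opt/ml/model"),
    }


print(training_paths({"SM_CHANNEL_TRAIN": "/tmp/train"}))
```

Passing the environment in as a mapping keeps the helper testable outside a container.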
6.5 Setup pipeline step caching
Step signature caching allows SageMaker Pipelines, before executing a step, to look for a previous execution of that step that was called with the same arguments. If such an execution is found, it is a cache hit: instead of recomputing the step, Pipelines propagates the values from the cached execution.
The expiry period for cached results (expire_after) is specified as an ISO 8601 duration, which can contain year, month, week, day, hour, and minute values.
More details on SageMaker Pipelines step caching can be found in the SageMaker documentation.
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")  # PT1H represents one hour
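To make the expire_after format concrete, here is a small stdlib parser for the fixed-length subset of ISO 8601 durations (weeks, days, hours, minutes, seconds), together with a check of whether a cached result is still fresh. It is an illustration only: years and months are rejected because they have no fixed length, and SageMaker performs its own parsing and cache lookup. The function names are hypothetical.

```python
import re
from datetime import datetime, timedelta

# Fixed-length subset of ISO 8601 durations, e.g. "PT1H", "P1D", "P2W", "P1DT12H".
_DURATION = re.compile(
    r"^P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_duration(text: str) -> timedelta:
    match = _DURATION.match(text)
    # Reject a bare "P", or a trailing "T" with no time components after it.
    if match is None or text.endswith(("P", "T")):
        raise ValueError(f"unsupported duration: {text!r}")
    parts = {name: int(value) for name, value in match.groupdict().items() if value}
    return timedelta(**parts)


def cache_is_fresh(cached_at: datetime, now: datetime, expire_after: str) -> bool:
    """In this sketch, a cached result is reusable only while it is younger than expire_after."""
    return now - cached_at < parse_duration(expire_after)


print(parse_duration("PT1H"))  # 1:00:00
start = datetime(2023, 2, 12, 13, 0)
print(cache_is_fresh(start, start + timedelta(minutes=30), "PT1H"))  # True
print(cache_is_fresh(start, start + timedelta(minutes=90), "PT1H"))  # False
```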
6.6 Configure the TrainingStep
Now we configure the TrainingStep, using the outputs of the processing step as its inputs:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name='Train',
    estimator=estimator,
    inputs={
        'train': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'sentiment-train'
            ].S3Output.S3Uri,
            content_type='text/csv'
        ),
        'validation': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'sentiment-validation'
            ].S3Output.S3Uri,
            content_type='text/csv'
        )
    },
    cache_config=cache_config
)

print(training_step)
TrainingStep(name='Train', step_type=<StepTypeEnum.TRAINING: 'Training'>)
We will use the __dict__ attribute to print out the attributes of the training step properties. Briefly review the result: the attributes match the object model of the DescribeTrainingJob response object.
training_step.properties.__dict__
{'_path': 'Steps.Train',
'_shape_name': 'DescribeTrainingJobResponse',
'TrainingJobName': <sagemaker.workflow.properties.Properties at 0x7fcdf5101310>,
'TrainingJobArn': <sagemaker.workflow.properties.Properties at 0x7fcdf5101350>,
'TuningJobArn': <sagemaker.workflow.properties.Properties at 0x7fcdf5101390>,
'LabelingJobArn': <sagemaker.workflow.properties.Properties at 0x7fcdf51013d0>,
'AutoMLJobArn': <sagemaker.workflow.properties.Properties at 0x7fcdf5101210>,
'ModelArtifacts': <sagemaker.workflow.properties.Properties at 0x7fcdf5101250>,
'TrainingJobStatus': <sagemaker.workflow.properties.Properties at 0x7fcdf51012d0>,
'SecondaryStatus': <sagemaker.workflow.properties.Properties at 0x7fcdf5101110>,
'FailureReason': <sagemaker.workflow.properties.Properties at 0x7fcdf5101150>,
'HyperParameters': <sagemaker.workflow.properties.Properties at 0x7fcdf5101190>,
'AlgorithmSpecification': <sagemaker.workflow.properties.Properties at 0x7fcdf51011d0>,
'RoleArn': <sagemaker.workflow.properties.Properties at 0x7fcdf5101850>,
'InputDataConfig': <sagemaker.workflow.properties.PropertiesList at 0x7fcdf5101750>,
'OutputDataConfig': <sagemaker.workflow.properties.Properties at 0x7fcdf5101490>,
'ResourceConfig': <sagemaker.workflow.properties.Properties at 0x7fcdf51015d0>,
'VpcConfig': <sagemaker.workflow.properties.Properties at 0x7fcdf5424e10>,
'StoppingCondition': <sagemaker.workflow.properties.Properties at 0x7fcdf5424350>,
'CreationTime': <sagemaker.workflow.properties.Properties at 0x7fcdf5424910>,
'TrainingStartTime': <sagemaker.workflow.properties.Properties at 0x7fcdf5424750>,
'TrainingEndTime': <sagemaker.workflow.properties.Properties at 0x7fcdf5424950>,
'LastModifiedTime': <sagemaker.workflow.properties.Properties at 0x7fcdf5424550>,
'SecondaryStatusTransitions': <sagemaker.workflow.properties.PropertiesList at 0x7fcdf5424a10>,
'FinalMetricDataList': <sagemaker.workflow.properties.PropertiesList at 0x7fcdf5424590>,
'EnableNetworkIsolation': <sagemaker.workflow.properties.Properties at 0x7fcdf5424e50>,
'EnableInterContainerTrafficEncryption': <sagemaker.workflow.properties.Properties at 0x7fcdf5424690>,
'EnableManagedSpotTraining': <sagemaker.workflow.properties.Properties at 0x7fcdf5424150>,
'CheckpointConfig': <sagemaker.workflow.properties.Properties at 0x7fcdf5424fd0>,
'TrainingTimeInSeconds': <sagemaker.workflow.properties.Properties at 0x7fcdf5424490>,
'BillableTimeInSeconds': <sagemaker.workflow.properties.Properties at 0x7fcdf5424ad0>,
'DebugHookConfig': <sagemaker.workflow.properties.Properties at 0x7fcdf54246d0>,
'ExperimentConfig': <sagemaker.workflow.properties.Properties at 0x7fcdf53a7d50>,
'DebugRuleConfigurations': <sagemaker.workflow.properties.PropertiesList at 0x7fcdf53a7890>,
'TensorBoardOutputConfig': <sagemaker.workflow.properties.Properties at 0x7fcdf53a7e50>,
'DebugRuleEvaluationStatuses': <sagemaker.workflow.properties.PropertiesList at 0x7fcdf53a7dd0>,
'ProfilerConfig': <sagemaker.workflow.properties.Properties at 0x7fcdf53a7d90>,
'ProfilerRuleConfigurations': <sagemaker.workflow.properties.PropertiesList at 0x7fcdf53a79d0>,
'ProfilerRuleEvaluationStatuses': <sagemaker.workflow.properties.PropertiesList at 0x7fcdf53a7410>,
'ProfilingStatus': <sagemaker.workflow.properties.Properties at 0x7fcdf53a7ad0>,
'RetryStrategy': <sagemaker.workflow.properties.Properties at 0x7fcdf53a7a10>,
'Environment': <sagemaker.workflow.properties.Properties at 0x7fcdf53a7950>,
'WarmPoolStatus': <sagemaker.workflow.properties.Properties at 0x7fcdf53a7f10>}
7 Configure model-evaluation step
First, we will develop an evaluation script that will be used in the model evaluation processing step. The evaluation script uses the trained model and the test dataset to produce a JSON file with classification evaluation metrics such as accuracy.
The evaluation script performs the following steps:
- loads the model
- reads the test data
- runs predictions against the test data
- builds a classification report, including accuracy
- saves the evaluation report to the evaluation directory
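The evaluation script itself (src/evaluate_model_metrics.py) is not shown. As a rough stdlib sketch of its last two steps, the following computes accuracy from true and predicted labels and writes it to evaluation.json in the nested shape that the condition step later reads with the JSON path metrics.accuracy.value. The function name and the temporary directory used in the example are assumptions; the real script writes to /opt/ml/processing/output/metrics/.

```python
import json
import os
import tempfile
from typing import Sequence


def write_evaluation_report(y_true: Sequence[int], y_pred: Sequence[int], output_dir: str) -> str:
    """Hypothetical helper: compute accuracy, save it as evaluation.json, return the file path."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("y_true and y_pred must be non-empty and the same length")
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    # Nesting chosen so that the value is reachable as metrics.accuracy.value.
    report = {"metrics": {"accuracy": {"value": correct / len(y_true)}}}

    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "evaluation.json")
    with open(path, "w") as f:
        json.dump(report, f)
    return path


# Labels use the sentiment classes 1, 0 and -1; the values here are made up.
with tempfile.TemporaryDirectory() as tmp:
    report_path = write_evaluation_report([1, 0, -1, 1], [1, 0, 1, 1], tmp)
    with open(report_path) as f:
        print(json.load(f))
# {'metrics': {'accuracy': {'value': 0.75}}}
```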
Create an instance of the SKLearnProcessor to run our evaluation script as a scikit-learn-based SageMaker processing job.
from sagemaker.sklearn.processing import SKLearnProcessor

evaluation_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},
    max_runtime_in_seconds=7200
)
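Set up the output PropertyFile. It registers the evaluation.json file written to the metrics output so that later steps can read values from it.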
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='metrics',
    path='evaluation.json'
)
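Later steps query this file with a JSON path. The toy function below mimics that lookup for simple dotted paths such as metrics.accuracy.value, to show how the path maps onto the nested report; it is only an illustration, since SageMaker evaluates JsonGet expressions itself at execution time. The function name and the sample report are hypothetical.

```python
import json
from typing import Any


def get_json_path(document: Any, dotted_path: str) -> Any:
    """Follow a dotted path like 'metrics.accuracy.value' through nested dicts."""
    current = document
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            raise KeyError(f"{dotted_path!r}: missing key {key!r}")
        current = current[key]
    return current


report = json.loads('{"metrics": {"accuracy": {"value": 0.75}}}')
print(get_json_path(report, "metrics.accuracy.value"))  # 0.75
```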
Now we use the evaluation processor to construct a ProcessingStep, along with the input and output channels and the code that will run when the pipeline is executed. This is very similar to a processor instance’s run method.
from sagemaker.processing import ProcessingInput, ProcessingOutput

evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    code='src/evaluate_model_metrics.py',
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/input/model'
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri,
            destination='/opt/ml/processing/input/data'
        )
    ],
    outputs=[
        ProcessingOutput(output_name='metrics',
                         s3_upload_mode='EndOfJob',
                         source='/opt/ml/processing/output/metrics/'),
    ],
    job_arguments=[
        '--max-seq-length', str(max_seq_length.default_value),
    ],
    property_files=[evaluation_report],
)
8 Configure and register model step
8.1 Configure the model for deployment
We will now use the estimator instance that was used for the training step to construct an instance of RegisterModel. The result of executing RegisterModel in a pipeline is a model package. A model package is a reusable abstraction over model artifacts that packages all the ingredients necessary for inference. Primarily, it consists of an inference specification that defines the inference image to use, along with an optional model weights location.
A model package group is a collection of model packages. You can create a model package group for a specific ML business problem, and you can keep adding versions/model packages into it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker workflow pipeline so that they can keep adding versions/model packages to the group for every workflow pipeline run.
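To picture how a model package group accumulates versions across pipeline runs, here is a toy in-memory model; it is not the SageMaker Model Registry API. In this sketch each registration appends a new version number starting at 1, with an approval status that defaults to PendingManualApproval, matching the default of the ModelApprovalStatus parameter defined in this section. The class names and S3 paths are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ModelPackageVersion:
    version: int
    model_data: str
    approval_status: str = "PendingManualApproval"


@dataclass
class ToyModelPackageGroup:
    """Hypothetical stand-in for a model package group: an ordered list of versions."""
    name: str
    versions: List[ModelPackageVersion] = field(default_factory=list)

    def register(self, model_data: str,
                 approval_status: str = "PendingManualApproval") -> ModelPackageVersion:
        # Each pipeline run adds the next version to the same group.
        package = ModelPackageVersion(len(self.versions) + 1, model_data, approval_status)
        self.versions.append(package)
        return package

    def latest_approved(self) -> Optional[ModelPackageVersion]:
        approved = [v for v in self.versions if v.approval_status == "Approved"]
        return approved[-1] if approved else None


group = ToyModelPackageGroup("BERT-Reviews-example")
group.register("s3://example-bucket/run-1/model.tar.gz")
group.register("s3://example-bucket/run-2/model.tar.gz", approval_status="Approved")
group.register("s3://example-bucket/run-3/model.tar.gz")
print([v.version for v in group.versions])  # [1, 2, 3]
print(group.latest_approved().model_data)  # s3://example-bucket/run-2/model.tar.gz
```

Keeping pending and approved versions in one group is what lets a later deployment process pick the most recent approved version rather than simply the newest one.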
The construction of RegisterModel is very similar to an estimator instance’s register method, for those familiar with the existing Python SDK.
In particular, we will pass in the S3ModelArtifacts from the training_step properties.
Note that we provide a specific model package group name, which will be used in the Model Registry and Continuous Integration/Continuous Deployment (CI/CD) work later on. Let’s set up the variables.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

deploy_instance_type = ParameterString(
    name="DeployInstanceType",
    default_value="ml.m5.large"
)

deploy_instance_count = ParameterInteger(
    name="DeployInstanceCount",
    default_value=1
)

model_package_group_name = f"BERT-Reviews-{timestamp}"

print(model_package_group_name)
BERT-Reviews-1676208665
Configure the ModelMetrics to be stored as metadata.
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

print(model_metrics)
<sagemaker.model_metrics.ModelMetrics object at 0x7fcdf40cd5d0>
Define the deployment image for inference.
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="1.6.0",
    py_version="py36",
    instance_type=deploy_instance_type,
    image_scope="inference"
)
print(inference_image_uri)
763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36
8.2 Register the model for deployment
Let’s now configure the register model step.
from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="RegisterModel",
    estimator=estimator,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)
9 Create model for deployment step
Let’s configure the model for deployment.
from sagemaker.model import Model

model_name = 'bert-model-{}'.format(timestamp)

model = Model(
    name=model_name,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role
)
Now we configure the create model input:
from sagemaker.inputs import CreateModelInput

create_inputs = CreateModelInput(
    instance_type=deploy_instance_type
)
Lastly, we configure the create model step for the workflow.
from sagemaker.workflow.steps import CreateModelStep

create_step = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=create_inputs
)
10 Check accuracy condition step
Finally, we only want to register this model if its accuracy, as determined by our evaluation step evaluation_step, meets or exceeds some value. A ConditionStep lets a pipeline execute conditionally within the pipeline DAG, based on conditions on step properties.
Below, we will:
- define a minimum accuracy value as a parameter
- define a ConditionGreaterThanOrEqualTo on the accuracy value found in the output of the evaluation step, evaluation_step
- use the condition in the list of conditions in a ConditionStep
- pass the RegisterModel step collection and the CreateModel step into the if_steps of the ConditionStep
min_accuracy_value = ParameterFloat(
    name="MinAccuracyValue",
    default_value=0.33  # random choice from three classes
)

from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)

minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value  # minimum accuracy threshold
)

minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_step],  # met or exceeded the minimum accuracy: register and create the model
    else_steps=[],  # below the minimum accuracy: the model will not be registered
)
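Conceptually, the condition step looks up the JSON path metrics.accuracy.value inside the evaluation report produced by evaluation_step and compares it with MinAccuracyValue. The sketch below imitates that locally in plain Python. json_get and condition_outcome are hypothetical helpers. The dotted-path walk is a simplification of the JsonPath lookup SageMaker performs. The sample report has the same shape as the evaluation.json this pipeline writes, using the accuracy value this run later reports.

```python
import json


def json_get(document: dict, json_path: str):
    """Simplified stand-in for JsonGet: follow a dotted path through parsed JSON."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


def condition_outcome(accuracy: float, min_accuracy: float) -> bool:
    """Mirrors ConditionGreaterThanOrEqualTo: true when left >= right."""
    return accuracy >= min_accuracy


# Evaluation report in the same shape as the pipeline's evaluation.json.
evaluation_json = '{"metrics": {"accuracy": {"value": 0.7313915857605178}}}'
report = json.loads(evaluation_json)

accuracy = json_get(report, "metrics.accuracy.value")
print(condition_outcome(accuracy, 0.33))  # default MinAccuracyValue
print(condition_outcome(accuracy, 0.80))  # a stricter, hypothetical threshold
```

When the outcome is true, the steps in if_steps run. Otherwise the (empty) else_steps list runs and nothing is registered.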
11 Create pipeline
11.1 Define a pipeline of parameters, steps, and conditions
Let’s tie it all up into a workflow pipeline so we can execute it, and even schedule it.
A pipeline requires a name, parameters, and steps. Names must be unique within an (account, region) pair, so you can append the timestamp to the name to reduce the chance of a name conflict.
Note:
- All the parameters used in the definitions must be present.
- Steps passed into the pipeline need not be in the order of execution. The SageMaker workflow service resolves the data dependency DAG as the execution's steps complete.
- Each step must appear in exactly one place: either the pipeline's step list or the if/else list of a single condition step.
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_count,
        processing_instance_type,
        max_seq_length,
        balance_dataset,
        train_split_percentage,
        validation_split_percentage,
        test_split_percentage,
        feature_store_offline_prefix,
        feature_group_name,
        epochs,
        learning_rate,
        train_batch_size,
        train_steps_per_epoch,
        validation_batch_size,
        validation_steps_per_epoch,
        freeze_bert_layer,
        seed,
        train_instance_count,
        train_instance_type,
        train_volume_size,
        input_mode,
        run_validation,
        min_accuracy_value,
        model_approval_status,
        deploy_instance_type,
        deploy_instance_count
    ],
    steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
    sagemaker_session=sess
)
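The note that steps need not be listed in execution order can be illustrated with a topological sort. The sketch below uses Python's standard graphlib module (Python 3.9+) on a hand-written dependency map for the four top-level steps of this pipeline. The dependencies are read off the definition: Train consumes Processing outputs, EvaluateModel consumes the model and the test split, and AccuracyCondition reads the evaluation report. This only illustrates ordering by data dependencies; it is not how the SageMaker service is implemented.

```python
from graphlib import TopologicalSorter

# step -> set of steps whose outputs it consumes
dependencies = {
    "Train": {"Processing"},
    "EvaluateModel": {"Train", "Processing"},
    "AccuracyCondition": {"EvaluateModel"},
}

# Steps passed in a deliberately scrambled order.
steps_as_listed = ["AccuracyCondition", "EvaluateModel", "Processing", "Train"]

sorter = TopologicalSorter({step: dependencies.get(step, set()) for step in steps_as_listed})
execution_order = list(sorter.static_order())
print(execution_order)

# Each step may appear only once: in the pipeline's steps list or in one
# condition step's if/else lists (here RegisterModel and CreateModel).
if_steps = ["RegisterModel", "CreateModel"]
all_step_names = steps_as_listed + if_steps
duplicates = {name for name in all_step_names if all_step_names.count(name) > 1}
print(duplicates)
```

Because these four steps form a chain, only one valid order exists, whatever order they are listed in.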
Let’s examine the JSON of the pipeline definition that meets the SageMaker Workflow Pipeline DSL specification.
By examining the definition, you are also confirming that the pipeline was well-defined, and that the parameters and step properties resolve correctly.
import json
from pprint import pprint
definition = json.loads(pipeline.definition())

pprint(definition)
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
{'Metadata': {},
'Parameters': [{'DefaultValue': 's3://dlai-practical-data-science/data/raw/',
'Name': 'InputData',
'Type': 'String'},
{'DefaultValue': 1,
'Name': 'ProcessingInstanceCount',
'Type': 'Integer'},
{'DefaultValue': 'ml.c5.2xlarge',
'Name': 'ProcessingInstanceType',
'Type': 'String'},
{'DefaultValue': 128,
'Name': 'MaxSeqLength',
'Type': 'Integer'},
{'DefaultValue': 'True',
'Name': 'BalanceDataset',
'Type': 'String'},
{'DefaultValue': 0.9,
'Name': 'TrainSplitPercentage',
'Type': 'Float'},
{'DefaultValue': 0.05,
'Name': 'ValidationSplitPercentage',
'Type': 'Float'},
{'DefaultValue': 0.05,
'Name': 'TestSplitPercentage',
'Type': 'Float'},
{'DefaultValue': 'reviews-feature-store-1676208665',
'Name': 'FeatureStoreOfflinePrefix',
'Type': 'String'},
{'DefaultValue': 'reviews-feature-group-1676208665',
'Name': 'FeatureGroupName',
'Type': 'String'},
{'DefaultValue': 3, 'Name': 'Epochs', 'Type': 'Integer'},
{'DefaultValue': 1e-05,
'Name': 'LearningRate',
'Type': 'Float'},
{'DefaultValue': 64,
'Name': 'TrainBatchSize',
'Type': 'Integer'},
{'DefaultValue': 50,
'Name': 'TrainStepsPerEpoch',
'Type': 'Integer'},
{'DefaultValue': 64,
'Name': 'ValidationBatchSize',
'Type': 'Integer'},
{'DefaultValue': 50,
'Name': 'ValidationStepsPerEpoch',
'Type': 'Integer'},
{'DefaultValue': 'False',
'Name': 'FreezeBertLayer',
'Type': 'String'},
{'DefaultValue': 42, 'Name': 'Seed', 'Type': 'Integer'},
{'DefaultValue': 1,
'Name': 'TrainInstanceCount',
'Type': 'Integer'},
{'DefaultValue': 'ml.c5.9xlarge',
'Name': 'TrainInstanceType',
'Type': 'String'},
{'DefaultValue': 256,
'Name': 'TrainVolumeSize',
'Type': 'Integer'},
{'DefaultValue': 'File', 'Name': 'InputMode', 'Type': 'String'},
{'DefaultValue': 'True',
'Name': 'RunValidation',
'Type': 'String'},
{'DefaultValue': 0.33,
'Name': 'MinAccuracyValue',
'Type': 'Float'},
{'DefaultValue': 'PendingManualApproval',
'Name': 'ModelApprovalStatus',
'Type': 'String'},
{'DefaultValue': 'ml.m5.large',
'Name': 'DeployInstanceType',
'Type': 'String'},
{'DefaultValue': 1,
'Name': 'DeployInstanceCount',
'Type': 'Integer'}],
'Steps': [{'Arguments': {'AppSpecification': {'ContainerArguments': ['--train-split-percentage',
'0.9',
'--validation-split-percentage',
'0.05',
'--test-split-percentage',
'0.05',
'--balance-dataset',
'True',
'--max-seq-length',
'128',
'--feature-store-offline-prefix',
'reviews-feature-store-1676208665',
'--feature-group-name',
'reviews-feature-group-1676208665'],
'ContainerEntrypoint': ['python3',
'/opt/ml/processing/input/code/prepare_data.py'],
'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3'},
'Environment': {'AWS_DEFAULT_REGION': 'us-east-1'},
'ProcessingInputs': [{'AppManaged': False,
'InputName': 'raw-input-data',
'S3Input': {'LocalPath': '/opt/ml/processing/input/data/',
'S3CompressionType': 'None',
'S3DataDistributionType': 'ShardedByS3Key',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': {'Get': 'Parameters.InputData'}}},
{'AppManaged': False,
'InputName': 'code',
'S3Input': {'LocalPath': '/opt/ml/processing/input/code',
'S3CompressionType': 'None',
'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-37-28-563/input/code/prepare_data.py'}}],
'ProcessingOutputConfig': {'Outputs': [{'AppManaged': False,
'OutputName': 'sentiment-train',
'S3Output': {'LocalPath': '/opt/ml/processing/output/sentiment/train',
'S3UploadMode': 'EndOfJob',
'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-32-20-378/output/sentiment-train'}},
{'AppManaged': False,
'OutputName': 'sentiment-validation',
'S3Output': {'LocalPath': '/opt/ml/processing/output/sentiment/validation',
'S3UploadMode': 'EndOfJob',
'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-32-20-378/output/sentiment-validation'}},
{'AppManaged': False,
'OutputName': 'sentiment-test',
'S3Output': {'LocalPath': '/opt/ml/processing/output/sentiment/test',
'S3UploadMode': 'EndOfJob',
'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-32-20-378/output/sentiment-test'}}]},
'ProcessingResources': {'ClusterConfig': {'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
'VolumeSizeInGB': 30}},
'RoleArn': 'arn:aws:iam::912822595625:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role'},
'Name': 'Processing',
'Type': 'Processing'},
{'Arguments': {'AlgorithmSpecification': {'EnableSageMakerMetricsTimeSeries': True,
'MetricDefinitions': [{'Name': 'validation:loss',
'Regex': 'val_loss: '
'([0-9.]+)'},
{'Name': 'validation:accuracy',
'Regex': 'val_acc: '
'([0-9.]+)'}],
'TrainingImage': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.6.0-cpu-py3',
'TrainingInputMode': {'Get': 'Parameters.InputMode'}},
'DebugHookConfig': {'CollectionConfigurations': [],
'S3OutputPath': 's3://sagemaker-us-east-1-912822595625/'},
'HyperParameters': {'epochs': '3',
'freeze_bert_layer': '"False"',
'learning_rate': '1e-05',
'max_seq_length': '128',
'run_validation': '"True"',
'sagemaker_container_log_level': '20',
'sagemaker_job_name': '"pytorch-training-2023-02-12-13-37-28-707"',
'sagemaker_program': '"train.py"',
'sagemaker_region': '"us-east-1"',
'sagemaker_submit_directory': '"s3://sagemaker-us-east-1-912822595625/pytorch-training-2023-02-12-13-37-28-707/source/sourcedir.tar.gz"',
'seed': '42',
'train_batch_size': '64',
'train_steps_per_epoch': '50',
'validation_batch_size': '64',
'validation_steps_per_epoch': '50'},
'InputDataConfig': [{'ChannelName': 'train',
'ContentType': 'text/csv',
'DataSource': {'S3DataSource': {'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3Uri': {'Get': "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri"}}}},
{'ChannelName': 'validation',
'ContentType': 'text/csv',
'DataSource': {'S3DataSource': {'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3Uri': {'Get': "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-validation'].S3Output.S3Uri"}}}}],
'OutputDataConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-912822595625/'},
'ProfilerConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-912822595625/'},
'ProfilerRuleConfigurations': [{'RuleConfigurationName': 'ProfilerReport-1676209048',
'RuleEvaluatorImage': '503895931360.dkr.ecr.us-east-1.amazonaws.com/sagemaker-debugger-rules:latest',
'RuleParameters': {'rule_to_invoke': 'ProfilerReport'}}],
'ResourceConfig': {'InstanceCount': {'Get': 'Parameters.TrainInstanceCount'},
'InstanceType': {'Get': 'Parameters.TrainInstanceType'},
'VolumeSizeInGB': {'Get': 'Parameters.TrainVolumeSize'}},
'RoleArn': 'arn:aws:iam::912822595625:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role',
'StoppingCondition': {'MaxRuntimeInSeconds': 86400}},
'CacheConfig': {'Enabled': True, 'ExpireAfter': 'PT1H'},
'Name': 'Train',
'Type': 'Training'},
{'Arguments': {'AppSpecification': {'ContainerArguments': ['--max-seq-length',
'128'],
'ContainerEntrypoint': ['python3',
'/opt/ml/processing/input/code/evaluate_model_metrics.py'],
'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3'},
'Environment': {'AWS_DEFAULT_REGION': 'us-east-1'},
'ProcessingInputs': [{'AppManaged': False,
'InputName': 'input-1',
'S3Input': {'LocalPath': '/opt/ml/processing/input/model',
'S3CompressionType': 'None',
'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': {'Get': 'Steps.Train.ModelArtifacts.S3ModelArtifacts'}}},
{'AppManaged': False,
'InputName': 'input-2',
'S3Input': {'LocalPath': '/opt/ml/processing/input/data',
'S3CompressionType': 'None',
'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': {'Get': "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri"}}},
{'AppManaged': False,
'InputName': 'code',
'S3Input': {'LocalPath': '/opt/ml/processing/input/code',
'S3CompressionType': 'None',
'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-37-29-187/input/code/evaluate_model_metrics.py'}}],
'ProcessingOutputConfig': {'Outputs': [{'AppManaged': False,
'OutputName': 'metrics',
'S3Output': {'LocalPath': '/opt/ml/processing/output/metrics/',
'S3UploadMode': 'EndOfJob',
'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-35-32-414/output/metrics'}}]},
'ProcessingResources': {'ClusterConfig': {'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
'VolumeSizeInGB': 30}},
'RoleArn': 'arn:aws:iam::912822595625:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role',
'StoppingCondition': {'MaxRuntimeInSeconds': 7200}},
'Name': 'EvaluateModel',
'PropertyFiles': [{'FilePath': 'evaluation.json',
'OutputName': 'metrics',
'PropertyFileName': 'EvaluationReport'}],
'Type': 'Processing'},
{'Arguments': {'Conditions': [{'LeftValue': {'Std:JsonGet': {'Path': 'metrics.accuracy.value',
'PropertyFile': {'Get': 'Steps.EvaluateModel.PropertyFiles.EvaluationReport'}}},
'RightValue': {'Get': 'Parameters.MinAccuracyValue'},
'Type': 'GreaterThanOrEqualTo'}],
'ElseSteps': [],
'IfSteps': [{'Arguments': {'InferenceSpecification': {'Containers': [{'Image': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36',
'ModelDataUrl': {'Get': 'Steps.Train.ModelArtifacts.S3ModelArtifacts'}}],
'SupportedContentTypes': ['application/jsonlines'],
'SupportedRealtimeInferenceInstanceTypes': [{'Get': 'Parameters.DeployInstanceType'}],
'SupportedResponseMIMETypes': ['application/jsonlines'],
'SupportedTransformInstanceTypes': [{'Get': 'Parameters.DeployInstanceType'}]},
'ModelApprovalStatus': {'Get': 'Parameters.ModelApprovalStatus'},
'ModelMetrics': {'ModelQuality': {'Statistics': {'ContentType': 'application/json',
'S3Uri': 's3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-35-32-414/output/metrics/evaluation.json'}}},
'ModelPackageGroupName': 'BERT-Reviews-1676208665'},
'Name': 'RegisterModel',
'Type': 'RegisterModel'},
{'Arguments': {'ExecutionRoleArn': 'arn:aws:iam::912822595625:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role',
'PrimaryContainer': {'Environment': {},
'Image': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36',
'ModelDataUrl': {'Get': 'Steps.Train.ModelArtifacts.S3ModelArtifacts'}}},
'Name': 'CreateModel',
'Type': 'Model'}]},
'Name': 'AccuracyCondition',
'Type': 'Condition'}],
'Version': '2020-12-01'}
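The check that parameters resolve correctly can be made more mechanical. The sketch below walks a definition dict like the one printed above. It collects every {'Get': 'Parameters.X'} reference and reports any name missing from the top-level Parameters list. find_parameter_refs and undeclared_parameters are hypothetical helpers written for this sketch. It is shown on a tiny hand-written excerpt with a deliberately undeclared name.

```python
def find_parameter_refs(node, refs=None):
    """Recursively collect X from every {"Get": "Parameters.X"} in a definition."""
    if refs is None:
        refs = set()
    if isinstance(node, dict):
        get = node.get("Get")
        if isinstance(get, str) and get.startswith("Parameters."):
            refs.add(get.split(".", 1)[1])
        for value in node.values():
            find_parameter_refs(value, refs)
    elif isinstance(node, list):
        for item in node:
            find_parameter_refs(item, refs)
    return refs


def undeclared_parameters(definition: dict) -> set:
    """Parameter names referenced by steps but not declared in "Parameters"."""
    declared = {p["Name"] for p in definition.get("Parameters", [])}
    return find_parameter_refs(definition.get("Steps", [])) - declared


# Tiny excerpt in the shape of the printed definition.
excerpt = {
    "Parameters": [{"Name": "InputData", "Type": "String", "DefaultValue": "s3://example/raw/"}],
    "Steps": [
        {"Name": "Processing",
         "Arguments": {"S3Uri": {"Get": "Parameters.InputData"},
                       "InstanceType": {"Get": "Parameters.ProcessingInstanceType"}}},
    ],
}
print(undeclared_parameters(excerpt))
```

On the real pipeline, you would pass json.loads(pipeline.definition()). For a well-formed definition, you would expect an empty set.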
Now we create the pipeline using the create() method and print its Amazon Resource Name (ARN).
response = pipeline.create(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
arn:aws:sagemaker:us-east-1:912822595625:pipeline/bert-pipeline-1676208665
11.2 Start Pipeline
Let's now run our pipeline on the Amazon SageMaker Pipelines service. The service uses the role passed in to create all the jobs defined in the steps. We start the pipeline with the parameter values passed into the start() function.
execution = pipeline.start(
    parameters=dict(
        InputData=raw_input_data_s3_uri,
        ProcessingInstanceCount=1,
        ProcessingInstanceType='ml.c5.2xlarge',
        MaxSeqLength=128,
        BalanceDataset='True',
        TrainSplitPercentage=0.9,
        ValidationSplitPercentage=0.05,
        TestSplitPercentage=0.05,
        FeatureStoreOfflinePrefix='reviews-feature-store-'+str(timestamp),
        FeatureGroupName='reviews-feature-group-'+str(timestamp),
        Epochs=3,
        LearningRate=0.000012,
        TrainBatchSize=64,
        TrainStepsPerEpoch=50,
        ValidationBatchSize=64,
        ValidationStepsPerEpoch=64,
        FreezeBertLayer='False',
        Seed=42,
        TrainInstanceCount=1,
        TrainInstanceType='ml.c5.9xlarge',
        TrainVolumeSize=256,
        InputMode='File',
        RunValidation='True',
        MinAccuracyValue=0.01,
        ModelApprovalStatus='PendingManualApproval',
        DeployInstanceType='ml.m5.large',
        DeployInstanceCount=1
    )
)
print(execution.arn)
arn:aws:sagemaker:us-east-1:912822595625:pipeline/bert-pipeline-1676208665/execution/h4inlmq7fqwk
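Parameters passed to start() override the defaults declared in the pipeline, and any parameter left out keeps its default. The sketch below uses plain dicts to compare a few defaults from the printed definition with the values passed to start() above. It shows that this execution ran with LearningRate 0.000012, ValidationStepsPerEpoch 64 and, notably, MinAccuracyValue 0.01 instead of the 0.33 default. The dicts are hand-copied subsets, not read from the API.

```python
# Defaults declared in the pipeline definition (subset copied from the printed definition).
defaults = {
    "LearningRate": 1e-05,
    "ValidationStepsPerEpoch": 50,
    "MinAccuracyValue": 0.33,
    "Epochs": 3,
}

# Values passed to pipeline.start(parameters=...) for this execution (same subset).
overrides = {
    "LearningRate": 0.000012,
    "ValidationStepsPerEpoch": 64,
    "MinAccuracyValue": 0.01,
    "Epochs": 3,
}

# Override names that match no declared parameter (for example, a typo).
unknown = set(overrides) - set(defaults)

# Effective values: an override wins, otherwise the declared default applies.
effective = {**defaults, **overrides}
changed = {name: (defaults[name], effective[name])
           for name in defaults if effective[name] != defaults[name]}
print(unknown)
print(changed)
```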
11.3 Wait for pipeline execution
Now we can describe the execution instance and list its steps to find out more about the execution.
from pprint import pprint
execution_run = execution.describe()
pprint(execution_run)
{'CreatedBy': {'DomainId': 'd-h9yolcap5nrc',
'UserProfileArn': 'arn:aws:sagemaker:us-east-1:912822595625:user-profile/d-h9yolcap5nrc/sagemaker-user-profile-us-east-1',
'UserProfileName': 'sagemaker-user-profile-us-east-1'},
'CreationTime': datetime.datetime(2023, 2, 12, 13, 37, 41, 761000, tzinfo=tzlocal()),
'LastModifiedBy': {'DomainId': 'd-h9yolcap5nrc',
'UserProfileArn': 'arn:aws:sagemaker:us-east-1:912822595625:user-profile/d-h9yolcap5nrc/sagemaker-user-profile-us-east-1',
'UserProfileName': 'sagemaker-user-profile-us-east-1'},
'LastModifiedTime': datetime.datetime(2023, 2, 12, 13, 37, 41, 761000, tzinfo=tzlocal()),
'PipelineArn': 'arn:aws:sagemaker:us-east-1:912822595625:pipeline/bert-pipeline-1676208665',
'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:912822595625:pipeline/bert-pipeline-1676208665/execution/h4inlmq7fqwk',
'PipelineExecutionDisplayName': 'execution-1676209061894',
'PipelineExecutionStatus': 'Executing',
'ResponseMetadata': {'HTTPHeaders': {'content-length': '815',
'content-type': 'application/x-amz-json-1.1',
'date': 'Sun, 12 Feb 2023 13:37:46 GMT',
'x-amzn-requestid': '5d8ec01a-6a95-4737-802b-82302f7ab368'},
'HTTPStatusCode': 200,
'RequestId': '5d8ec01a-6a95-4737-802b-82302f7ab368',
'RetryAttempts': 0}}
Print the execution display name and its ARN:
execution_run_name = execution_run['PipelineExecutionDisplayName']
print(execution_run_name)
execution-1676209061894
pipeline_execution_arn = execution_run['PipelineExecutionArn']
print(pipeline_execution_arn)
arn:aws:sagemaker:us-east-1:912822595625:pipeline/bert-pipeline-1676208665/execution/h4inlmq7fqwk
11.4 Describe completed pipeline
We will wait for the first step to start running and print the information about it:
import time
time.sleep(30)

execution.list_steps()
[{'StepName': 'Processing',
'StartTime': datetime.datetime(2023, 2, 12, 13, 37, 42, 570000, tzinfo=tzlocal()),
'StepStatus': 'Executing',
'AttemptCount': 0,
'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:912822595625:processing-job/pipelines-h4inlmq7fqwk-processing-mwnbfz07z3'}}}]
11.5 Wait for the pipeline to complete
To get information about the pipeline execution, we can use a low-level boto3 SageMaker service client. It is also useful for other operations later in this section.
In the code below, we poll the pipeline execution summary and wait for the execution status to change from Executing to Succeeded.
%%time

import time
from pprint import pprint

sm = boto3.Session().client(service_name='sagemaker', region_name=region)

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)

while pipeline_execution_status == 'Executing':
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
        pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
    except Exception as e:
        print('Please wait...')
        time.sleep(30)

pprint(executions_response)
Executing
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
[{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:912822595625:pipeline/bert-pipeline-1676208665/execution/h4inlmq7fqwk',
'PipelineExecutionDisplayName': 'execution-1676209061894',
'PipelineExecutionStatus': 'Succeeded',
'StartTime': datetime.datetime(2023, 2, 12, 13, 37, 41, 761000, tzinfo=tzlocal())}]
CPU times: user 14.7 s, sys: 641 ms, total: 15.4 s
Wall time: 32min 38s
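The polling loop above calls time.sleep(30) only when the API call raises an exception, so successful status checks are issued back to back. That may explain the relatively high CPU time reported. A minimal alternative that waits between every poll is sketched below. wait_for_status is a hypothetical helper. The status source and the sleep function are injected, so it can be exercised without AWS. Recent versions of the SageMaker Python SDK also provide execution.wait() for this purpose.

```python
import time
from typing import Callable


def wait_for_status(get_status: Callable[[], str],
                    in_progress=("Executing",),
                    delay_seconds: float = 30.0,
                    max_attempts: int = 120,
                    sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status() until it leaves the in-progress states, sleeping between every call."""
    for _ in range(max_attempts):
        try:
            status = get_status()
        except Exception as exc:  # e.g. a throttling error: retry after the delay
            print(f"Please wait... ({exc})")
        else:
            if status not in in_progress:
                return status
        sleep(delay_seconds)
    raise TimeoutError(f"Still in progress after {max_attempts} attempts")


# Usage with a fake status source instead of the boto3 client.
statuses = iter(["Executing", "Executing", "Succeeded"])
sleeps = []
final = wait_for_status(lambda: next(statuses), sleep=sleeps.append)
print(final, len(sleeps))
```

Against the real service, get_status could be lambda: sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries'][0]['PipelineExecutionStatus'].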
We can now check the final status and the ARN of the execution:
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)
Succeeded
pipeline_execution_arn = executions_response[0]['PipelineExecutionArn']
print(pipeline_execution_arn)
arn:aws:sagemaker:us-east-1:912822595625:pipeline/bert-pipeline-1676208665/execution/h4inlmq7fqwk
12 Evaluate the model
12.1 Describe evaluation metrics
Now we examine the resulting model evaluation after the pipeline completes.
processing_job_name = None

# pull the processing step name
for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'Processing':
        processing_job_name = execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

# get the description of the processing job
describe_transform_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

# get the output S3 path
transform_output_s3_uri = describe_transform_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Transform output {}'.format(transform_output_s3_uri))
Transform output s3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-32-20-378/output/sentiment-train
# list the files in the resulting output S3 path
!aws s3 ls --recursive $transform_output_s3_uri
2023-02-12 13:48:45 4882265 sagemaker-scikit-learn-2023-02-12-13-32-20-378/output/sentiment-train/part-algo-1-womens_clothing_ecommerce_reviews.tsv
Let’s pull the name of the model-evaluation step and then get the S3 path of the evaluation metrics, which will contain the evaluation report.
processing_job_name = None

for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'EvaluateModel':
        processing_job_name = execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

describe_evaluation_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

evaluation_metrics_s3_uri = describe_evaluation_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Evaluation output {}'.format(evaluation_metrics_s3_uri))
Evaluation output s3://sagemaker-us-east-1-912822595625/sagemaker-scikit-learn-2023-02-12-13-35-32-414/output/metrics
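The same lookup recurs throughout this section: find a step by StepName, read the ARN from its Metadata, and keep the part after the last '/'. A hypothetical helper that factors this out is sketched below. It is exercised on entries copied from the list_steps() output shown in this article. The metadata keys used (ProcessingJob, TrainingJob) are the ones that appear in that output. Model and RegisterModel appear there too.

```python
def find_step_resource_name(steps, step_name, metadata_key):
    """Return the last path segment of the ARN recorded for step_name, or None."""
    for step in steps:
        if step.get("StepName") == step_name:
            arn = step["Metadata"][metadata_key]["Arn"]
            return arn.split("/")[-1]
    return None


# Entries in the shape returned by execution.list_steps(), copied from the outputs in this article.
steps = [
    {"StepName": "EvaluateModel",
     "Metadata": {"ProcessingJob": {"Arn": "arn:aws:sagemaker:us-east-1:912822595625:processing-job/pipelines-h4inlmq7fqwk-evaluatemodel-uqvunnu2ks"}}},
    {"StepName": "Train",
     "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-east-1:912822595625:training-job/pipelines-h4inlmq7fqwk-Train-nYXyWGwBe5"}}},
    {"StepName": "Processing",
     "Metadata": {"ProcessingJob": {"Arn": "arn:aws:sagemaker:us-east-1:912822595625:processing-job/pipelines-h4inlmq7fqwk-processing-mwnbfz07z3"}}},
]

print(find_step_resource_name(steps, "Processing", "ProcessingJob"))
print(find_step_resource_name(steps, "EvaluateModel", "ProcessingJob"))
print(find_step_resource_name(steps, "Train", "TrainingJob"))
```

With the live execution, you would pass execution.list_steps() as steps.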
12.2 Review the evaluation report
Download the evaluation report and print the accuracy.
from pprint import pprint
evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    evaluation_metrics_s3_uri
))

pprint(json.loads(evaluation_json))
{'metrics': {'accuracy': {'value': 0.7313915857605178}}}
12.3 List pipeline artifacts
Now let’s find and print the ARN and job name of the training job.
training_job_arn = None

for execution_step in execution.list_steps():
    if execution_step['StepName'] == 'Train':
        training_job_arn = execution_step['Metadata']['TrainingJob']['Arn']
        pprint(execution_step)
        break
print('Training job ARN: {}'.format(training_job_arn))

training_job_name = training_job_arn.split('/')[-1]
print('Training job Name: {}'.format(training_job_name))
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 2, 12, 14, 4, 49, 838000, tzinfo=tzlocal()),
'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:912822595625:training-job/pipelines-h4inlmq7fqwk-Train-nYXyWGwBe5'}},
'StartTime': datetime.datetime(2023, 2, 12, 13, 48, 54, 641000, tzinfo=tzlocal()),
'StepName': 'Train',
'StepStatus': 'Succeeded'}
Training job ARN: arn:aws:sagemaker:us-east-1:912822595625:training-job/pipelines-h4inlmq7fqwk-Train-nYXyWGwBe5
Training job Name: pipelines-h4inlmq7fqwk-Train-nYXyWGwBe5
Using a similar approach, we can find and print the pipeline artifacts.
processing_job_name = None
training_job_name = None

import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(execution.list_steps()):
    pprint(execution_step)
    if execution_step['StepName'] == 'Processing':
        processing_job_name = execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]
        print('Processing job name: {}'.format(processing_job_name))
        display(viz.show(processing_job_name=processing_job_name))
    elif execution_step['StepName'] == 'Train':
        training_job_name = execution_step['Metadata']['TrainingJob']['Arn'].split('/')[-1]
        print('Training job name: {}'.format(training_job_name))
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 2, 12, 13, 48, 53, 920000, tzinfo=tzlocal()),
'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:912822595625:processing-job/pipelines-h4inlmq7fqwk-processing-mwnbfz07z3'}},
'StartTime': datetime.datetime(2023, 2, 12, 13, 37, 42, 570000, tzinfo=tzlocal()),
'StepName': 'Processing',
'StepStatus': 'Succeeded'}
Processing job name: pipelines-h4inlmq7fqwk-processing-mwnbfz07z3
Name/Source | Direction | Type | Association Type | Lineage Type | |
---|---|---|---|---|---|
0 | s3://...-13-37-36-257/input/code/prepare_data.py | Input | DataSet | ContributedTo | artifact |
1 | s3://dlai-practical-data-science/data/raw/ | Input | DataSet | ContributedTo | artifact |
2 | 68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3 | Input | Image | ContributedTo | artifact |
3 | s3://...02-12-13-32-20-378/output/sentiment-test | Output | DataSet | Produced | artifact |
4 | s3://...13-32-20-378/output/sentiment-validation | Output | DataSet | Produced | artifact |
5 | s3://...2-12-13-32-20-378/output/sentiment-train | Output | DataSet | Produced | artifact |
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 2, 12, 14, 4, 49, 838000, tzinfo=tzlocal()),
'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:912822595625:training-job/pipelines-h4inlmq7fqwk-Train-nYXyWGwBe5'}},
'StartTime': datetime.datetime(2023, 2, 12, 13, 48, 54, 641000, tzinfo=tzlocal()),
'StepName': 'Train',
'StepStatus': 'Succeeded'}
Training job name: pipelines-h4inlmq7fqwk-Train-nYXyWGwBe5
Name/Source | Direction | Type | Association Type | Lineage Type | |
---|---|---|---|---|---|
0 | s3://...13-32-20-378/output/sentiment-validation | Input | DataSet | ContributedTo | artifact |
1 | s3://...2-12-13-32-20-378/output/sentiment-train | Input | DataSet | ContributedTo | artifact |
2 | 76310...onaws.com/pytorch-training:1.6.0-cpu-py3 | Input | Image | ContributedTo | artifact |
3 | s3://...qwk-Train-nYXyWGwBe5/output/model.tar.gz | Output | Model | Produced | artifact |
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 2, 12, 14, 10, 48, 729000, tzinfo=tzlocal()),
'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:912822595625:processing-job/pipelines-h4inlmq7fqwk-evaluatemodel-uqvunnu2ks'}},
'StartTime': datetime.datetime(2023, 2, 12, 14, 4, 50, 615000, tzinfo=tzlocal()),
'StepName': 'EvaluateModel',
'StepStatus': 'Succeeded'}
Name/Source | Direction | Type | Association Type | Lineage Type | |
---|---|---|---|---|---|
0 | s3://...640/input/code/evaluate_model_metrics.py | Input | DataSet | ContributedTo | artifact |
1 | s3://...02-12-13-32-20-378/output/sentiment-test | Input | DataSet | ContributedTo | artifact |
2 | s3://...qwk-Train-nYXyWGwBe5/output/model.tar.gz | Input | Model | ContributedTo | artifact |
3 | 68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3 | Input | Image | ContributedTo | artifact |
4 | s3://...n-2023-02-12-13-35-32-414/output/metrics | Output | DataSet | Produced | artifact |
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 2, 12, 14, 10, 50, 320000, tzinfo=tzlocal()),
'Metadata': {'Condition': {'Outcome': 'True'}},
'StartTime': datetime.datetime(2023, 2, 12, 14, 10, 49, 585000, tzinfo=tzlocal()),
'StepName': 'AccuracyCondition',
'StepStatus': 'Succeeded'}
None
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 2, 12, 14, 10, 52, 545000, tzinfo=tzlocal()),
'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:912822595625:model/pipelines-h4inlmq7fqwk-createmodel-tu0lobcfq6'}},
'StartTime': datetime.datetime(2023, 2, 12, 14, 10, 51, 78000, tzinfo=tzlocal()),
'StepName': 'CreateModel',
'StepStatus': 'Succeeded'}
None
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 2, 12, 14, 10, 52, 324000, tzinfo=tzlocal()),
'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:912822595625:model-package/bert-reviews-1676208665/1'}},
'StartTime': datetime.datetime(2023, 2, 12, 14, 10, 51, 78000, tzinfo=tzlocal()),
'StepName': 'RegisterModel',
'StepStatus': 'Succeeded'}
Name/Source | Direction | Type | Association Type | Lineage Type | |
---|---|---|---|---|---|
0 | s3://...qwk-Train-nYXyWGwBe5/output/model.tar.gz | Input | Model | ContributedTo | artifact |
1 | 76310...aws.com/pytorch-inference:1.6.0-cpu-py36 | Input | Image | ContributedTo | artifact |
2 | bert-reviews-1676208665-1-PendingManualApprova... | Input | Approval | ContributedTo | action |
3 | BERT-Reviews-1676208665-1676211052-aws-model-p... | Output | ModelGroup | AssociatedWith | context |
13 Deploy and test the model
13.1 Approve trained model
The pipeline created a model package version within the specified model package group, with an approval status of PendingManualApproval. This requires a separate step to manually approve the model before deploying it to production.
We can approve the model using the SageMaker Studio UI or programmatically as shown below.
Get the model package ARN.
for execution_step in execution.list_steps():
    if execution_step['StepName'] == 'RegisterModel':
        model_package_arn = execution_step['Metadata']['RegisterModel']['Arn']
        break
print(model_package_arn)
arn:aws:sagemaker:us-east-1:912822595625:model-package/bert-reviews-1676208665/1
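The model package ARN encodes both the group name and the version number. In the output above, the group name appears lowercased (bert-reviews-1676208665), even though the group was created as BERT-Reviews-1676208665. Below is a hypothetical parsing helper for ARNs of this shape, assuming the format arn:aws:sagemaker:<region>:<account>:model-package/<group>/<version>.

```python
def parse_model_package_arn(arn: str) -> dict:
    """Split a versioned model package ARN into region, account, group name and version."""
    # Assumed shape: arn:aws:sagemaker:<region>:<account>:model-package/<group-name>/<version>
    prefix, resource = arn.split(":model-package/", 1)
    _, _, _, region, account = prefix.split(":")
    group_name, version = resource.rsplit("/", 1)
    return {"region": region, "account": account,
            "group_name": group_name, "version": int(version)}


parts = parse_model_package_arn(
    "arn:aws:sagemaker:us-east-1:912822595625:model-package/bert-reviews-1676208665/1")
print(parts)
```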
Update the model package with the Approved status to prepare for deployment. The model must be Approved before it can be deployed.
model_package_update_response = sm.update_model_package(
    ModelPackageArn=model_package_arn,
    ModelApprovalStatus="Approved",
)

pprint(model_package_update_response)
{'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:912822595625:model-package/bert-reviews-1676208665/1',
'ResponseMetadata': {'HTTPHeaders': {'content-length': '102',
'content-type': 'application/x-amz-json-1.1',
'date': 'Sun, 12 Feb 2023 14:15:24 GMT',
'x-amzn-requestid': '95e70fcf-b3f0-4925-be40-73450c40a5ec'},
'HTTPStatusCode': 200,
'RequestId': '95e70fcf-b3f0-4925-be40-73450c40a5ec',
'RetryAttempts': 0}}
13.2 Deploy model
Get the model ARN and the model name from it.
for execution_step in execution.list_steps():
    print(execution_step['StepName'])
    if execution_step['StepName'] == 'CreateModel':
        model_arn = execution_step['Metadata']['Model']['Arn']
        break
print(model_arn)

model_name = model_arn.split('/')[-1]
print(model_name)
RegisterModel
CreateModel
arn:aws:sagemaker:us-east-1:912822595625:model/pipelines-h4inlmq7fqwk-createmodel-tu0lobcfq6
pipelines-h4inlmq7fqwk-createmodel-tu0lobcfq6
13.3 Create endpoint from registry
Configure the endpoint.
endpoint_config_name = 'bert-model-epc-{}'.format(timestamp)
print(endpoint_config_name)

create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[{
        'InstanceType': 'ml.m5.xlarge',
        'InitialVariantWeight': 1,
        'InitialInstanceCount': 1,
        'ModelName': model_name,
        'VariantName': 'AllTraffic'}])
bert-model-epc-1676208665
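InitialVariantWeight only matters relative to the other variants in the same endpoint configuration. Each production variant receives a share of traffic equal to its weight divided by the sum of all variant weights. With the single AllTraffic variant above, a weight of 1 therefore means 100% of requests. The sketch below shows that arithmetic. traffic_shares is a hypothetical helper, and the two-variant list is an illustrative example that is not part of this deployment.

```python
def traffic_shares(production_variants):
    """Map each VariantName to its fraction of endpoint traffic.

    Share = InitialVariantWeight / sum of all InitialVariantWeight values.
    """
    total = sum(v["InitialVariantWeight"] for v in production_variants)
    if total <= 0:
        raise ValueError("variant weights must sum to a positive number")
    return {v["VariantName"]: v["InitialVariantWeight"] / total
            for v in production_variants}


# The single variant used in this article receives all traffic
print(traffic_shares([{"VariantName": "AllTraffic", "InitialVariantWeight": 1}]))
# {'AllTraffic': 1.0}

# Illustrative two-variant split (hypothetical variant names)
print(traffic_shares([
    {"VariantName": "VariantA", "InitialVariantWeight": 3},
    {"VariantName": "VariantB", "InitialVariantWeight": 1},
]))
# {'VariantA': 0.75, 'VariantB': 0.25}
```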
Create the endpoint.
pipeline_endpoint_name = 'bert-model-ep-{}'.format(timestamp)
print("EndpointName={}".format(pipeline_endpoint_name))

create_endpoint_response = sm.create_endpoint(
    EndpointName=pipeline_endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])
EndpointName=bert-model-ep-1676208665
arn:aws:sagemaker:us-east-1:912822595625:endpoint/bert-model-ep-1676208665
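Both the endpoint configuration and the endpoint are named by appending the same timestamp to a fixed prefix, which keeps names unique across runs. Endpoint and endpoint configuration names may be at most 63 characters long. They may contain only letters, digits and hyphens, and must start and end with a letter or digit. The sketch below builds a name and checks it against that rule before calling the API. make_resource_name is hypothetical, and the regular expression is our reading of the documented naming pattern, so treat it as an assumption.

```python
import re
import time

# Assumed naming rule, modelled on the SageMaker EndpointName pattern:
# letters, digits and hyphens, starting and ending with a letter or digit
_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")


def make_resource_name(prefix, timestamp=None):
    """Build '<prefix>-<timestamp>' and check it against the naming rule.

    Hypothetical helper; defaults to the current Unix time when no
    timestamp is given.
    """
    if timestamp is None:
        timestamp = int(time.time())
    name = "{}-{}".format(prefix, timestamp)
    # Enforce the 63-character limit and the allowed character set
    if len(name) > 63 or not _NAME_PATTERN.match(name):
        raise ValueError("invalid SageMaker resource name: {}".format(name))
    return name


print(make_resource_name("bert-model-epc", 1676208665))
print(make_resource_name("bert-model-ep", 1676208665))
```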
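%%time
import time
from botocore.exceptions import WaiterError

while True:
    try:
        waiter = sm.get_waiter('endpoint_in_service')
        print('Waiting for endpoint to be in `InService`...')
        # Polls describe_endpoint until InService; raises WaiterError on failure or timeout
        waiter.wait(EndpointName=pipeline_endpoint_name)
        break
    except WaiterError:
        print('Waiting for endpoint...')
        endpoint_status = sm.describe_endpoint(EndpointName=pipeline_endpoint_name)['EndpointStatus']
        print('Endpoint status: {}'.format(endpoint_status))
        # Stop retrying if the endpoint failed to deploy
        if endpoint_status == 'Failed':
            break
        time.sleep(30)

# Report the real outcome instead of always claiming success
endpoint_status = sm.describe_endpoint(EndpointName=pipeline_endpoint_name)['EndpointStatus']
if endpoint_status == 'InService':
    print('Endpoint deployed.')
else:
    print('Endpoint deployment failed with status: {}'.format(endpoint_status))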
Waiting for endpoint to be in `InService`...
Endpoint deployed.
CPU times: user 109 ms, sys: 30.6 ms, total: 140 ms
Wall time: 4min 31s
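The boto3 waiter hides a simple polling loop: read the endpoint status, stop on InService or Failed, and otherwise sleep and try again. The sketch below writes that loop out with an overall timeout. It assumes a get_status callable that returns the EndpointStatus string; in the notebook, this could wrap sm.describe_endpoint. The function name, the default intervals and the simulated status sequence are all hypothetical.

```python
import time


def wait_for_status(get_status, done=("InService",), failed=("Failed",),
                    poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until it returns a terminal status or time runs out.

    Returns the final status string; raises TimeoutError if no terminal
    status is seen within timeout_seconds.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        # Success and failure are both terminal: stop polling either way
        if status in done or status in failed:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("still {!r} after {} s".format(status, timeout_seconds))
        sleep(poll_seconds)


# Simulated status sequence instead of a real endpoint
statuses = iter(["Creating", "Creating", "InService"])
final_status = wait_for_status(lambda: next(statuses), poll_seconds=0)
print(final_status)
```

Against the real endpoint, get_status could be lambda: sm.describe_endpoint(EndpointName=pipeline_endpoint_name)['EndpointStatus']. The sleep parameter exists only so the loop can be exercised without actually waiting.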
13.4 Test model
Let’s predict the sentiment of some review_body samples and review the results:
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

inputs = [
    {"features": ["I love this product!"]},
    {"features": ["OK, but not great."]},
    {"features": ["This is not the right product."]},
]

predictor = Predictor(
    endpoint_name=pipeline_endpoint_name,
    serializer=JSONLinesSerializer(),
    deserializer=JSONLinesDeserializer(),
    sagemaker_session=sess
)

predicted_classes = predictor.predict(inputs)

for predicted_class in predicted_classes:
    print("Predicted class {} with probability {}".format(predicted_class['predicted_label'], predicted_class['probability']))
Predicted class 1 with probability 0.9203698635101318
Predicted class 0 with probability 0.44024962186813354
Predicted class -1 with probability 0.778016209602356
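JSONLinesSerializer sends the list of inputs as JSON Lines, one JSON object per line. JSONLinesDeserializer parses the response the same way, which is why predict returns a list of dicts. The sketch below reproduces that framing with the standard json module; it is our approximation of the wire format, not the SDK's code. It also maps the numeric labels to the sentiment names from the introduction. The response payload is built by hand from the predictions printed above, and the helper names are hypothetical. Labels go through int() in case the endpoint returns them as strings.

```python
import json

# Sentiment classes used by the model: positive (1), neutral (0), negative (-1)
LABEL_NAMES = {1: "positive", 0: "neutral", -1: "negative"}


def to_json_lines(records):
    """Serialize a list of dicts as JSON Lines: one JSON object per line."""
    return "\n".join(json.dumps(record) for record in records)


def from_json_lines(payload):
    """Parse a JSON Lines payload back into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in payload.splitlines() if line.strip()]


sample_inputs = [
    {"features": ["I love this product!"]},
    {"features": ["OK, but not great."]},
    {"features": ["This is not the right product."]},
]
request_body = to_json_lines(sample_inputs)
print(request_body)

# Hand-built response using the predictions printed above
response_body = "\n".join([
    '{"predicted_label": 1, "probability": 0.9203698635101318}',
    '{"predicted_label": 0, "probability": 0.44024962186813354}',
    '{"predicted_label": -1, "probability": 0.778016209602356}',
])
for prediction in from_json_lines(response_body):
    name = LABEL_NAMES[int(prediction["predicted_label"])]
    print("{} ({:.2f})".format(name, prediction["probability"]))
```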
13.5 SageMaker Studio extensions
SageMaker Studio provides a rich set of features to visually inspect SageMaker resources including pipelines, training jobs, and endpoints.
14 Acknowledgements
I’d like to express my thanks to the great Deep Learning AI Practical Data Science on AWS Specialisation Course, which I completed, and to acknowledge the use of some images and other materials from the training course in this article.