Custom Models and human-in-the-loop pipelines with AWS Augmented AI (A2I)

In this project we will create our own human workforce, a human task UI, and then define the human review workflow to perform data labeling for an ML task.
aws
cloud-data-science
natural-language-processing
deep-learning
Author

Pranath Fernando

Published

February 24, 2023

1 Introduction

In earlier articles we introduced AWS cloud services for data science and showed how they can help with different stages of the data science and machine learning workflow.

In this project we will create our own human workforce, a human task UI, and then define the human review workflow to perform data labeling for an ML task. We will make the original predictions of the labels with the custom ML model, and then create a human loop if the probability scores are lower than the preset threshold. After the completion of the human loop tasks, we will review the results and prepare data for re-training.

Let’s import the required modules and create the AWS service clients.

import boto3
import sagemaker
import pandas as pd
from pprint import pprint
import botocore

config = botocore.config.Config(user_agent_extra='dlai-pds/c3/w3')

# low-level service client of the boto3 session
sm = boto3.client(service_name='sagemaker', 
                  config=config)

sm_runtime = boto3.client('sagemaker-runtime',
                          config=config)

sess = sagemaker.Session(sagemaker_client=sm,
                         sagemaker_runtime_client=sm_runtime)

bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name

s3 = boto3.Session().client(service_name='s3', 
                            config=config)
cognito_idp = boto3.Session().client(service_name='cognito-idp', 
                                     config=config)
a2i = boto3.Session().client(service_name='sagemaker-a2i-runtime', 
                             config=config)

2 Set up Amazon Cognito user pool and define human workforce

The first step in the creation of the human-in-the-loop pipeline will be to create our own private workforce.

Amazon Cognito provides authentication, authorization, and user management for apps. This enables our workers to sign in directly to the labeling UI with a username and password.

We will construct an Amazon Cognito user pool and set up its client, domain, and group. Then we’ll create a SageMaker workforce linked to the Cognito user pool, followed by a SageMaker workteam linked to the Cognito user pool and group. Finally, we will create a pool user and add it to the group.

To get started, let’s construct the user pool and user pool client names.

import time
timestamp = int(time.time())

user_pool_name = 'groundtruth-user-pool-{}'.format(timestamp)
user_pool_client_name = 'groundtruth-user-pool-client-{}'.format(timestamp)

print("Amazon Cognito user pool name: {}".format(user_pool_name))
print("Amazon Cognito user pool client name: {}".format(user_pool_client_name))
Amazon Cognito user pool name: groundtruth-user-pool-1677153775
Amazon Cognito user pool client name: groundtruth-user-pool-client-1677153775

2.1 Create Amazon Cognito user pool

The function cognito_idp.create_user_pool creates a new Amazon Cognito user pool. It returns a dictionary, which we store in a variable so that we can read the details of the new pool from it.

create_user_pool_response = cognito_idp.create_user_pool(PoolName=user_pool_name)
user_pool_id = create_user_pool_response['UserPool']['Id']

print("Amazon Cognito user pool ID: {}".format(user_pool_id))
Amazon Cognito user pool ID: us-east-1_8s0SOCEPn

Let’s pull the Amazon Cognito user pool name from its description.

print(create_user_pool_response['UserPool'].keys())
dict_keys(['Id', 'Name', 'Policies', 'DeletionProtection', 'LambdaConfig', 'LastModifiedDate', 'CreationDate', 'SchemaAttributes', 'VerificationMessageTemplate', 'UserAttributeUpdateSettings', 'MfaConfiguration', 'EstimatedNumberOfUsers', 'EmailConfiguration', 'AdminCreateUserConfig', 'Arn'])
user_pool_name = create_user_pool_response['UserPool']['Name'] 
print('Amazon Cognito user pool name: {}'.format(user_pool_name))
Amazon Cognito user pool name: groundtruth-user-pool-1677153775

2.2 Create Amazon Cognito user pool client

Now let’s set up the Amazon Cognito user pool client for the user pool created above.

The Amazon Cognito user pool client implements OAuth, an open standard for authorization. The standard enables apps to obtain limited access (scopes) to a user’s data without giving away the user’s password. It decouples authentication from authorization and supports multiple use cases addressing different device capabilities.

The call below creates the client and sets its OAuth flows, scopes, and callback URL.

create_user_pool_client_response = cognito_idp.create_user_pool_client(
    UserPoolId=user_pool_id, 
    ClientName=user_pool_client_name, 
    GenerateSecret=True, # boolean to specify whether you want to generate a secret
    # a list of provider names for the identity providers that are supported on this client, e.g. Cognito, Facebook, Google
    SupportedIdentityProviders=[
        'COGNITO' 
    ],
    # a list of the allowed OAuth flows, e.g. code, implicit, client_credentials
    AllowedOAuthFlows=[
        'code',
        'implicit'
    ],
    # a list of the allowed OAuth scopes, e.g. phone, email, openid, and profile
    AllowedOAuthScopes=[
        'email',
        'openid',
        'profile'
    ],
    # a list of allowed redirect (callback) URLs for the identity providers
    CallbackURLs=[
        'https://datascienceonaws.com', 
    ],
    # set to true if the client is allowed to follow the OAuth protocol when interacting with Cognito user pools
    AllowedOAuthFlowsUserPoolClient=True
)

client_id = create_user_pool_client_response['UserPoolClient']['ClientId']
print('Amazon Cognito user pool client ID: {}'.format(client_id))
Amazon Cognito user pool client ID: 4ebq1ga0irfdvssomfjhbh5fgq

2.3 Create Amazon Cognito user pool domain and group

Now we set up the Amazon Cognito user pool domain for the constructed user pool.

user_pool_domain_name = 'groundtruth-user-pool-domain-{}'.format(timestamp)

try:
    cognito_idp.create_user_pool_domain( 
        UserPoolId=user_pool_id, 
        Domain=user_pool_domain_name 
    )
    print("Created Amazon Cognito user pool domain: {}".format(user_pool_domain_name))
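except botocore.exceptions.ClientError as error:
    # the call can fail for reasons other than a name clash, so report the actual error
    print("Could not create Amazon Cognito user pool domain {}: {}".format(user_pool_domain_name, error))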
Created Amazon Cognito user pool domain: groundtruth-user-pool-domain-1677153775
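
To make the OAuth settings of the client more concrete, the sketch below builds the kind of authorization URL a browser would visit for the 'code' flow. It assumes the usual Cognito hosted UI address pattern (https://<domain>.auth.<region>.amazoncognito.com/oauth2/authorize); the helper name build_authorize_url and the client ID value are hypothetical. In this project the labeling portal handles the sign-in redirect for us, so this is for illustration only.

```python
from urllib.parse import urlencode


def build_authorize_url(domain_prefix, region, client_id, redirect_uri, scopes):
    """Return a hosted UI authorization URL for the OAuth 'code' flow (illustrative)."""
    # assumed hosted UI address pattern for a Cognito domain prefix
    base = "https://{}.auth.{}.amazoncognito.com/oauth2/authorize".format(domain_prefix, region)
    query = urlencode({
        "response_type": "code",          # matches the 'code' entry in AllowedOAuthFlows
        "client_id": client_id,
        "redirect_uri": redirect_uri,     # must be one of the registered CallbackURLs
        "scope": " ".join(scopes),        # scopes are sent as a space-separated list
    })
    return base + "?" + query


example_url = build_authorize_url(
    domain_prefix="groundtruth-user-pool-domain-1677153775",
    region="us-east-1",
    client_id="example-client-id",        # hypothetical value
    redirect_uri="https://datascienceonaws.com",
    scopes=["email", "openid", "profile"],
)
print(example_url)
```

The scopes and redirect URL mirror the values passed to create_user_pool_client above; a request with a scope or callback that is not registered on the client would be rejected.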

We will use the following function to check if the Amazon Cognito user group already exists.

def check_user_pool_group_existence(user_pool_id, user_pool_group_name):  
    for group in cognito_idp.list_groups(UserPoolId=user_pool_id)['Groups']:
        if user_pool_group_name == group['GroupName']:
            return True
    return False
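
The check above reads only the first page of results. That is enough for a freshly created pool, but list_groups returns a NextToken when more groups remain. A minimal sketch of a paginated variant follows; the function name group_exists and the FakeCognitoClient stand-in are hypothetical and exist only so the loop can be run without AWS access.

```python
def group_exists(client, user_pool_id, group_name):
    """Check every page of list_groups results for a matching group name."""
    kwargs = {"UserPoolId": user_pool_id}
    while True:
        page = client.list_groups(**kwargs)
        if any(group["GroupName"] == group_name for group in page.get("Groups", [])):
            return True
        token = page.get("NextToken")
        if not token:
            # no further pages, so the group is not in the pool
            return False
        kwargs["NextToken"] = token


class FakeCognitoClient:
    """Hypothetical stand-in that serves two pages of groups."""

    def __init__(self):
        self.pages = {
            None: {"Groups": [{"GroupName": "group-a"}], "NextToken": "page-2"},
            "page-2": {"Groups": [{"GroupName": "group-b"}]},
        }

    def list_groups(self, UserPoolId, NextToken=None):
        return self.pages[NextToken]


fake_client = FakeCognitoClient()
print(group_exists(fake_client, "pool-id", "group-b"))  # found on the second page
print(group_exists(fake_client, "pool-id", "group-c"))  # not present on any page
```

With the real client, the call would be group_exists(cognito_idp, user_pool_id, user_pool_group_name).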

Now we will set up the Amazon Cognito user group.

user_pool_group_name = 'groundtruth-user-pool-group-{}'.format(timestamp)

if not check_user_pool_group_existence(user_pool_id, user_pool_group_name):
    cognito_idp.create_group( 
        UserPoolId=user_pool_id, 
        GroupName=user_pool_group_name
    )
    print("Created Amazon Cognito user group: {}".format(user_pool_group_name))
else:
    print("Amazon Cognito user group {} already exists".format(user_pool_group_name))
Created Amazon Cognito user group: groundtruth-user-pool-group-1677153775

2.4 Create workforce and workteam

We can use the following function to check if the workforce already exists. We can only create one workforce per region, therefore we’ll have to delete any other existing workforce, together with all of the related workteams.

def check_workforce_existence(workforce_name):  
    for workforce in sm.list_workforces()['Workforces']:
        if workforce_name == workforce['WorkforceName']:
            return True
        else:
            for workteam in sm.list_workteams()['Workteams']:
                sm.delete_workteam(WorkteamName=workteam['WorkteamName'])
            sm.delete_workforce(WorkforceName=workforce['WorkforceName'])
    return False

Let’s create a workforce.

workforce_name = 'groundtruth-workforce-name-{}'.format(timestamp)

if not check_workforce_existence(workforce_name):
    create_workforce_response = sm.create_workforce(
        WorkforceName=workforce_name,
        CognitoConfig={
            'UserPool': user_pool_id, 
            'ClientId': client_id
        }
    )
    print("Workforce name: {}".format(workforce_name))
    pprint(create_workforce_response)
else:
    print("Workforce {} already exists".format(workforce_name))
Workforce name: groundtruth-workforce-name-1677153775
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '107',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Thu, 23 Feb 2023 12:04:42 GMT',
                                      'x-amzn-requestid': '8e749026-4d1e-4758-949a-ab78fdfaafbe'},
                      'HTTPStatusCode': 200,
                      'RequestId': '8e749026-4d1e-4758-949a-ab78fdfaafbe',
                      'RetryAttempts': 0},
 'WorkforceArn': 'arn:aws:sagemaker:us-east-1:753124839657:workforce/groundtruth-workforce-name-1677153775'}

You can use the sm.describe_workforce function to get the information about the workforce.

describe_workforce_response = sm.describe_workforce(WorkforceName=workforce_name)
describe_workforce_response

We use the following function to check if the workteam already exists. If there are no workteams in the list, we will give some time for the workforce to set up.

def check_workteam_existence(workteam_name):  
    if sm.list_workteams()['Workteams']:
        for workteam in sm.list_workteams()['Workteams']:
            if workteam_name == workteam['WorkteamName']:
                return True
    else:
        time.sleep(60)
        return False
    return False

Now let’s create a workteam.

workteam_name = 'groundtruth-workteam-{}'.format(timestamp)

if not check_workteam_existence(workteam_name):
    create_workteam_response = sm.create_workteam(
        Description='groundtruth workteam',
        WorkforceName=workforce_name,
        WorkteamName=workteam_name,
        # objects that identify the workers that make up the work team
        MemberDefinitions=[{
            'CognitoMemberDefinition': {
                'UserPool': user_pool_id, 
                'ClientId': client_id, 
                'UserGroup': user_pool_group_name 
            }
        }]
    )
    pprint(create_workteam_response)
else:
    print("Workteam {} already exists".format(workteam_name))
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '113',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Thu, 23 Feb 2023 12:06:06 GMT',
                                      'x-amzn-requestid': 'bd89c3fa-45bb-439b-aa33-f2c685e69d8a'},
                      'HTTPStatusCode': 200,
                      'RequestId': 'bd89c3fa-45bb-439b-aa33-f2c685e69d8a',
                      'RetryAttempts': 0},
 'WorkteamArn': 'arn:aws:sagemaker:us-east-1:753124839657:workteam/private-crowd/groundtruth-workteam-1677153775'}

We can use the sm.describe_workteam function to get information about the workteam.

describe_workteam_response = sm.describe_workteam(WorkteamName=workteam_name)
describe_workteam_response
{'Workteam': {'WorkteamName': 'groundtruth-workteam-1677153775',
  'MemberDefinitions': [{'CognitoMemberDefinition': {'UserPool': 'us-east-1_8s0SOCEPn',
     'UserGroup': 'groundtruth-user-pool-group-1677153775',
     'ClientId': '4ebq1ga0irfdvssomfjhbh5fgq'}}],
  'WorkteamArn': 'arn:aws:sagemaker:us-east-1:753124839657:workteam/private-crowd/groundtruth-workteam-1677153775',
  'Description': 'groundtruth workteam',
  'SubDomain': 'aqa042udc1.labeling.us-east-1.sagemaker.aws',
  'CreateDate': datetime.datetime(2023, 2, 23, 12, 6, 5, 715000, tzinfo=tzlocal()),
  'LastUpdatedDate': datetime.datetime(2023, 2, 23, 12, 6, 7, 175000, tzinfo=tzlocal()),
  'NotificationConfiguration': {}},
 'ResponseMetadata': {'RequestId': '615a618f-d243-4c27-a8d5-f94290f6c790',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '615a618f-d243-4c27-a8d5-f94290f6c790',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '544',
   'date': 'Thu, 23 Feb 2023 12:06:06 GMT'},
  'RetryAttempts': 0}}

Now we can pull the workteam ARN either from create_workteam_response or describe_workteam_response.

workteam_arn = describe_workteam_response['Workteam']['WorkteamArn']
workteam_arn
'arn:aws:sagemaker:us-east-1:753124839657:workteam/private-crowd/groundtruth-workteam-1677153775'

2.5 Create an Amazon Cognito user and add the user to the group

We will use the following function to check if the Amazon Cognito user already exists.

def check_user_existence(user_pool_id, user_name):  
    for user in cognito_idp.list_users(UserPoolId=user_pool_id)['Users']:
        if user_name == user['Username']:
            return True
    return False

Now we create a user passing the username, temporary password, and the Amazon Cognito user pool ID.

user_name = 'user-{}'.format(timestamp)

temporary_password = 'Password@420'

if not check_user_existence(user_pool_id, user_name):
    create_user_response=cognito_idp.admin_create_user(
        Username=user_name,
        UserPoolId=user_pool_id,
        TemporaryPassword=temporary_password,
        MessageAction='SUPPRESS' # do not send an invitation message to the new user
    )
    pprint(create_user_response)
else:
    print("Amazon Cognito user {} already exists".format(user_name))
{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
                                      'content-length': '242',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Thu, 23 Feb 2023 12:06:07 GMT',
                                      'x-amzn-requestid': '9799ecf1-9400-4385-a696-f3067a8ee4ab'},
                      'HTTPStatusCode': 200,
                      'RequestId': '9799ecf1-9400-4385-a696-f3067a8ee4ab',
                      'RetryAttempts': 0},
 'User': {'Attributes': [{'Name': 'sub',
                          'Value': '7e22b0c1-059a-45b4-b69a-e1b378950097'}],
          'Enabled': True,
          'UserCreateDate': datetime.datetime(2023, 2, 23, 12, 6, 7, 848000, tzinfo=tzlocal()),
          'UserLastModifiedDate': datetime.datetime(2023, 2, 23, 12, 6, 7, 848000, tzinfo=tzlocal()),
          'UserStatus': 'FORCE_CHANGE_PASSWORD',
          'Username': 'user-1677153775'}}
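
A hard-coded temporary password is fine for a demo, but outside a lab it would normally be generated. The sketch below is one way to do that with the standard library. It assumes a password policy that requires at least 8 characters with an uppercase letter, a lowercase letter, a digit, and a symbol; the function name generate_temporary_password and the symbol set are hypothetical choices, so check them against the policy of your own user pool.

```python
import secrets
import string

# assumed set of accepted symbols; adjust to the pool's password policy
SYMBOLS = "!@#$%^&*"


def generate_temporary_password(length=12):
    """Return a random password containing all four assumed character classes."""
    if length < 8:
        raise ValueError("length must be at least 8")
    pools = [string.ascii_uppercase, string.ascii_lowercase, string.digits, SYMBOLS]
    # one character from each class guarantees the policy is met
    chars = [secrets.choice(pool) for pool in pools]
    alphabet = "".join(pools)
    chars += [secrets.choice(alphabet) for _ in range(length - len(chars))]
    # shuffle so the guaranteed characters are not always at the front
    secrets.SystemRandom().shuffle(chars)
    return "".join(chars)


print(len(generate_temporary_password()))
```

The result could be passed as TemporaryPassword to admin_create_user; the user is still forced to change it on first sign-in, as the FORCE_CHANGE_PASSWORD status above shows.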

Add the user into the Amazon Cognito user group.

cognito_idp.admin_add_user_to_group(
    UserPoolId=user_pool_id,
    Username=user_name,
    GroupName=user_pool_group_name
)
{'ResponseMetadata': {'RequestId': '18dd685f-63f6-4d5b-8f81-cd22d9304a5e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 23 Feb 2023 12:06:08 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'connection': 'keep-alive',
   'x-amzn-requestid': '18dd685f-63f6-4d5b-8f81-cd22d9304a5e'},
  'RetryAttempts': 0}}

3 Create Human Task UI

We will create a Human Task UI resource, using a worker task UI template. This template will be rendered to the human workers whenever human interaction is required.

Below is a simple template that is compatible with the current use case of classifying product reviews into three sentiment classes. For other pre-built UIs (there are 70+), see: https://github.com/aws-samples/amazon-a2i-sample-task-uis

template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
    <crowd-classifier name="sentiment"
                      categories="['-1', '0', '1']"
                      initial-value="{{ task.input.initialValue }}"
                      header="Classify Reviews into Sentiment:  -1 (negative), 0 (neutral), and 1 (positive)">
      
        <classification-target>
            {{ task.input.taskObject }}
        </classification-target>
      
        <full-instructions header="Classify reviews into sentiment:  -1 (negative), 0 (neutral), and 1 (positive)">
            <p><strong>1</strong>: joy, excitement, delight</p>       
            <p><strong>0</strong>: neither positive or negative, such as stating a fact</p>
            <p><strong>-1</strong>: anger, sarcasm, anxiety</p>
        </full-instructions>

        <short-instructions>
            Classify reviews into sentiment:  -1 (negative), 0 (neutral), and 1 (positive)
        </short-instructions>
    </crowd-classifier>
</crowd-form>
"""

We will now create a human task UI resource.

# Task UI name - this value is unique per account and region. 
task_ui_name = 'ui-{}'.format(timestamp)

human_task_ui_response = sm.create_human_task_ui(
    HumanTaskUiName=task_ui_name,
    UiTemplate={
        "Content": template  
    }
)
human_task_ui_response
{'HumanTaskUiArn': 'arn:aws:sagemaker:us-east-1:753124839657:human-task-ui/ui-1677153775',
 'ResponseMetadata': {'RequestId': 'a3561000-dec3-44de-b527-1c26ea8b443d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a3561000-dec3-44de-b527-1c26ea8b443d',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '89',
   'date': 'Thu, 23 Feb 2023 12:06:08 GMT'},
  'RetryAttempts': 0}}

Pull the ARN of the human task UI:

human_task_ui_arn = human_task_ui_response["HumanTaskUiArn"]
print(human_task_ui_arn)
arn:aws:sagemaker:us-east-1:753124839657:human-task-ui/ui-1677153775
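
Before any task reaches a worker, it can help to see how the {{ task.input.* }} placeholders in the template are filled from the input content. The sketch below does a plain text substitution on a shortened copy of the template. It is not the Liquid engine that A2I uses for rendering and it does no HTML escaping; the function name preview_template is hypothetical.

```python
import re

# matches placeholders such as {{ task.input.taskObject }}
PLACEHOLDER = re.compile(r"\{\{\s*task\.input\.(\w+)\s*\}\}")


def preview_template(template_text, task_input):
    """Replace {{ task.input.<key> }} placeholders with values from task_input."""
    def substitute(match):
        return str(task_input[match.group(1)])
    return PLACEHOLDER.sub(substitute, template_text)


# shortened stand-in for the worker task template
sample_template = (
    '<crowd-classifier initial-value="{{ task.input.initialValue }}">'
    "{{ task.input.taskObject }}"
    "</crowd-classifier>"
)

rendered = preview_template(sample_template, {"initialValue": 1, "taskObject": "It is okay"})
print(rendered)
```

The keys initialValue and taskObject are the same ones that are later sent as the human loop input, which is why the worker sees the model's prediction preselected next to the review text.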

4 Define human review workflow

In this section, we are going to create a Flow Definition. A Flow Definition allows us to specify:

  • The workforce (in fact, it is a workteam) that our tasks will be sent to.
  • The instructions that our workforce will receive (worker task template).
  • The configuration of our worker tasks, including the number of workers that receive a task and time limits to complete tasks.
  • Where our output data will be stored.
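
The task configuration from the list above maps onto fields of the HumanLoopConfig dictionary. The sketch below assembles such a dictionary, including the two optional time settings, TaskTimeLimitInSeconds and TaskAvailabilityLifetimeInSeconds. The helper name build_human_loop_config, the placeholder ARNs, and the chosen time values are assumptions for illustration.

```python
def build_human_loop_config(workteam_arn, human_task_ui_arn, task_count=1,
                            time_limit_seconds=3600, availability_seconds=43200):
    """Assemble a HumanLoopConfig dictionary for create_flow_definition."""
    title = "Classify Reviews into sentiment:  -1 (negative), 0 (neutral), 1 (positive)"
    return {
        "WorkteamArn": workteam_arn,            # who receives the tasks
        "HumanTaskUiArn": human_task_ui_arn,    # what the workers see
        "TaskCount": task_count,                # number of workers per task
        "TaskTitle": title,
        "TaskDescription": title,
        # how long a worker has to finish a task once it is accepted
        "TaskTimeLimitInSeconds": time_limit_seconds,
        # how long a task stays available before it expires
        "TaskAvailabilityLifetimeInSeconds": availability_seconds,
    }


example_config = build_human_loop_config(
    workteam_arn="arn:aws:sagemaker:us-east-1:111122223333:workteam/private-crowd/example",  # placeholder
    human_task_ui_arn="arn:aws:sagemaker:us-east-1:111122223333:human-task-ui/example",      # placeholder
)
print(sorted(example_config))
```

Raising TaskCount sends each item to several workers, which produces several entries in the humanAnswers list of the output and allows disagreements to be spotted.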

Here we are going to use the API, but we can optionally create this workflow definition in the console as well.

For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.

Let’s construct the S3 bucket output path.

output_path = 's3://{}/a2i-results-{}'.format(bucket, timestamp)
print(output_path)
s3://sagemaker-us-east-1-753124839657/a2i-results-1677153775

Let’s construct the Flow Definition, using the workteam and human task UI that we created above in the human loop configuration.

# Flow definition name - this value is unique per account and region
flow_definition_name = 'fd-{}'.format(timestamp)

create_workflow_definition_response = sm.create_flow_definition(
    FlowDefinitionName=flow_definition_name,
    RoleArn=role,
    HumanLoopConfig={
        "WorkteamArn": workteam_arn, 
        "HumanTaskUiArn": human_task_ui_arn, 
        "TaskCount": 1, # the number of workers that receive a task
        "TaskDescription": "Classify Reviews into sentiment:  -1 (negative), 0 (neutral), 1 (positive)",
        "TaskTitle": "Classify Reviews into sentiment:  -1 (negative), 0 (neutral), 1 (positive)",
    },
    OutputConfig={"S3OutputPath": output_path},
)

augmented_ai_flow_definition_arn = create_workflow_definition_response["FlowDefinitionArn"]

You can pull information about the Flow Definition with the function sm.describe_flow_definition and wait for its status value FlowDefinitionStatus to become Active.

for _ in range(60):
    describe_flow_definition_response = sm.describe_flow_definition(FlowDefinitionName=flow_definition_name)
    print(describe_flow_definition_response["FlowDefinitionStatus"])
    if describe_flow_definition_response["FlowDefinitionStatus"] == "Active":
        print("Flow Definition is active")
        break
    time.sleep(2)
Active
Flow Definition is active
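
The polling loop above ends quietly if the status never becomes Active within its 60 attempts. A small helper that raises on timeout makes that case explicit. This is a sketch: the name wait_for_status and its parameters are hypothetical, and the describe call is simulated here so that the example runs without AWS access.

```python
import time


def wait_for_status(describe, status_key, target, attempts=60, delay=2.0, sleep=time.sleep):
    """Call describe() until response[status_key] equals target, or raise TimeoutError."""
    last_status = None
    for _ in range(attempts):
        last_status = describe()[status_key]
        if last_status == target:
            return last_status
        sleep(delay)
    raise TimeoutError(
        "status is still {!r} after {} attempts".format(last_status, attempts)
    )


# simulated responses standing in for sm.describe_flow_definition
simulated = iter([
    {"FlowDefinitionStatus": "Initializing"},
    {"FlowDefinitionStatus": "Initializing"},
    {"FlowDefinitionStatus": "Active"},
])

final_status = wait_for_status(
    lambda: next(simulated),
    status_key="FlowDefinitionStatus",
    target="Active",
    sleep=lambda seconds: None,  # skip real waiting in this simulation
)
print(final_status)
```

With the real client, describe would be lambda: sm.describe_flow_definition(FlowDefinitionName=flow_definition_name). The same helper could be used for the human loop status checks later in the article.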

5 Start human loop with custom ML model

We will now deploy a custom ML model into an endpoint and call it to predict labels for some sample reviews. We need to check the confidence score for each prediction. If it is smaller than the threshold, we will engage our workforce for a human review, starting a human loop. We can fix the labels by completing the human loop tasks and review the results.

Let’s set up a sentiment predictor class to be wrapped later into the PyTorch Model.

from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

class SentimentPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name, 
            sagemaker_session=sagemaker_session,
            serializer=JSONLinesSerializer(), 
            deserializer=JSONLinesDeserializer() 
        )
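
The predictor exchanges data in the JSON Lines format: one JSON document per line. The sketch below approximates that encoding with the standard json module so the wire format is visible. It is not the SDK's own implementation, and the helper names to_json_lines and from_json_lines are hypothetical.

```python
import json


def to_json_lines(records):
    """Encode a list of records as JSON Lines: one JSON document per line."""
    return "\n".join(json.dumps(record) for record in records)


def from_json_lines(text):
    """Decode JSON Lines text back into a list of records, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# request body in the shape the model expects
payload = to_json_lines([{"features": ["I enjoy this product"]}])
print(payload)

# response body in the shape the endpoint returns (values are illustrative)
decoded = from_json_lines('{"probability": 0.94, "predicted_label": 1}\n')
print(decoded)
```

Because each line is parsed on its own, the decoded response is always a list, which is why the prediction code reads response[0].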

Now we create a SageMaker model based on the model artifact saved in the S3 bucket.

from sagemaker.pytorch.model import PyTorchModel

pytorch_model_name = 'model-{}'.format(timestamp)

model = PyTorchModel(name=pytorch_model_name,
                     model_data='s3://dlai-practical-data-science/models/ab/variant_a/model.tar.gz',
                     predictor_cls=SentimentPredictor,
                     entry_point='inference.py',
                     source_dir='src',
                     framework_version='1.6.0',
                     py_version='py3',
                     role=role)

Now we will create a SageMaker Endpoint from the model. For the purposes of this project, we will use a relatively small instance type. Refer to the Amazon SageMaker documentation for additional instance types that may work for your use cases outside of this lab.

%%time

pytorch_endpoint_name = 'endpoint-{}'.format(timestamp)

predictor = model.deploy(initial_instance_count=1, 
                         instance_type='ml.m5.large', 
                         endpoint_name=pytorch_endpoint_name)
----------!CPU times: user 2min 15s, sys: 9.67 s, total: 2min 24s
Wall time: 7min 24s

5.1 Start the human loop

Let’s create a list of sample reviews.

reviews = ["I enjoy this product", 
           "I am unhappy with this product", 
           "It is okay", 
           "sometimes it works"]

Now we can send each of the sample reviews to the model via the predictor.predict() API call. Note that we need to pass the reviews in the JSON format that the model expects as input. Then we parse the model’s response to obtain the predicted label and the confidence score.

After that, we check the condition for engaging a human reviewer: if the returned confidence score is under the defined threshold of 90%, we start a human loop with the predicted label and the review as inputs, passing the input content and the Flow Definition defined above.

import json

human_loops_started = []

CONFIDENCE_SCORE_THRESHOLD = 0.90

for review in reviews:
    inputs = [
        {"features": [review]},
    ]

    response = predictor.predict(inputs)
    print(response)
    prediction = response[0]['predicted_label']
    confidence_score = response[0]['probability']

    print('Checking prediction confidence {} for sample review: "{}"'.format(confidence_score, review))

    # condition for when we want to engage a human for review
    if confidence_score < CONFIDENCE_SCORE_THRESHOLD:
        human_loop_name = str(time.time()).replace('.', '-') # unique name built from the current time in fractional seconds
        input_content = {
            "initialValue": prediction, 
            "taskObject": review 
        }
        start_loop_response = a2i.start_human_loop(
            HumanLoopName=human_loop_name,
            FlowDefinitionArn=augmented_ai_flow_definition_arn,
            HumanLoopInput={"InputContent": json.dumps(input_content)},
        )

        human_loops_started.append(human_loop_name)

        print(
            f"Confidence score of {confidence_score * 100}% for prediction of {prediction} is less than the threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print(f"*** ==> Starting human loop with name: {human_loop_name}  \n")
    else:
        print(
            f"Confidence score of {confidence_score * 100}% for prediction of {prediction} is above threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print("Human loop not needed. \n")
[{'probability': 0.9376369118690491, 'predicted_label': 1}]
Checking prediction confidence 0.9376369118690491 for sample review: "I enjoy this product"
Confidence score of 93.76369118690491% for prediction of 1 is above threshold of 90.0%
Human loop not needed. 

[{'probability': 0.6340296864509583, 'predicted_label': -1}]
Checking prediction confidence 0.6340296864509583 for sample review: "I am unhappy with this product"
Confidence score of 63.402968645095825% for prediction of -1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1677154445-9813657  

[{'probability': 0.5422114729881287, 'predicted_label': 1}]
Checking prediction confidence 0.5422114729881287 for sample review: "It is okay"
Confidence score of 54.221147298812866% for prediction of 1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1677154446-4558146  

[{'probability': 0.3931102454662323, 'predicted_label': 1}]
Checking prediction confidence 0.3931102454662323 for sample review: "sometimes it works"
Confidence score of 39.31102454662323% for prediction of 1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1677154446-8940263  

Three of the sample reviews with the probability scores lower than the threshold went into the human loop. The original predicted labels are passed together with the review text and will be seen in the task.
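
The routing rule can be isolated as a small function and replayed against the probabilities recorded in the output above. This is a sketch that needs no endpoint; the function name needs_human_review is hypothetical.

```python
CONFIDENCE_SCORE_THRESHOLD = 0.90


def needs_human_review(probability, threshold=CONFIDENCE_SCORE_THRESHOLD):
    """Return True when the model's confidence is below the review threshold."""
    return probability < threshold


# (review, predicted label, probability) as printed by the endpoint above
recorded_predictions = [
    ("I enjoy this product", 1, 0.9376369118690491),
    ("I am unhappy with this product", -1, 0.6340296864509583),
    ("It is okay", 1, 0.5422114729881287),
    ("sometimes it works", 1, 0.3931102454662323),
]

to_review = [
    review
    for review, label, probability in recorded_predictions
    if needs_human_review(probability)
]
print(to_review)
```

Lowering the threshold sends fewer items to the workforce at the cost of accepting more uncertain predictions; a threshold of 0.60, for example, would leave only the last two reviews in the list.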

5.2 Check status of the human loop

The function a2i.describe_human_loop can be used to pull information about the human loop.

completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print("")

    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
HumanLoop Name: 1677154445-9813657
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154445-9813657/output.json'}

HumanLoop Name: 1677154446-4558146
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154446-4558146/output.json'}

HumanLoop Name: 1677154446-8940263
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154446-8940263/output.json'}

5.3 Complete the human loop tasks

Now we will pull the labeling UI from the workteam information to get into the human loop tasks in the AWS console.

labeling_ui = sm.describe_workteam(WorkteamName=workteam_name)["Workteam"]["SubDomain"]
print(labeling_ui)
aqa042udc1.labeling.us-east-1.sagemaker.aws

We navigate to this URL in a browser and log in with the username and temporary password defined above.

5.4 Verify that the human loops were completed by the workforce

import time

completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print("")
    while resp["HumanLoopStatus"] != "Completed":
        print(f"Waiting for HumanLoop to complete.")
        time.sleep(10)
        resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
        print(f"Completed!")
        print("")
HumanLoop Name: 1677154445-9813657
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154445-9813657/output.json'}

Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Completed!

HumanLoop Name: 1677154446-4558146
HumanLoop Status: Completed
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154446-4558146/output.json'}

Completed!

HumanLoop Name: 1677154446-8940263
HumanLoop Status: Completed
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154446-8940263/output.json'}

Completed!

The loop above will not finish until we have labeled the data in the labeling UI, as described in the previous step.

5.5 View human labels and prepare the data for re-training

Once the work is complete, Amazon A2I stores the results in the specified S3 bucket and sends a CloudWatch Event. Let’s check the S3 contents.

import re
from pprint import pprint

fixed_items = []

for resp in completed_human_loops:
    split_string = re.split("s3://" + bucket + "/", resp["HumanLoopOutput"]["OutputS3Uri"])
    output_bucket_key = split_string[1]

    response = s3.get_object(Bucket=bucket, Key=output_bucket_key)
    content = response["Body"].read().decode("utf-8")
    json_output = json.loads(content)
    pprint(json_output)

    input_content = json_output["inputContent"]
    human_answer = json_output["humanAnswers"][0]["answerContent"]
    fixed_item = {"input_content": input_content, "human_answer": human_answer}
    fixed_items.append(fixed_item)
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:753124839657:flow-definition/fd-1677153775',
 'humanAnswers': [{'acceptanceTime': '2023-02-23T12:16:28.736Z',
                   'answerContent': {'sentiment': {'label': '-1'}},
                   'submissionTime': '2023-02-23T12:16:33.547Z',
                   'timeSpentInSeconds': 4.811,
                   'workerId': '0e31fea759d04da1',
                   'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
                                                       'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_8s0SOCEPn',
                                                       'sub': '7e22b0c1-059a-45b4-b69a-e1b378950097'}}}],
 'humanLoopName': '1677154445-9813657',
 'inputContent': {'initialValue': -1,
                  'taskObject': 'I am unhappy with this product'}}
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:753124839657:flow-definition/fd-1677153775',
 'humanAnswers': [{'acceptanceTime': '2023-02-23T12:16:06.376Z',
                   'answerContent': {'sentiment': {'label': '0'}},
                   'submissionTime': '2023-02-23T12:16:23.626Z',
                   'timeSpentInSeconds': 17.25,
                   'workerId': '0e31fea759d04da1',
                   'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
                                                       'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_8s0SOCEPn',
                                                       'sub': '7e22b0c1-059a-45b4-b69a-e1b378950097'}}}],
 'humanLoopName': '1677154446-4558146',
 'inputContent': {'initialValue': 1, 'taskObject': 'It is okay'}}
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:753124839657:flow-definition/fd-1677153775',
 'humanAnswers': [{'acceptanceTime': '2023-02-23T12:16:23.694Z',
                   'answerContent': {'sentiment': {'label': '0'}},
                   'submissionTime': '2023-02-23T12:16:28.668Z',
                   'timeSpentInSeconds': 4.974,
                   'workerId': '0e31fea759d04da1',
                   'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
                                                       'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_8s0SOCEPn',
                                                       'sub': '7e22b0c1-059a-45b4-b69a-e1b378950097'}}}],
 'humanLoopName': '1677154446-8940263',
 'inputContent': {'initialValue': 1, 'taskObject': 'sometimes it works'}}
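
The code above recovers the object key by splitting the output URI on the known bucket name. An alternative that does not depend on knowing the bucket in advance is to parse the URI with the standard library. A minimal sketch, with the hypothetical helper name split_s3_uri:

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3://bucket/key URI into a (bucket, key) tuple."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: {}".format(uri))
    # the path starts with '/', which is not part of the object key
    return parsed.netloc, parsed.path.lstrip("/")


output_uri = (
    "s3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/"
    "2023/02/23/12/14/06/1677154445-9813657/output.json"
)
bucket_name, object_key = split_s3_uri(output_uri)
print(bucket_name)
print(object_key)
```

The two values can be passed directly as Bucket and Key to s3.get_object.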

Now we can prepare the data for re-training.

df_fixed_items = pd.DataFrame(fixed_items)  
df_fixed_items.head()
input_content human_answer
0 {'initialValue': -1, 'taskObject': 'I am unhap... {'sentiment': {'label': '-1'}}
1 {'initialValue': 1, 'taskObject': 'It is okay'} {'sentiment': {'label': '0'}}
2 {'initialValue': 1, 'taskObject': 'sometimes i... {'sentiment': {'label': '0'}}
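
The DataFrame above still holds nested dictionaries. For re-training, a flat table with the review text and the corrected label is usually easier to work with. The sketch below flattens the three items shown above; the function name flatten_fixed_items and the column names review_body, predicted_sentiment, and sentiment are assumptions and should be matched to whatever the training script expects.

```python
def flatten_fixed_items(items):
    """Turn nested A2I results into flat rows of text, model label, and human label."""
    rows = []
    for item in items:
        rows.append({
            "review_body": item["input_content"]["taskObject"],
            "predicted_sentiment": int(item["input_content"]["initialValue"]),
            # the human label arrives as a string such as '-1'
            "sentiment": int(item["human_answer"]["sentiment"]["label"]),
        })
    return rows


# the three completed items from the output above
sample_fixed_items = [
    {"input_content": {"initialValue": -1, "taskObject": "I am unhappy with this product"},
     "human_answer": {"sentiment": {"label": "-1"}}},
    {"input_content": {"initialValue": 1, "taskObject": "It is okay"},
     "human_answer": {"sentiment": {"label": "0"}}},
    {"input_content": {"initialValue": 1, "taskObject": "sometimes it works"},
     "human_answer": {"sentiment": {"label": "0"}}},
]

rows = flatten_fixed_items(sample_fixed_items)
for row in rows:
    print(row)
```

Passing rows to pd.DataFrame gives one column per field, and comparing predicted_sentiment with sentiment shows that the workers corrected two of the three predictions.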

6 Acknowledgements

I’d like to express my thanks to the great Deep Learning AI Practical Data Science on AWS Specialisation Course, which I completed, and acknowledge the use of some images and other materials from the training course in this article.
