This article has been republished with permission from Derrick mwiti, Data scientist at Layer, and was originally published on Medium
Deploying machine learning models is a small part of the entire machine learning life cycle. Once a model is in production, you have to monitor it to ensure it’s working as expected. The behavior of a model can change in unexpected ways, especially when data in the real world changes. For instance, when the data the model was trained on is significantly different from the data used for making predictions. This leads to model drift. Drift can be caused by, among other things, changes in the behavior of people, changes in data distribution, and errors in data collection.
This article will illustrate how you can use Layer and Amazon SageMaker to deploy a machine-learning model and track it using Superwise. Amazon SageMaker enables you to build, train and deploy machine learning models.
Layer helps you build, train and track all your machine learning project metadata, including ML models and datasets with semantic versioning, extensive artifact logging, and dynamic reporting with local↔cloud training.
Superwise provides tools for ensuring continuous model observability to keep your ML healthy in production. The platform enables you to create alerts using ML monitoring templates. It also offers root cause investigation by correlating and grouping anomalies to discover why models are misbehaving. Furthermore, you can use Superwise to optimize models with production observability insights to identify retaining opportunities and strategies, underperforming segments, and biases in ML decision-making processes.
Create an Amazon SageMaker notebook instance
This example is meant to run on an Amazon SageMaker instance. Start by creating one.
When creating, ensure that you select notebook-al2-v1
for the platform identifier because it has a newer Python version required to install Superwise.
Fetch a model from Layer
With the instance ready, install Layer, SageMaker, and Superwise.pip install SageMaker Superwise layer
pip install sagemaker superwise layer
For this illustration, let’s use a regression model that is already trained on the California housing dataset. Check out this example to learn how to train a model from scratch with Layer. Let’s import the model using Layer:
import layer my_model = layer.get_model("layer/california_housing/models/housing").get_train()
Deploying the model on SageMaker requires a model.joblib
file. We, therefore, have to persist the model.
# persist model import os import joblib path = os.path.join(".", "model.joblib") joblib.dump(my_model, path) print("model persisted at " + path) # > model persisted at ./model.joblib
Deploying a model with SageMaker requires a script containing the following functions:
- model function named
model_fn
- input function named
input_fn
- prediction function named
predict_fn
The function is attached to this repo.
Deploy the model to SageMaker
Deploying the model to Amazon SageMaker requires a tar file containing the model.joblib
. Build the tar file and upload it to your s3 bucket. Next, build a SKLearnModel
using the uploaded artifact and deploy it to an endpoint using the deploy
command.
#Build tar file with model data + inference code import subprocess bashCommand = "tar -cvpzf model.tar.gz model.joblib" process = subprocess.Popen(bashCommand.split(), stdout=subprocess.PIPE) output, error = process.communicate() #Bucket for model artifacts default_bucket = sess.default_bucket() print(default_bucket) boto_session = boto3.session.Session() s3 = boto_session.resource('s3') #Upload tar.gz to bucket model_artifacts = f"s3://{default_bucket}/model.tar.gz" response = s3.meta.client.upload_file('model.tar.gz', default_bucket, 'model.tar.gz') from sagemaker.sklearn.model import SKLearnModel model = SKLearnModel( name='housing-superwise', model_data=model_artifacts, role=get_execution_role(), entry_point='my_script.py', framework_version='0.23-1') predictor = model.deploy(instance_type="ml.t2.medium", initial_instance_count=1) # > Using already existing model: housing-superwise # >---------!
After deployment, test the endpoint to confirm that it’s working as expected.
import pandas as pd import numpy as np from sklearn.model_selection import train_test_split train = pd.read_csv("train.csv") X = train.drop(columns="median_house_value") y = train["median_house_value"] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42) # Prepare the input as a numpy array with record_id, feature 1... feature n # Treat the Dataframe index as the record_id indexes = X_train.index.map(str) X_train_inference = X_train.to_numpy() X_train_inference = np.insert(X_train_inference, 0, indexes, axis=1) predictions = predictor.predict(X_train_inference) # Returns a column with ID and the prediction value predictions # >array([[ 9173. , 131330.90954797], ..., [ 15795. , 199673.75455909]])
Tracking the Layer model with Superwise
You need a Superwise account to start tracking the model. After creating one, obtain your client ID and secret from your profile. Then initialize Superwise.
import os os.environ["SUPERWISE_CLIENT_ID"] = 'YOUR_SUPERWISE_CLIENT_ID' os.environ["SUPERWISE_SECRET"]='YOURSUPERWISE_SECRET' from superwise import Superwise from superwise.models.model import Model from superwise.models.version import Version from superwise.models.data_entity import DataEntity from superwise.resources.superwise_enums import FeatureType, DataEntityRole sw = Superwise()
Create the model entity
Next, create a Superwise model entity.
housing_model = Model( name="housing-layer-and-superwise", description="4.0" ) my_model = sw.model.create(housing_model) print(my_model.get_properties()) # > {'active_version_id': None, 'description': '4.0', 'external_id': '60', 'id': 60, # 'is_archived': False, 'name': 'housing-layer-and-superwise', 'time_units': ['D', 'W']}
Create a schema object
Add the prediction value, a timestamp, and the label to the training features and use the DataFrame index as the record ID. In practice, you should use an ID representing the problem you are tackling. This baseline data is used for creating a schema object. Superwise uses this to interpret the data––for instance, in understanding which column represents predictions and the labels.
baseline_data = X_train.assign( prediction=predictions[:,1].astype(float), ts=pd.Timestamp.now(), median_house_value=y_train ) # treat the Dataframe index as a record ID - for demonstration purpose only. baseline_df = baseline_data.reset_index().rename(columns={"index": "record_id"}) entities_collection = sw.data_entity.summarise( data=baseline_df, specific_roles = { 'record_id': DataEntityRole.ID, 'ts': DataEntityRole.TIMESTAMP, 'prediction': DataEntityRole.PREDICTION_VALUE, 'median_house_value': DataEntityRole.LABEL } )
Create a version object
The next step is to create a Version object for the model we are tracking.
housing_version = Version( model_id=my_model.id, name="5.0", data_entities=entities_collection, ) my_version = sw.version.create(housing_version) sw.version.activate(my_version.id) # <Response [200]>
Monitoring ongoing predictions
Now that we have a Version of the model setup with a Baseline, we can start sending ongoing model predictions to Superwise to monitor the model’s performance in a production setting.
We will treat the test split of the data as our “ongoing predictions” for this demo.
# insert the record ID as part of the input payload to the model indexes = X_test.index.map(str) X_test_inference = X_test.to_numpy() X_test_inference = np.insert(X_test_inference, 0, indexes, axis=1) predictions = predictor.predict(X_test_inference) predictions # array([[ 10941. , 143357.7423556 ], ..., [ 1275. , 154390.03384507]]) # Note: we provide the column names we declared in the Schema object, # so that Superwise.ai will be able to interpret the data ongoing_predictions = pd.DataFrame(data=predictions, columns=["record_id", "prediction"]) ongoing_predictions["record_id"] = ongoing_predictions["record_id"].astype(int) ongoing_features = X_test.copy() ongoing_features['record_id'] = X_test.index ongoing_data = ongoing_features.merge(ongoing_predictions, how='left', on='record_id') ongoing_data['ts'] = pd.Timestamp.now()
Next, log the production data to Superwise and check the status.
def chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i:i + n] # Log the production data in Superwise (Max chunck size = 1000 predictions) ongoing_data_chuncks = chunks(ongoing_data.to_dict(orient='records'), 1000) transaction_ids = list() for ongoing_data_chunck in ongoing_data_chuncks: transaction_id = sw.transaction.log_records( model_id=my_model.id, version_id=my_version.name, records=ongoing_data_chunck ) transaction_ids.append(transaction_id) transaction_id = sw.transaction.get(transaction_id=transaction_ids[0]['transaction_id']) transaction_id.get_properties()['status'] # Passed'
Don’t forget to delete the endpoint used for the demonstration.
sm_boto3.delete_endpoint(EndpointName=predictor.endpoint)
Final thoughts
In this article, we have demonstrated how to integrate a SageMaker-based development workflow with Superwise and Layer. We have also seen how to set up Superwise to start tracking your models by registering and providing a baseline for the model’s behavior. Finally, we have looked at sending new predictions from your model to Superwise, simulating a post-deployment scenario. We are confident that with this new partnership, your production models will never drift again.
Resources
Everything you need to know about AI direct to your inbox
Superwise Newsletter
Superwise needs the contact information you provide to us to contact you about our products and services. You may unsubscribe from these communications at any time. For information on how to unsubscribe, as well as our privacy practices and commitment to protecting your privacy, please review our privacy policy.