No-code model observability

With no-code integration, any Superwise user can now connect a model, define its schema, and log production data via our UI with just an Excel file.

Everything you need to know about drift in machine learning

What keeps you up at night? If you’re an ML engineer or data scientist, then drift is most likely right up there on the top of the list. But drift in machine learning comes in many forms and variations. Concept drift, data drift, and model drift all pop up on this list, but even they only scratch the surface of much more nuanced problems. In this article, we’ll review all the types of drift and the nuances of each of them, as well as best practices to monitor, detect, investigate, and resolve drift in machine learning. 

More posts in this series:

  1. Everything you need to know about drift in machine learning
  2. Data drift detection basics
  3. Concept drift detection basics
  4. A hands-on introduction to drift metrics
  5. Common drift metrics
  6. Troubleshooting model drift
  7. Model-based techniques for drift monitoring

Intro

“The only constant in life is change” (Heraclitus of Ephesus, ~500 BC)

Our world is constantly changing. And what’s true and obvious today may not be so tomorrow. This is undoubtedly on-point in the world of machine learning. The ultimate goal of machine learning models is to extract patterns from past data and use them to predict future behavior for unseen instances. These patterns, also referred to as ‘concepts,’ are fundamental to machine learning because they help classify data and recognize relationships between different variables. 

Concept drift example: Sesame Street Bert vs. NLP BERT

When these relationships change in the real world–as they inevitably do–the patterns our model learned become invalid and can limit the model’s predictive power. This ‘model drift’ tends to happen when a model moves from development to a live production environment, when the data changes, or when situations in the real world change.

What is drift in machine learning?

We tend to assume that a model will make correct predictions once it goes into production. After all, if we didn’t think so, we would continue to retrain the model until we felt it worked well enough to be deployed to the real world. However, most of us can attest to the fact that the real world waits for no one, and that holds true for the data our models run on. From day 1, the data our models use to make predictions is already different from the data they were trained on. Depending on the degree of change, our models may suffer from drift and decay, unwanted bias, or simply suboptimal performance given the type of drift we face. Drift may signal that our results will worsen over time or that performance is degrading on specific slices of data or populations.

What causes drift in ML? It can be anything from errors in data collection to changes in the way people behave, or even time gaps that alter what is considered a good prediction and what is not. An example of model drift could be an ML model whose algorithms help approve or reject loans for bank customers. In the past, most people requesting loans were between the ages of 32 and 40, with specific behavior profiles. But today, 80% of loan requests are from people under the age of 30. In this case, the model must be optimized for the new mix of data, or it will offer incorrect predictions about who is a good loan candidate.

As more and more machine learning models are deployed and used in live environments for real-world applications, model drift has become a major issue. In our digital and big data era, it’s unrealistic to expect data distributions to remain stable over a long period of time. This means drift is a top concern for data science teams scaling their use of ML. Let’s face it, the amount of time we spend maintaining model health is going up exponentially. It has definitely become a core—and painful—part of the day-to-day tasks for any team maintaining models in a live production environment. 

Types of drift in machine learning for $800 please

Drift in machine learning

Concept drift or model drift is sometimes used as a generic term to describe any change in the statistical properties of the data. Mathematically, it indicates changes in the distribution P(y | X), which describes the relationship between the predictors and the target variables. The common formal definition of concept drift is “a change in the joint probability distribution, i.e., Pₜ(X, y) ≠ Pₜ₊₁(X, y).” It’s also referred to as ‘dataset shift.’ We can decompose the joint probability P(X, y) into smaller components to better understand which changes in those components can trigger concept drift (a minimal detection sketch follows this list):

  • Covariate shift P(X) – Also known as input drift, data drift, or population drift, covariate shift occurs when there are changes in the distribution of the input variables (i.e., features). This is the case in our example above, where the age of people asking for loans evolves over time. It may happen for technical reasons, like a change in the data source pipeline or sensors that become inaccurate over time. Or, it may be caused by changes in the population, such as new types of customers, trends, or lifestyle fluctuations. Covariate shift can be detected on a univariate level, also referred to as feature drift, but it may also be analyzed on a multivariate level across the entire feature space distribution. 
  • Prior probability shift P(y) – Also known as label drift or unconditional class shift, this drift occurs when there are changes in the distribution of the class variable (y). Two typical examples are spam and fraud detection models, where the proportion of spam emails or fraud can vary significantly over time. For example, email phishing attacks and coronavirus scams spiked during the COVID-19 pandemic. 
  • Posterior class shift P(y | X) – Also known as conditional change, concept shift, or ‘real concept drift,’ this refers to changes in the relationship between the input variables and the target variables. Take BERT or corona, for example: just a few years ago, searches for these terms would turn up our favorite childhood character from Sesame Street and a Mexican beer you drink with a lemon wedge. Today, the same searches are dominated by a deep learning language model for text and by articles on COVID-19. This is typically the hardest type of drift to detect, and the most dramatic, because it can lead to changes in the decision boundary and necessitate updates to the model.
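To make these distinctions more concrete, here is a minimal, illustrative sketch (the data, feature names, and the 0.05 significance threshold are assumptions, not part of any specific product) that flags univariate covariate shift with a two-sample Kolmogorov–Smirnov test and prior probability shift with a chi-square test. Posterior class shift is harder to check this way, since it requires labels arriving alongside predictions.

import numpy as np
from scipy import stats

rng = np.random.default_rng(42)

# Reference (training) window vs. current (production) window -- synthetic data for illustration
train_age = rng.normal(loc=36, scale=4, size=5_000)          # ages centered around 36
prod_age = rng.normal(loc=29, scale=4, size=5_000)           # the population got younger

train_labels = rng.choice([0, 1], size=5_000, p=[0.9, 0.1])  # 10% positive class at training time
prod_labels = rng.choice([0, 1], size=5_000, p=[0.7, 0.3])   # 30% positive class in production

# Covariate shift P(X): two-sample Kolmogorov-Smirnov test on a single numeric feature
ks_stat, ks_p = stats.ks_2samp(train_age, prod_age)
print(f"KS statistic={ks_stat:.3f}, p-value={ks_p:.3g} -> covariate shift: {ks_p < 0.05}")

# Prior probability shift P(y): chi-square test on the label frequencies
train_counts = np.bincount(train_labels, minlength=2)
prod_counts = np.bincount(prod_labels, minlength=2)
expected = train_counts / train_counts.sum() * prod_counts.sum()
chi2_stat, chi2_p = stats.chisquare(prod_counts, f_exp=expected)
print(f"Chi-square={chi2_stat:.1f}, p-value={chi2_p:.3g} -> prior probability shift: {chi2_p < 0.05}")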

Some researchers also distinguish between ‘real’ and ‘virtual’ concept drifts, where real concept drift refers to changes in P(y | X) and virtual concept drift refers to changes in P(X) or P(y) that don’t affect the decision boundaries or the posterior probabilities P(y | X). Although these ‘virtual’ changes appear to be less serious, they tend to be a side effect of the real ones. It’s hard to imagine a real-world application in which P(X) changes without impacting P(y | X). 

Real vs. virtual concept drift

Although researchers may need to distinguish between real and virtual drift, in real-world applications we need to monitor for all types of drift. Here’s why:

  1. Every kind of drift can lead to poor performance. A sudden peak in the frequency of certain types of data (i.e., changes in P(X) or P(y)) can make it harder to classify these cases correctly. 
  2. Examining different types of drift can help us understand what is causing the more serious ones, help diagnose potential problems with the model, and make it easier to choose the right path for fast resolution. 

What causes drift?

Drift in machine learning can occur for any number of reasons, but these causes generally fall into two main groups: bad training data and changing environments. 

Bad training data that doesn’t accurately represent real-world situations is also known as unrepresentative training data:

  • Sample selection bias – This occurs when the training data was collected or prepared using a biased or flawed method. It doesn’t reliably represent the operating environment where the model will be deployed.
  • Changes in hidden variables – Hidden variables can’t be measured directly but have a tremendous influence on some of the observed variables. Essentially, a change in hidden variables will change the data we observe. Even if there is no actual drift from the data source, there may still be changes that look similar to concept drift. For example, if we want to predict the number of visitors to an amusement park, we might look at the weather, the day of the week, holidays, etc. But the general economic situation or the general public mood can have an even more significant influence, as could a local tragedy or a big win by the local sports team. Although these factors cannot be measured directly, they have a crucial impact on the number of visitors (our target). 

Changing environments

  • Dynamic environment – This is the more basic and intuitive case of instability, where the change in data and relations is beyond our control. Some examples are:
  1. Any system that follows users’ personal interests, such as advertisements customized for constantly changing preferences. 
  2. Use cases affected by weather, such as traffic predictions, where the data used to train the model may no longer be relevant. 
  3. Changes in the market, such as new competitors moving in or existing ones introducing new pricing models. 
  4. Changes in regulations.
  • Technical issues – These issues can be caused by a broken data pipeline or changes upstream in one of the feature’s values. This may be caused by a bug, some schema change, or even a change in a default value. 
  • Adversarial classification problems – Some common examples are spam filtering, network intrusion detection, or fraud detection, where attackers change their methods to bypass the model.
  • Deliberate business actions – These can include launching a marketing campaign that attracts new types of users or changes in a website that affect the users’ behavior.
  • Domain shift – This refers to changes in the meaning of values or terms. For example, inflation reduces the value of money, which means that an item’s price or a person’s income will have different effects at different times. Another example is a change in the meaning of terms. For example, a web search for ‘corona’ will retrieve completely different results in 2022 compared to 2019.
  • Hidden feedback loops – In many cases, deploying a model in a live environment inevitably changes that environment and invalidates the assumptions of the initial model in the process. For example, a churn prediction model will use the historical DB of user engagement to predict the chances of a given user abandoning the product. Typically, whenever the model predicts that a user is likely to churn, the marketing department will contact the user. If retention efforts work and the user stays, the dataset will now contain concept drift–because the user who was predicted as likely to churn has stayed put. 

What are drift patterns, and why do they matter?

The word ‘drift’ generally implies a gradual change over time, but changes in data distribution can manifest in different shapes and sizes. Here are some of the ways they take place, based on transition speed (a small synthetic sketch after the figure below illustrates each): 

  • Gradual – A gradual transition happens over time as new concepts come into play. For example, in a movie recommendation task, movies, genres, and user preferences all change gradually over time. 
  • Sudden – A drift can happen suddenly, for example, when a sensor is replaced by another one with a different calibration.
  • Incremental – Drift can also happen in a sequence of small steps, such that it is only noticed after a long period of time. An example may be when a sensor wears down and becomes less accurate over time. 
  • Blip – Spikes or blips are basically outliers or one-off occurrences that influence the model. This may be a war or some exceptional event. Of course, there may be recurring anomalies that take place—with no clue as to when they may happen again. 
Patterns of drift in machine learning
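If it helps to see these shapes side by side, here is a small, purely synthetic sketch (window sizes and magnitudes are arbitrary assumptions) that generates one signal per pattern and compares the beginning and end of the simulated period:

import numpy as np

rng = np.random.default_rng(0)
n = 365                                   # one simulated year, one value per day
t = np.arange(n)
noise = lambda: rng.normal(0, 0.3, n)

sudden = np.where(t < 180, 0.0, 3.0) + noise()                 # step change at day 180
gradual = np.where(rng.random(n) < t / n, 3.0, 0.0) + noise()  # old and new concepts mixed, new one takes over
incremental = 3.0 * t / n + noise()                            # slow, steady ramp
blip = np.where((t > 200) & (t < 210), 3.0, 0.0) + noise()     # short-lived outlier burst

for name, series in [("sudden", sudden), ("gradual", gradual),
                     ("incremental", incremental), ("blip", blip)]:
    print(f"{name:>11}: first-30-day mean={series[:30].mean():+.2f}, "
          f"last-30-day mean={series[-30:].mean():+.2f}")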

ML drift detection

These different patterns in which models drift bring us directly to the issue of detecting ML drift. Because drift in the wild takes different forms, different methods are needed to detect each one. Detecting seasonal model drift, for example, requires time series methods that can decompose seasonal components, whereas statistical process controls are primarily used to detect sudden or outlier changes. There are a few common statistical measures used to quantify drift, and we’ll dive into them in the next post in this series. 
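As a small preview of those measures, here is a sketch of one widely used metric, the Population Stability Index (PSI), computed between a reference (training) sample and a current (production) window. The bin count and the conventional 0.1 / 0.25 thresholds are rules of thumb, not universal constants.

import numpy as np

def psi(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index between a reference sample and a current sample."""
    edges = np.histogram_bin_edges(reference, bins=bins)
    # Clip current values into the reference range so every observation lands in a bin
    current = np.clip(current, edges[0], edges[-1])
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    ref_pct = np.clip(ref_pct, 1e-6, None)  # avoid log(0) on empty bins
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

rng = np.random.default_rng(1)
reference = rng.normal(0, 1, 10_000)     # training-time feature distribution
current = rng.normal(0.4, 1.2, 10_000)   # drifted production window

print(f"PSI = {psi(reference, current):.3f}")  # rule of thumb: <0.1 stable, 0.1-0.25 moderate, >0.25 significant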

Last words

As the ML industry, and MLOps in particular, matures, it’s essential to align terminology among practitioners. Drift is a fundamental concept that every ML practitioner needs to understand. While a general macro drift may only occur occasionally, smaller local drifts on specific subpopulations or segments happen frequently and usually fly under the radar. And while concept drift is often referred to as one major issue, as we saw in this post, it can be divided into many different types of drift that happen for various reasons and appear in different forms. Understanding all the possible patterns and types is an important step toward detecting them efficiently and knowing how to deal with and fix their actual root cause.

Ready to get started with Superwise?

Head over to the Superwise platform and get started with easy, customizable, scalable, and secure model observability for free with our community edition.

Prefer a demo?

Request a demo and our team will show you what Superwise can do for your ML and your business. 


Putting together a continuous ML stack

Due to the increased usage of ML-based products within organizations, a new CI/CD-like paradigm is on the rise. On top of testing your code, building a package, and continuously deploying it, we must now incorporate CT (continuous training), which can be triggered by events and data rather than depending only on time-based schedules.

The following post will show how fast and easy it is to set up a robust training-serving pipeline that will execute automatically based on production data and ongoing events. Notebook and repo included.

Intro

As the ML ecosystem grows, more and more companies are adopting and integrating ML-powered solutions for internal use and customer-facing products. However, as we all know, machine learning algorithms are a bit of a black box. While experimentations and development during the data science phase may look promising, when the time comes for your models to contend with the real world, many things can go wrong due to constantly evolving data profiles.

Machine learning-based software adds an extra layer of complexity to the traditional CI/CD pipeline for several reasons:

Team skills

In an ML project, the team usually includes data scientists or ML researchers who focus on exploratory data analysis, model development, and experimentation. These members might not be experienced software engineers who can build production-class services.

Development

ML is experimental in nature. You should try different features, algorithms, modeling techniques, and parameter configurations to find what works best for the problem as quickly as possible. The challenge is tracking what worked and what didn’t and maintaining reproducibility while maximizing code reusability.

Testing

Testing an ML system is more involved than testing other software systems. In addition to typical unit and integration tests, you need data validation, trained model quality evaluation, and model validation.

Deployment

In ML systems, deployment isn’t as simple as deploying an offline-trained ML model as a prediction service. ML systems can require you to deploy a multi-step pipeline to automatically retrain and deploy the model. This pipeline adds complexity and requires you to automate steps that are manually done before deployment by data scientists to train and validate new models.

Production 

ML models can have reduced performance not only due to suboptimal coding but also due to constantly evolving data profiles. In other words, models can decay in more ways than conventional software systems, and you need to consider this degradation. Therefore, you need to track summary statistics of your data and monitor the online performance of your model to send notifications or roll back when values deviate from your expectations.

MLOps: Continuous delivery and automation pipelines in machine learning, Google

In production, without the ability to observe, detect, and automatically fix unexpected behavior, an ML-infused product is on the highway to failure.

While tackling each of the points above can be a great topic for a book, our goal in this post is to demonstrate how we can achieve a robust training pipeline that will be triggered by training-serving data skew.

Prerequisites:

GCP Stack:

We will use the following GCP components: 

Vertex AI Pipelines (Kubeflow-based) – for the training-serving pipeline

  • Alternatives: Airflow, Jenkins, Argo, etc.

Vertex AI Model & Endpoint – for serving our model in a production-like environment

  • Alternatives: Seldon, MLflow, TensorFlow Serving, etc.

Google Cloud Storage – for storing artifacts, pipeline outputs, and our trained model before deployment

  • Alternatives: any file system & artifact registry

Google Artifact Registry – to store our custom predictor image

  • Alternatives: Docker Hub

Google Cloud Functions – to simulate an HTTP webhook that will trigger a retraining pipeline

  • Alternatives: a web server application, AWS Lambda, etc.

Assets:

Infrastructure setup

Let’s go ahead and create a service account. This service account will be responsible for performing all the GCP operations from within the customized Docker image. In the GCP navigation menu, go to IAM & Admin -> Service accounts -> Create. After the service account has been created, generate a JSON key file. This key file will later be stored inside the image, so keep it in a safe place for now (Google tutorial).

In a new terminal, set a new environment variable:

export GOOGLE_APPLICATION_CREDENTIALS=<path_to_key_file>

And run:

gcloud init

Authenticate your Google account and follow the instructions to set up the connection. From here on out, we can use gcloud and gsutil from the terminal to perform actions in GCP.

To enable all the GCP components mentioned above, run the following:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

You should see output like: Operation "operations/acf.p2-<some unique id>" finished successfully.

Now that we have all the necessary components enabled, let’s start writing our first pipeline. Create a new venv and install the relevant packages:

python -m venv venv
. ./venv/bin/activate
pip install google-cloud-aiplatform==1.11.0 kfp google_cloud_pipeline_components

Pipeline overview

The end goal of this post is to have a training-serving pipeline in place that will be initiated in the event of a distribution shift in the production data.

Goals:

  • Basic serving pipeline
  • ML model orchestration
  • Adding monitoring
  • Adding deployment components to the basic pipeline
  • Simulate real-time data script
  • Auto-retrain

Let’s start writing some code

In a new script (pipeline.py), import all the relevant packages and define the configuration constants:

import os
import sys
from typing import List, NamedTuple
from datetime import datetime
from google.cloud import aiplatform, storage
from google.cloud.aiplatform import gapic as aip
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Input, Output, Model, Metrics, Dataset, HTML
USERNAME = "<lowercase user name>"
BUCKET_NAME = "gs://<USED BUCKET>"
REGION = "<REGION>"
PROJECT_ID = "<GCP PROJECT ID>" # use `gcloud config list --format 'value(core.project)'` to get it
PROJECT_NUMBER = "<GCP PROJECT NUMBER>" # can be retrieved from GCP console
PIPELINE_NAME = f"diamonds-predictor-pipeline-by-{USERNAME}"
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
PIPELINE_ROOT = "{}/{}_pipeline_root/workshop".format(BUCKET_NAME, USERNAME)
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_NAME)

Now let’s start building the pipeline. The kfp package offers many different objects to assemble our pipeline with; the two main ones are:

Component – a self-contained set of instructions to perform one step in the ML workflow.

Pipeline – chained components that are performed in a graph sequence and describe the entire ML workflow.

Our goal is to build a pipeline that will do the following:

  1. Extract the diamonds dataset and keep only diamonds priced under $10,000 for training.
  2. Validate the dataset – some simple feature engineering.
  3. Prepare the dataset for training.
  4. Train a RandomForestRegressor on the training data.
  5. Evaluate our model on the test data.
  6. Validate that the model is production-ready.
  7. Generate a model and a version in our monitoring system.
  8. Deploy our model to an endpoint.

Steps 1-6 will usually be your data science team’s responsibility (so we’ve simplified this part here), while steps 7-8 are the engineering part. Kubeflow lets us generate a standalone JSON file for each component (a Python function) that can be shared between different flows.

Load dataset component

@component(packages_to_install=["pandas"])
def load_data(dataset: Output[Dataset]):
    import pandas as pd
    df = pd.read_csv("https://www.openml.org/data/get_csv/21792853/dataset")
    df = df[df["price"] < 10000]
    print("Load Data: ", df.head())
    df.to_csv(dataset.path, index=False)

As you can see, the component is defined with a decorator that receives the packages needed for that step. Since a Kubeflow component is self-contained, the pandas package will be installed when the component’s container is built. Another point worth mentioning is the argument type, Output[Dataset], which is a Kubeflow object that holds a path parameter we can use at runtime.

Here we read the diamonds dataset, filter out diamonds priced at $10,000 or more, and write the result to the generated dataset.path URL.

Diamonds dataset context

This classic dataset contains prices and other attributes of almost 54,000 diamonds. It’s a great dataset for beginners learning to work with data analysis and visualization.

What it contains

  • price – price in US dollars ($326–$18,823)
  • carat – weight of the diamond (0.2–5.01)
  • cut – quality of the cut (Fair, Good, Very Good, Premium, Ideal)
  • color – diamond color, from J (worst) to D (best)
  • clarity – a measurement of how clear the diamond is (I1 (worst), SI2, SI1, VS2, VS1, VVS2, VVS1, IF (best))
  • x – length in mm (0–10.74)
  • y – width in mm (0–58.9)
  • z – depth in mm (0–31.8)
  • depth – total depth percentage = z / mean(x, y) = 2 * z / (x + y) (43–79)
  • table – width of the top of the diamond relative to the widest point (43–95)

Validate dataset component

@component(packages_to_install=["pandas"])
def validate_data(df: Input[Dataset], validated_df: Output[Dataset]):
    import pandas as pd
    df = pd.read_csv(df.path)
    print("Validate_data: ", df.head())
    BINARY_FEATURES = []
    # List all column names for numeric features
    NUMERIC_FEATURES = ["carat", "depth", "table", "x", "y", "z"]
    # List all column names for categorical features
    CATEGORICAL_FEATURES = ["cut", "color", "clarity"]
    # ID column - needed to support predict() over numpy arrays
    ID = ["record_id"]
    TARGET = "price"
    ALL_COLUMNS = ID + BINARY_FEATURES + NUMERIC_FEATURES + CATEGORICAL_FEATURES
    # Generate a record_id column from the dataframe index
    df = df.reset_index().rename(columns={"index": "record_id"})
    for n in NUMERIC_FEATURES:
        df[n] = pd.to_numeric(df[n], errors="coerce")
    df = df.fillna(df.mean(numeric_only=True))
    def data_selection(df: pd.DataFrame, selected_columns: List[str]):
        selected_columns.append(TARGET)
        data = df.loc[:, selected_columns]
        return data
    ## Feature selection
    df = data_selection(df, ALL_COLUMNS)
    return df.to_csv(validated_df.path, index=False)

This time we use Input[Dataset] to find where the output from the last step was written, which we can then use to load the data. The output of this component is a validated dataset with no nulls and correct types.

Prepare the data component

@component(packages_to_install=["scikit-learn==1.0.2", "pandas"])
def prepare_data(
    df: Input[Dataset],
    X_train: Output[Dataset],
    y_train: Output[Dataset],
    X_test: Output[Dataset],
    y_test: Output[Dataset],
):
    import pandas as pd
    from sklearn.model_selection import train_test_split
    target = "price"
    df = pd.read_csv(df.path)
    print("Prepare data: ", df.head())
    X, y = df.drop(columns=[target]), df[target]
    X_train_data, X_test_data, y_train_data, y_test_data = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    X_train_data.to_csv(X_train.path, index=False)
    y_train_data.to_csv(y_train.path, index=False)
    X_test_data.to_csv(X_test.path, index=False)
    y_test_data.to_csv(y_test.path, index=False)

To prepare the data, we use train_test_split from sklearn; therefore, we add "scikit-learn==1.0.2" to packages_to_install.

Train the model component

@component(packages_to_install=["scikit-learn==1.0.2", "pandas", "joblib"])
def train_model(
    X_train: Input[Dataset],
    y_train: Input[Dataset],
    model_artifact: Output[Model],
):
    import joblib
    import pandas as pd
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    from sklearn.compose import ColumnTransformer
    from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import cross_val_score
    # List all column names for numeric features
    NUMERIC_FEATURES = ["carat", "depth", "table", "x", "y", "z"]
    # List all column names for categorical features
    CATEGORICAL_FEATURES = ["cut", "color", "clarity"]
    # ID column - needed to support predict() over numpy arrays
    ID = ["record_id"]
    ALL_COLUMNS = ID + NUMERIC_FEATURES + CATEGORICAL_FEATURES
    X, y = pd.read_csv(X_train.path), pd.read_csv(y_train.path)
    X = X.loc[:, ALL_COLUMNS]
    print("Trainning model X:", X.head(), "Y: ", y.head())
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("cat", OneHotEncoder(handle_unknown="ignore")),
        ]
    )
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, NUMERIC_FEATURES),
            ("cat", categorical_transformer, CATEGORICAL_FEATURES),
        ],
        remainder="drop",
        n_jobs=-1,
    )
    # We now create a full pipeline, for preprocessing and training.
    # for training we selected a RandomForestRegressor
    model_params = {"max_features": "auto",
                    "n_estimators": 500,
                    "max_depth": 9,
                    "random_state": 42}
    regressor = RandomForestRegressor()
    regressor.set_params(**model_params)
    pipeline = Pipeline(
        steps=[("preprocessor", preprocessor), ("regressor", regressor)]
    )
    # Estimate training performance with 10-fold cross-validation (mean negative RMSE)
    score = cross_val_score(
        pipeline, X, y, cv=10, scoring="neg_root_mean_squared_error", n_jobs=-1
    ).mean()
    print("finished cross val")
    # Now we fit the pipeline on all of the training data.
    pipeline.fit(X, y)
    # Persist the trained pipeline to the artifact path (backed by GCS in Vertex Pipelines)
    joblib.dump(pipeline, model_artifact.path, compress=3)
    model_artifact.metadata["train_score"] = score

Voilà! We just read the X_train and y_train outputs from the previous step, created preprocessing transformers for the numeric and categorical features, built a random forest regressor, evaluated training performance with the mean RMSE of a 10-fold cross-validation, and wrote the model to the location generated by the Output[Model] object.

Evaluate the model component

@component(
    packages_to_install=["scikit-learn==1.0.2", "pandas", "seaborn", "matplotlib"]
)
def evaluate_model(
    model_artifact: Input[Model],
    x_test: Input[Dataset],
    y_test: Input[Dataset],
    model_performance: Output[Metrics],
    html: Output[HTML],
):
    import joblib
    import io
    import base64
    import seaborn as sns
    import pandas as pd
    import matplotlib.pyplot as plt
    
    from math import sqrt
    from sklearn.metrics import mean_squared_error, r2_score
    model = joblib.load(model_artifact.path)
    y_test = pd.read_csv(y_test.path)["price"]
    y_pred = model.predict(pd.read_csv(x_test.path))
    model_performance.metadata["rmse"] = sqrt(mean_squared_error(y_test, y_pred))
    model_performance.metadata["r2"] = r2_score(y_test, y_pred)
    model_performance.log_metric("r2", model_performance.metadata["r2"])
    model_performance.log_metric("rmse", model_performance.metadata["rmse"])
    df = pd.DataFrame({"predicted Price(USD)": y_pred, "actual Price(USD)": y_test})
    def fig_to_base64(fig):
        img = io.BytesIO()
        fig.get_figure().savefig(img, format="png", bbox_inches="tight")
        img.seek(0)
        return base64.b64encode(img.getvalue())
    encoded = fig_to_base64(
        sns.scatterplot(data=df, x="predicted Price(USD)", y="actual Price(USD)")
    )
    encoded_html = "{}".format(encoded.decode("utf-8"))
    html_content = '<html><head></head><body><h1>Predicted vs Actual Price</h1>\n<img src="data:image/png;base64, {}"></body></html>'.format(
        encoded_html
    )
    with open(html.path, "w") as f:
        f.write(html_content)

To evaluate the model’s performance, we run the model from the training step on the x_test and y_test datasets, calculate the RMSE and R², and generate a small HTML report with a scatterplot for later use.

Validate the model component

@component(packages_to_install=["scikit-learn==1.0.2", "pandas"])
def validate_model(
    new_model_metrics: Input[Metrics],
    new_model: Input[Model],
    dataset: Input[Dataset],
    baseline: Output[Dataset],
    model_metrics: Output[Metrics],
) -> NamedTuple("output", [("deploy", str)]):
    import joblib
    import pandas as pd
    
    from math import sqrt
    from sklearn.metrics import mean_squared_error, r2_score
    target = "price"
    validation_data = pd.read_csv(dataset.path)
    X, y = validation_data.drop(columns=[target]), validation_data[target]
    model = joblib.load(new_model.path)
    y_pred = model.predict(X)
    rmse = sqrt(mean_squared_error(y, y_pred))
    r2 = r2_score(y, y_pred)
    train_score = new_model.metadata["train_score"]
    print("new model rmse cross validation mean score: ", train_score)
    print("new model train rmse: ", new_model_metrics.metadata["rmse"])
    print("new model train r2: ", new_model_metrics.metadata["r2"])
    print("new model validation rmse: ", rmse)
    print("new model validation r2: ", r2)
    model_metrics.log_metric("rmse", rmse)
    model_metrics.log_metric("r2", r2)
    validation_data["predictions"] = y_pred
    validation_data.to_csv(baseline.path, index=False)
    if (
        rmse <= new_model_metrics.metadata["rmse"]
        and new_model_metrics.metadata["r2"] >= 0.95
        and abs(train_score) < 1000
    ):
        return ("true",)
    return ("false",)

Here we read the entire dataset from the data validation step (before it was split into X_train, X_test, y_train, and y_test), run the new model on it, and check its performance.

If all three conditions are met, the component returns "true", which allows the model to be deployed to the endpoint later in the pipeline.

Now that we’ve completed the first 6 steps, let’s assemble the pipeline.

@pipeline(
    name=PIPELINE_NAME,
    description="An ml pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def ml_pipeline():
    raw_data = load_data()
    validated_data = validate_data(raw_data.outputs["dataset"])
    prepared_data = prepare_data(validated_data.outputs["validated_df"])
    trained_model_task = train_model(
        prepared_data.outputs["X_train"], prepared_data.outputs["y_train"]
    )
    evaluated_model = evaluate_model(
        trained_model_task.outputs["model_artifact"],
        prepared_data.outputs["X_test"],
        prepared_data.outputs["y_test"],
    )
    validated_model = validate_model(
        new_model_metrics=evaluated_model.outputs["model_performance"],
        new_model=trained_model_task.outputs["model_artifact"],
        dataset=validated_data.outputs["validated_df"],
    )

It’s as simple as that!

We used the @pipeline decorator, with pipeline_root, to define where all our data will be stored and read from. Notice that the arguments for each of the components we wrote above are the outputs of the preceding component. For example, the train_model component gets the X_train and y_train parameters from the prepare_data component, while evaluate_model gets X_test and y_test.

Let’s trigger this pipeline and see the execution graph!

## GET UNIQUE VALUE
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
ml_pipeline_file = "ml_pipeline.json"
compiler.Compiler().compile(
    pipeline_func=ml_pipeline, package_path=ml_pipeline_file
)
job = aiplatform.PipelineJob(
    display_name="diamonds-predictor-pipeline",
    template_path=ml_pipeline_file,
    job_id="basic-pipeline-{}-{}".format(USERNAME, TIMESTAMP),
    enable_caching=True,
)
job.submit()

You will now see a new ml_pipeline.json file that describes the execution of this pipeline. It’s super useful for sharing pipelines between different teams.

Head over to Vertex AI in your GCP console and click Pipelines; you should see your pipeline:

Putting together a continuous ML stack - Vertex GCP console pipeline

As we said, each of the steps here gets a VM allocated for its runtime. In addition, we can see the inputs and outputs of each step. (I encourage you to explore this view and find some easter eggs 🙂)

Let’s head next to the MLOps-ish part.

Google lets us serve models built with many common libraries (TensorFlow, sklearn, XGBoost) out of the box. However, we need a bit more control here. In this stage, we’ll write a small Flask app that wraps our predictor and logs every prediction to our monitoring system.

Our Flask App:

import logging
import os
from flask import Flask, jsonify, request
from predictor.predictor import DiamondPricePredictor
app = Flask("DiamondPricePredictor")
gunicorn_logger = logging.getLogger("gunicorn.error")
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level)
predictor = DiamondPricePredictor(os.environ["MODEL_PATH"])
@app.route("/diamonds/v1/predict", methods=["POST"])
def predict():
   """
   Handle the Endpoint predict request.
   """
   predictions = predictor.predict(request.json["instances"])
   return jsonify(
       {
           "predictions": predictions["predicted_prices"],
           "transaction_id": predictions["transaction_id"],
       }
   )
@app.route("/diamonds/v1", methods=["GET"])
def healthcheck():
   """
   Vertex AI intermittently performs health checks on your
   HTTP server while it is running to ensure that it is
   ready to handle prediction requests.
   """

Only two endpoints are needed: predict and health check.

The payload for prediction is in the form:

"instances": [
           {
               "carat" : 1.42, "clarity" : "VVS1", "color" : "F", "cut" : "Ideal", "depth" : 60.8, "record_id" : 27671, "table" : 56, "x" : 7.25, "y" : 7.32, "z" : 4.43
           },
           {
               "carat" : 2.03, "clarity" : "VS2", "color" : "G", "cut" : "Premium", "depth" : 59.6, "record_id" : 27670, "table" : 60, "x" : 8.27, "y" : 8.21, "z" : 4.91
           }
           ]
       }
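As a quick sanity check, assuming the serving container described below is running locally on port 5050 (the local URL is a hypothetical example; on Vertex AI you would go through the endpoint instead, as shown at the end of the post), you could send this payload with a short script:

import requests

payload = {
    "instances": [
        {"carat": 1.42, "clarity": "VVS1", "color": "F", "cut": "Ideal", "depth": 60.8,
         "record_id": 27671, "table": 56, "x": 7.25, "y": 7.32, "z": 4.43},
    ]
}
resp = requests.post("http://localhost:5050/diamonds/v1/predict", json=payload, timeout=30)
resp.raise_for_status()
print(resp.json())  # expected shape: {"predictions": [...], "transaction_id": "..."}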

Our predictor script:

import os
from tempfile import TemporaryFile
import joblib
import pandas as pd
from google.cloud import storage
from superwise import Superwise
CLIENT_ID = os.getenv("SUPERWISE_CLIENT_ID")
SECRET = os.getenv("SUPERWISE_SECRET")
SUPERWISE_MODEL_ID = os.getenv("SUPERWISE_MODEL_ID")
SUPERWISE_VERSION_ID = os.getenv("SUPERWISE_VERSION_ID")
class DiamondPricePredictor(object):
   def __init__(self, model_gcs_path):
       self._model = self._set_model(model_gcs_path)
       self._sw = Superwise(
           client_id=os.getenv("SUPERWISE_CLIENT_ID"),
           secret=os.getenv("SUPERWISE_SECRET")
       )
   def _send_monitor_data(self, predictions):
       """
       Send prediction records (input features plus predictions) to Superwise
       :param pd.DataFrame predictions: records to log
       :return str transaction_id
       """
       transaction_id = self._sw.transaction.log_records(
           model_id=int(os.getenv("SUPERWISE_MODEL_ID")),
           version_id=int(os.getenv("SUPERWISE_VERSION_ID")),
           records=predictions
       )
       return transaction_id
   def predict(self, instances):
       """
       apply predictions on instances and log predictions to Superwise
       :param list instances: [{record1}, {record2} ... {record-N}]
       :return dict api_output: {[predicted_prices: prediction, transaction_id: str]}
       """
       input_df = pd.DataFrame(instances)
       # Run the model and attach predictions to the input dataframe
       input_df["predictions"] = self._model.predict(input_df)
       # Send data to Superwise
       transaction_id = self._send_monitor_data(input_df)
       api_output = {
           "transaction_id": transaction_id,
           "predicted_prices": input_df["predictions"].values.tolist(),
       }
       return api_output
   def _set_model(self, model_gcs_path):
       """
       download file from gcs to temp file and deserialize it to sklearn object
       :param str model_gcs_path: Path to gcs file
       :return sklearn.Pipeline model: Deserialized pipeline ready for production
       """
       storage_client = storage.Client()
       bucket_name = os.environ["BUCKET_NAME"]
       print(f"Loading from bucket {bucket_name} model {model_gcs_path}")
       bucket = storage_client.get_bucket(bucket_name)
       # select bucket file
       blob = bucket.blob(model_gcs_path)
       with TemporaryFile() as temp_file:
           # download blob into temp file
           blob.download_to_file(temp_file)
           temp_file.seek(0)
           # load into joblib
           model = joblib.load(temp_file)
       print(f"Finished loading model from GCS")
       return model

Our predictor implements 3 functions:

  1. _send_monitor_data – for each prediction request, send the prediction data to Superwise.
  2. _set_model – read the joblib object from GCS and load it as our model (this is the output of Validate model step).
  3. predict – use the model to predict diamond prices and log the data to the Superwise platform.

Dockerfile: (ARGs and ENVs are for local testing)

FROM python:3.7
WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY . ./
ARG MODEL_PATH
ARG SUPERWISE_CLIENT_ID
ARG SUPERWISE_SECRET
ARG SUPERWISE_MODEL_ID
ARG SUPERWISE_VERSION_ID
ARG BUCKET_NAME
ENV SUPERWISE_CLIENT_ID=${SUPERWISE_CLIENT_ID}
ENV SUPERWISE_SECRET=${SUPERWISE_SECRET}
ENV BUCKET_NAME=${BUCKET_NAME}
ENV MODEL_PATH=${MODEL_PATH}
ENV SUPERWISE_MODEL_ID=${SUPERWISE_MODEL_ID}
ENV SUPERWISE_VERSION_ID=${SUPERWISE_VERSION_ID}
ENV FLASK_APP /app/server.py
ENV GOOGLE_APPLICATION_CREDENTIALS /app/resources/creds.json
ENTRYPOINT ["gunicorn", "--bind", "0.0.0.0:5050", "predictor.server:app", "--timeout", "1000", "-w", "4"]
EXPOSE 5050

Now we need to push this image to a new Artifact Registry repository in GCP.

Go to the GCP console -> Artifact Registry -> Create repository

name: diamonds-predictor-repo

format: docker

Then run this in a terminal:

REPOSITORY='diamonds-predictor-repo'
PROJECT_ID='your GCP project ID'
REGION='<GCP Region (e.g. us-central 1)>'
IMAGE='diamonds_predictor'
docker build --tag=${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPOSITORY}/${IMAGE} .
docker push ${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPOSITORY}/${IMAGE}

Let’s start monitoring

Register your model to Superwise:

To monitor your model in production, you’ll first need to register it with the Superwise platform. For this step, we need to log into Superwise and generate a CLIENT_ID and a SECRET. Click on your user name at the bottom left.

Putting together a continuous ML stack - Superwise keys

Then select personal tokens, generate a token, copy the CLIENT ID and SECRET, and save them somewhere safe.

Putting together a continuous ML stack - Superwise personal access tokens

The following snippet is a new component designed to register your model with Superwise, create a version for it (similar to a tag), and have Superwise ready to monitor it.

SUPERWISE_CLIENT_ID = "<YOUR SUPERWISE ACCOUNT CLIENT ID>"
SUPERWISE_SECRET = "<YOUR SUPERWISE ACCOUNT SECRET>"
SUPERWISE_MODEL_NAME = "Regression - Diamonds Price Predictor"
@component(packages_to_install=["superwise", "pandas"])
def register_model_to_superwise(
    model_name: str,
    superwise_client_id: str,
    superwise_secret: str,
    baseline: Input[Dataset],
    timestamp: str,
) -> NamedTuple("output", [("superwise_model_id", int), ("superwise_version_id", int)]):
    import pandas as pd
    
    from datetime import datetime
    from superwise import Superwise
    from superwise.models.model import Model
    from superwise.models.version import Version
    from superwise.resources.superwise_enums import DataEntityRole
    from superwise.controller.infer import infer_dtype
    sw = Superwise(
        client_id=superwise_client_id,
        secret=superwise_secret,
    )
    first_version = False
    # Check if model exists
    models = sw.model.get_by_name(model_name)
    if len(models) == 0:
        print(f"Registering new model {model_name} to Superwise")
        diamond_model = Model(name=model_name, description="Predicting Diamond Prices")
        new_model = sw.model.create(diamond_model)
        model_id = new_model.id
        first_version = True
    else:
        print(f"Model {model_name} already exists in Superwise")
        model_id = models[0].id
    baseline_data = pd.read_csv(baseline.path).assign(
        ts=pd.Timestamp.now() - pd.Timedelta(30, "d")
    )
    # infer baseline data types and calculate metrics & distribution for features
    entities_dtypes = infer_dtype(df=baseline_data)
    entities_collection = sw.data_entity.summarise(
        data=baseline_data,
        entities_dtypes=entities_dtypes,
        specific_roles={
            "record_id": DataEntityRole.ID,
            "ts": DataEntityRole.TIMESTAMP,
            "predictions": DataEntityRole.PREDICTION_VALUE,
            "price": DataEntityRole.LABEL,
        },
    )
    if not first_version:
        model_versions = sw.version.get({"model_id": model_id})
        print(
            f"Model already has the following versions: {[v.name for v in model_versions]}"
        )
    new_version_name = f"v_{timestamp}"
    # create new version for model in Superwise
    diamond_version = Version(
        model_id=model_id,
        name=new_version_name,
        data_entities=entities_collection,
    )
    new_version = sw.version.create(diamond_version)
    # activate the new version for monitoring
    sw.version.activate(new_version.id)
    return (model_id, new_version.id)

Our inputs are the model name, the Superwise client ID and secret, the baseline data we trained on, and a timestamp used to create a unique version name. The code above uses the Superwise public SDK to perform actions on the platform.

Our final step will be to create the endpoint and deploy our custom image to it.

@component(
    packages_to_install=[
        "google-cloud-aiplatform==1.7.0",
        "google-cloud-pipeline-components",
    ]
)
def deploy_model_to_endpoint(
    project: str,
    location: str,
    bucket_name: str,
    timestamp: str,
    superwise_client_id: str,
    superwise_secret: str,
    superwise_model_id: int,
    superwise_version_id: int,
    serving_container_image_uri: str,
    model: Input[Model],
    vertex_model: Output[Model],
):
    import os
    from google.cloud import aiplatform, storage
    aiplatform.init(project=project, location=location)
    DISPLAY_NAME = "Diamonds-Price-Predictor"
    def create_endpoint():
        endpoints = aiplatform.Endpoint.list(
            filter='display_name="{}"'.format(DISPLAY_NAME),
            order_by="create_time desc",
            project=project,
            location=location,
        )
        if len(endpoints) > 0:
            endpoint = endpoints[0]  # most recently created
        else:
            endpoint = aiplatform.Endpoint.create(
                display_name=DISPLAY_NAME, project=project, location=location
            )
        return endpoint
    def upload_model_to_gcs(artifact_filename, local_path):
        model_directory = f"{bucket_name}/models/"
        storage_path = os.path.join(model_directory, artifact_filename)
        blob = storage.blob.Blob.from_string(storage_path, client=storage.Client())
        blob.upload_from_filename(local_path)
        return f"models/{artifact_filename}"
    endpoint = create_endpoint()
    model_gcs_path = upload_model_to_gcs(f"model_{timestamp}.joblib", model.path)
    model_upload = aiplatform.Model.upload(
        display_name=DISPLAY_NAME,
        serving_container_image_uri=serving_container_image_uri,
        serving_container_ports=[5050],
        serving_container_health_route=f"/diamonds/v1",
        serving_container_predict_route=f"/diamonds/v1/predict",
        serving_container_environment_variables={
            "MODEL_PATH": model_gcs_path,
            "BUCKET_NAME": bucket_name.strip("gs://"),
            "SUPERWISE_CLIENT_ID": superwise_client_id,
            "SUPERWISE_SECRET": superwise_secret,
            "SUPERWISE_MODEL_ID": superwise_model_id,
            "SUPERWISE_VERSION_ID": superwise_version_id,
        },
    )
    print("uploaded version")
    model_deploy = model_upload.deploy(
        machine_type="n1-standard-4",
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=DISPLAY_NAME,
    )
    vertex_model.uri = model_deploy.resource_name

During this step’s execution, we write the serialized model object (using joblib) into a predefined folder in the bucket (not the pipeline’s root), so the predictor will be able to run its _set_model function and load the model from there. In addition, we deploy the image with all the environment variables it needs to work.

Lastly, in this code snippet, we can see that the deploy function gets a machine type (we chose a basic one) and a traffic split, which is useful for A/B testing or gradual rollouts (see the sketch below).
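For instance, a canary-style rollout could keep most traffic on the currently deployed model and route only a slice to the new one. This is just a hedged sketch of that variant of the deploy call above; the existing DeployedModel ID is a hypothetical placeholder you would look up on your endpoint.

# Hypothetical canary rollout variant of the deploy call above:
# "0" refers to the model being deployed in this call; the other key is the ID of a
# model already deployed on the endpoint (placeholder value here).
model_deploy = model_upload.deploy(
    machine_type="n1-standard-4",
    endpoint=endpoint,
    traffic_split={"0": 10, "1234567890123456789": 90},
    deployed_model_display_name=DISPLAY_NAME,
)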

At last, we’re here! Let’s create a new pipeline and run it!

@pipeline(
    name=PIPELINE_NAME,
    description="An ml pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def ml_pipeline():
    raw_data = load_data()
    validated_data = validate_data(raw_data.outputs["dataset"])
    prepared_data = prepare_data(validated_data.outputs["validated_df"])
    trained_model_task = train_model(
        prepared_data.outputs["X_train"], prepared_data.outputs["y_train"]
    )
    evaluated_model = evaluate_model(
        trained_model_task.outputs["model_artifact"],
        prepared_data.outputs["X_test"],
        prepared_data.outputs["y_test"],
    )
    validated_model = validate_model(
        new_model_metrics=evaluated_model.outputs["model_performance"],
        new_model=trained_model_task.outputs["model_artifact"],
        dataset=validated_data.outputs["validated_df"],
    )
    ### NEWLY ADDED SECTION ###
    with dsl.Condition(
        validated_model.outputs["deploy"] == "true", name="deploy_decision"
    ):
        superwise_metadata = register_model_to_superwise(
            SUPERWISE_MODEL_NAME,
            SUPERWISE_CLIENT_ID,
            SUPERWISE_SECRET,
            validated_model.outputs["baseline"],
            TIMESTAMP,
        )
        vertex_model = deploy_model_to_endpoint(
            PROJECT_ID,
            REGION,
            BUCKET_NAME,
            TIMESTAMP,
            SUPERWISE_CLIENT_ID,
            SUPERWISE_SECRET,
            superwise_metadata.outputs["superwise_model_id"],
            superwise_metadata.outputs["superwise_version_id"],
            f"{REGION}-docker.pkg.dev/{PROJECT_ID}/diamonds-predictor-repo/diamonds_predictor:latest",
            trained_model_task.outputs["model_artifact"],
        )

This is exactly like the pipeline we already executed, but with three new elements:

dsl.Condition helps us set conditions during execution. In our case, if the validation step returns "false", we do not continue to deployment.

superwise_metadata – gets the outputs of the register_model_to_superwise step.

vertex_model – gets a single output, the deployed model’s resource URI. We will use this to send prediction requests.

Let’s run it:

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print("File {} uploaded to {}.".format(source_file_name, destination_blob_name))
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
ml_pipeline_file = "ml_pipeline.json"
compiler.Compiler().compile(
    pipeline_func=ml_pipeline, package_path=ml_pipeline_file
)
job = aiplatform.PipelineJob(
    display_name="diamonds-predictor-pipeline",
    template_path=ml_pipeline_file,
    job_id="e2e-pipeline-{}-{}".format(USERNAME, TIMESTAMP),
    enable_caching=True,
)
upload_blob(
    bucket_name=BUCKET_NAME.replace("gs://", ""),
    source_file_name=ml_pipeline_file,
    destination_blob_name=ml_pipeline_file,
)
job.submit()

In this execution snippet, we added a small function that also uploads the generated JSON file to GCS, so other users can reuse it, and so we can roll back and load an older pipeline definition if needed.

This will run for a few minutes. Vertex will use its caching mechanism to skip the steps already executed in the first pipeline run and execute only the two new steps. Here’s how the graph should look once it’s done:

Putting together a continuous ML stack - Vertex GCP console pipeline run

To simulate a web server listening for events and triggering a new pipeline in case of an incident, we’ll use a Google Cloud Function as the webhook. This webhook will trigger a new pipeline, only this time our load_data component will read the entire diamonds dataset, so the model will also encounter diamonds priced above $10,000.

To do so, I have placed a file called full_data_ml_pipeline.json (the output of our pipeline.py script, compiled without the df = df[df["price"] < 10000] line) at "gs://pipeline_blog_bucket/itaybenhaim_pipeline_root/full_data_ml_pipeline.json".

Head over to GCP Cloud Functions, enable the requested APIs, and create a new HTTP-triggered function with the following Python code:

from google.cloud import aiplatform
PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
REGION = 'your-region'                             # <---CHANGE THIS
PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
def trigger_pipeline_run(request):
    """HTTP Cloud Function entry point: triggers a pipeline run."""
    pipeline_spec_uri = "gs://pipeline_blog_bucket/itaybenhaim_pipeline_root/full_data_ml_pipeline.json"
    # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
    aiplatform.init(
        project=PROJECT_ID,
        location=REGION,
    )
    job = aiplatform.PipelineJob(
        display_name='incident-triggered-ml-pipeline',
        template_path=pipeline_spec_uri,
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False
    )
    # Submit the PipelineJob and return an HTTP response
    job.submit()
    return "Pipeline triggered", 200

We will use the generated URL of this webhook in Superwise’s integrations page:

Putting together a continuous ML stack - Superwise webhook
Putting together a continuous ML stack - Superwise add new webhook

Now let’s create a policy in Superwise so that, upon detecting a violation, it will trigger the webhook and retrain our pipeline. On the main models page, click on Add monitoring policy.

Putting together a continuous ML stack - Superwise add monitoring policy

Then follow the wizard to configure a policy and integrate it into the trigger-full-pipeline webhook (I chose feature stability and had Superwise automatically configure the thresholds and features to monitor).

Putting together a continuous ML stack - Superwise add monitoring policy feature stability
Putting together a continuous ML stack - Superwise add monitoring policy feature stability to webhook

Great! We are all set, and our Vertex endpoint is being monitored!

Now let’s simulate real-world behavior of a production environment.

Let’s say that in the first 18 days of use, the new observations were all diamonds priced below $10,000, and the model performed well. But lo and behold, a new diamond mine has been discovered, and bigger, pricier diamonds are flooding the market!

(In a new script)

import requests
import json
import pandas as pd
import google.auth
import google.auth.transport.requests
ENDPOINT_ID = "<GET THE ENDPOINT_ID FROM THE PIPELINE'S OUTPUT>" # @param
url = f"https://{REGION}-aiplatform.googleapis.com/v1/projects/{PROJECT_NUMBER}/locations/{REGION}/endpoints/{ENDPOINT_ID}:predict"
                                                      
credentials, project_id = google.auth.default(
    scopes=[
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/cloud-platform.read-only",
    ]
)
instances = {"instances": []}
df = pd.read_csv("https://www.openml.org/data/get_csv/21792853/dataset")
expensive_df = df[df["price"] > 10000].sort_values("price", ascending=False)
df = df[df["price"] < 10000]
count = 28
chunk_size = 500
reset_index = True
min_chunk, max_chunk = 0, chunk_size
while count:
    print(count)
    print(f"Uploading data from: {str(pd.Timestamp.now() - pd.Timedelta(count, 'd'))}")
    if count < 10:
        if reset_index:
          min_chunk, max_chunk = 0, 500
          reset_index = False
        print(expensive_df.iloc[min_chunk:max_chunk]['price'].mean())
        for row_tuple in expensive_df.iloc[min_chunk:max_chunk].iterrows():  
            row_dict = row_tuple[1].drop("price").to_dict()
            row_dict["record_id"] = row_tuple[1].name
            row_dict["ts"] = str(pd.Timestamp.now() - pd.Timedelta(count, 'd'))
            instances["instances"].append(row_dict)        
    else:
        print(df.iloc[min_chunk:max_chunk]['price'].mean())
        for row_tuple in df.iloc[min_chunk:max_chunk].iterrows():
            row_dict = row_tuple[1].drop("price").to_dict()
            row_dict["record_id"] = row_tuple[1].name
            row_dict["ts"] = str(pd.Timestamp.now() - pd.Timedelta(count, 'd'))
            instances["instances"].append(row_dict)
    request = google.auth.transport.requests.Request()
    credentials.refresh(request)
    token = credentials.token
    headers = {"Authorization": "Bearer " + token}
    response = requests.post(url, json=instances, headers=headers)
    #print(response.text) ## If needed 🙂
    print("---" * 15)
    instances["instances"] = []
    count -= 1
    min_chunk += chunk_size
    max_chunk += chunk_size

Running this will generate the situation above and have Superwise trigger a webhook to retrain on the whole dataset!

Output:

Putting together a continuous ML stack - Superwise trigger retraining

Before we look into the incidents and the automation, note that Superwise has already calculated the production data distributions for us, and a distribution shift is visible across multiple features.

Putting together a continuous ML stack - Superwise production data distribution

Now the incident will be caught when the policy daemon runs (according to the schedule defined in the policy) and will show up on the incidents screen.

Putting together a continuous ML stack - Superwise incident

Superb!! We simulated the distribution shift, and Superwise automatically triggered our cloud function to rerun the training pipeline:

Putting together a continuous ML stack - Superwise triggered retraining in GCP

Summary

In this super long yet (hopefully) awesome post, we introduced two new paradigms:

  • Training-serving pipelines – instead of pickled objects.
  • Continuous training – driven by production observability insights (to read more about data-driven retraining, check out this blog and Jupyter notebook).

The stack and the technical implementation can be adapted and assembled from many different tools. There are a lot of great off-the-shelf and open-source tools out there, so it’s entirely up to you to put together the toolbox that best fits your needs! Using the big vendors’ ML platforms is a quick and easy way to get started, but it is less flexible and won’t work for every use case.

And one final thing to think about before we sign off: retraining is not always the solution. Sometimes model recalibration, fixing the data source stream, splitting into sub-populations, or adjusting thresholds is what’s needed. So you need to make sure that the monitoring you set up is capable of detecting issues, identifying root causes, and pointing you to the right resolution.

For any questions, discussions, or ideas – talk to us!

info@superwise.ai and itay.benhaim@superwise.ai

Data-driven retraining with production observability insights

We all know that our model’s best day in production will be its first day in production. It’s simply a fact of life that, over time, model performance degrades. ML attempts to predict real-world behavior based on observed patterns it has trained on and learned. But the real world is dynamic and always in motion; sooner or later, depending on your use case and data velocity, your model will decay and begin to exhibit concept drift, data drift, or even both. 

Your best day in production is your first day in production

When models misbehave, we often turn to retraining to attempt to fix the problem. As a rule of thumb, data science teams will often take the most recent data from production, say 2 or 3 months’ worth, and retrain their model on it with the assumption that “refreshing” the model’s training observations will enable it to better predict future results. But is the most recent data the best data to resolve a model’s performance issues and get it back on track? Think about it this way: if an end-to-end software test failed, would you accept it as fixed just by rerunning the test? Most likely not. You’d troubleshoot the issue to pinpoint the root cause and apply an exact fix. ML teams do precisely this with model monitoring, pinpointing anomalies and uncovering their root cause to resolve issues quickly before they impact business outcomes. But when the resolution requires retraining, “fresh is best” is not exactly a data-driven approach.

This article will demonstrate how data science and ML engineering teams can leverage ML monitoring to find the best data and retraining strategy mix to resolve machine learning performance issues. This data-driven, production-first approach enables more thoughtful retraining selections and shorter and leaner retraining cycles and can be integrated into MLOps CI/CD pipelines for continuous model retraining upon anomaly detection.

Matching production insights to retraining strategies

The insights explained below are based on anomalies detected in the Superwise model observability platform and analyzed in a corresponding Jupyter notebook that extracts retraining insights. All the assets are open for use under the Superwise community edition, and you can use them to run the notebook on your own data.

* It’s important to note that the value of this approach lies in identifying how to best retrain once you have eliminated other possible issues in your root cause investigation. 

Identifying retraining groups 

The question? What data should I use for the next retraining?

Models are subject to temporality and seasonality. Selecting a dataset impacted by a temporal anomaly or flux can result in model skew. An important insight from production data is ‘data DNA’, or the similarity between daily distributions. Understanding how the data changes between dates (the drift score between dates) enables date-based grouping based on similarities and differences. With this information, you can create a combination of retraining data groups that reflect or exclude the temporal behavior of your production data.

Here we can see a heatmap matrix of dates X dates, where each cell represents the change between two dates. Darkly colored cells represent dates that are very different from each other, while lightly colored cells represent dates that are very similar.

Identifying retraining groups
Data DNA

As you can see in this example, the data is divided into 3 main groups (orange, red, and green), representing the 3 candidate datasets to use in the next retraining.

  • Red – the red group reflects a recurring event in our data that we want to train on. This could be, for example, behavior over the weekends. 
  • Orange – the orange group is normal data behavior in production.
  • Green – the green group represents a unique behavioral event. For example, this could be a marketing campaign in a click-through rate optimization use case. 

Depending on your domain insights, the include/exclude decisions may differ. If the marketing campaign was successful and its insights will be rolled out to all future marketing campaigns, you may decide to retrain on green and red. If the campaign was a one-time event or a failed experiment, orange and red would be a better retraining data group selection.

Retraining groups
Retraining groups
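To make the idea concrete, here is a minimal sketch of how such a date-by-date drift matrix could be built (the Superwise notebook computes this for you; this is not its actual code). It assumes a hypothetical DataFrame prod_df with a date column and a few numeric features, uses the Jensen–Shannon distance as the drift score, and clusters the dates into candidate retraining groups with scipy’s hierarchical clustering.

import numpy as np
import pandas as pd
from scipy.spatial.distance import jensenshannon, squareform
from scipy.cluster.hierarchy import linkage, fcluster

def daily_drift_matrix(prod_df, feature_cols, bins=20):
    """Pairwise drift score (mean Jensen-Shannon distance over features) between dates."""
    dates = sorted(prod_df["date"].unique())
    # Shared histogram bin edges per feature, so distributions are comparable across dates
    edges = {f: np.histogram_bin_edges(prod_df[f].dropna(), bins=bins) for f in feature_cols}
    hists = {
        d: {
            f: np.histogram(prod_df.loc[prod_df["date"] == d, f].dropna(), bins=edges[f])[0] + 1e-9
            for f in feature_cols
        }
        for d in dates
    }
    matrix = pd.DataFrame(0.0, index=dates, columns=dates)
    for i, d1 in enumerate(dates):
        for d2 in dates[i + 1:]:
            score = np.mean([jensenshannon(hists[d1][f], hists[d2][f]) for f in feature_cols])
            matrix.loc[d1, d2] = matrix.loc[d2, d1] = score
    return matrix

# Hypothetical usage on the diamonds features from the example above
drift_matrix = daily_drift_matrix(prod_df, feature_cols=["carat", "depth", "table"])
# Cluster dates into candidate retraining groups (e.g., the "red"/"orange"/"green" groups above)
condensed = squareform(drift_matrix.values, checks=False)
groups = fcluster(linkage(condensed, method="average"), t=3, criterion="maxclust")
print(pd.Series(groups, index=drift_matrix.index, name="retraining_group"))

However the matrix is produced, the point is the same: dates land in the same retraining group because their distributions are similar, not because they happen to be recent.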

Identifying drifted segments

The question? Which populations are impacted?

A model’s purpose is to generalize predictions across your population, but even so, you will always need to monitor your model’s behavior at the segment level to detect whether a specific segment is drifting. When segment drift is detected, we can consider the following resolutions, or even a combination of them, together with retraining.

  • Model split – create a specific model for the segment.
  • Optimize the model – suit the model to handle the current segment.
  • Resample the data – change the data distribution the model learns from so it better represents the drifting segment.

Here we can see the mean segment drift, where each bar shows the drift score of one segment. Before taking action, it is important to understand the relationship between segment size and segment drift to determine the extent of the segment’s effect on the model.

Mean segment drift
Mean segment drift

Moreover, this lets us relate segment size to segment drift and determine whether we need to create a specific model for this segment.

Relationship between segment size and segment drift
Relationship between segment size and segment drift
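To illustrate what’s behind these plots, here is a rough sketch (again, not the notebook’s actual code) of computing a drift score and a relative size per segment. It assumes hypothetical train_df and prod_df DataFrames that share a categorical segment column and a set of numeric features.

import numpy as np
import pandas as pd
from scipy.spatial.distance import jensenshannon

def segment_drift(train_df, prod_df, segment_col, feature_cols, bins=20):
    """Mean Jensen-Shannon distance between training and production data, per segment."""
    rows = []
    for segment, prod_seg in prod_df.groupby(segment_col):
        train_seg = train_df[train_df[segment_col] == segment]
        scores = []
        for f in feature_cols:
            # Shared bin edges so the two histograms are comparable
            edges = np.histogram_bin_edges(pd.concat([train_seg[f], prod_seg[f]]).dropna(), bins=bins)
            p = np.histogram(train_seg[f].dropna(), bins=edges)[0] + 1e-9
            q = np.histogram(prod_seg[f].dropna(), bins=edges)[0] + 1e-9
            scores.append(jensenshannon(p, q))
        rows.append({
            "segment": segment,
            "drift_score": float(np.mean(scores)),
            "segment_share": len(prod_seg) / len(prod_df),
        })
    return pd.DataFrame(rows).sort_values("drift_score", ascending=False)

# Hypothetical usage: segment the diamonds example by clarity
report = segment_drift(train_df, prod_df, segment_col="clarity",
                       feature_cols=["carat", "depth", "table"])
# A large, heavily drifting segment is a stronger candidate for a model split
print(report)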

Identifying days with integrity issues

The question? Which days should be excluded from retraining on principle?

Some data should be excluded from retraining on principle, namely days when we experienced data integrity issues due to a pipeline or upstream source problem. If this data is taken into consideration during retraining, it can cause our model to misinterpret the ‘normal’ distribution, which can result in a further decline in model performance.

Top 10 days with integrity incidents
Top 10 days with integrity incidents

Here we can see a bar graph of the days with data integrity incidents. This lets us quickly identify ‘bad’ data that we should exclude from the next retraining.
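As a simple illustration of the idea (not the platform’s actual calculation), you could approximate this from your own logs by counting integrity violations per day, e.g. missing or out-of-range values, and ranking the worst offenders. The prod_df DataFrame and the checks below are hypothetical.

import pandas as pd

# Hypothetical per-feature integrity checks: missing values and out-of-range values
checks = {
    "carat": lambda s: s.isna() | (s <= 0),
    "depth": lambda s: s.isna() | ~s.between(40, 80),
    "table": lambda s: s.isna() | ~s.between(40, 100),
}

# One violation count per row: how many checks each record fails
violations = pd.DataFrame({
    "date": prod_df["date"],
    "violations": sum(check(prod_df[col]).astype(int) for col, check in checks.items()),
})

# Top 10 days with the most integrity incidents -- candidates to exclude from retraining
daily_incidents = violations.groupby("date")["violations"].sum().sort_values(ascending=False)
exclude_days = daily_incidents.head(10)
print(exclude_days)

Retraining on the remaining days is then just a filter, e.g. prod_df[~prod_df["date"].isin(exclude_days.index)].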

Smarter, leaner retraining

Retraining isn’t free. It takes up resources, both in terms of training runs and your team’s focus and effort. So anything we can do to improve the probability of finishing a retraining cycle with higher-performing results is crucial. That is the value of data-driven retraining with production insights: smarter and leaner retraining that leverages model observability to take you from detection to resolution quickly and effectively.

Ready to get started with Superwise?

Head over to the Superwise platform and get started with easy, customizable, scalable, and secure model observability for free with our community edition.

Prefer a demo?

Request a demo, and our team will show you what Superwise can do for your ML and your business.