Robust Intelligence + DataRobot Integration Walkthroughļƒ

You are a data scientist at a Payment Processing Company. The data science team has been tasked with implementing a Fraud Detection service and monitoring how that model performs over time. The performance of this fraud detection model directly impacts the costs of the company. In order to ensure the data science team develops the best model and the performance of this model doesnā€™t degrade over time, the VP of Data Science purchases the RIME platform.

Your team currently does all model development and serving on DataRobotā€™s intelligent cloud and already uses all of its MLOps tooling.

In this Notebook Walkthrough, we will walkthrough 2 of RIMEā€™s core products - AI Stress Testing and AI Firewall and demonstrate how they integrate with DataRobotā€™s core offerings to help you develop and maintain more robust AI models.

  1. AI Stress Testing is used in the model development stage. Using AI Stress Testing you can test the developed model. RIME goes beyond simply optimizing for basic model performance like accuracy and automatically discovers the modelā€™s weaknesses.

  2. AI Firewall is used after the model is deployed in production. Using AI Firewall, you can automate the monitoring, discovery and remediation of issues that occur post-deployment. Additionally it automatically flags, blocks, or imputes erroneous data in real-time.

Latest Colab version of this notebook available here

Install Dependencies, Import Libraries and Download Dataļƒ

Run the cell below to install libraries to recieve data, install our SDK, and load analysis libraries.

[ ]:
%pip install pandas &> /dev/null
%pip install datarobot &> /dev/null
%pip install rime-sdk &> /dev/null
%pip install https://github.com/RobustIntelligence/ri-public-examples/archive/master.zip &> /dev/null

from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional

import datarobot as dr
import pandas as pd
from ri_public_examples.download_files import download_files

download_files("tabular/fraud", "fraud_data")

Connecting to DataRobotļƒ

If you do not already have an account with DataRobot, create one now. Once that is done, copy your DataRobot API Key and assign to the variable DATAROBOT_TOKEN below.

See the documentation here for more information on DataRobotā€™s API key management console.

Connecting to RIMEļƒ

Next, copy your API key for RIME and assign to the variable RIME_API_TOKEN below.

You can generate a new API key within the ā€˜Workspace settingsā€™ page on your RIME clusterā€™s website.

Additionally, copy the url of the cluster (e.g., ā€˜rime.<cluster-name>.rime.devā€™) to the RIME_CLUSTER_URL variable below. (Note: the cluster cannot be localhost)

Next, we will define some additional constants to help us load the correct datasets for this walkthrough.

[ ]:
# DataRobot User-provided Constants
DATAROBOT_TOKEN = ""
# Endpoint may differ for managed cloud/on-prem/EU/etc.
DATAROBOT_ENDPOINT="https://app2.datarobot.com/api/v2"

# RIME User-provided Constants
RIME_API_TOKEN = ""
RIME_CLUSTER_URL = "" # e.g., "rime.<cluster>.rime.dev"

# DataRobot project constants
dr_project_name = 'Fraud Detection'
dr_project_id = None # Set to your DataRobot project ID if you've developed a project
[ ]:
# Constants for loading the data
label_col = "label"
excluded_cols = ['preds']
ref_file = "fraud_data/data/fraud_ref.csv"
eval_file = "fraud_data/data/fraud_eval.csv"
incremental_file = "fraud_data/data/fraud_incremental.csv"
metric = 'LogLoss' # DataRobot recommended metric
autopilot_mode = dr.enums.AUTOPILOT_MODE.QUICK

# Load the data for training and evaluation
train =  pd.read_csv(ref_file).drop(excluded_cols, axis=1)
test  =  pd.read_csv(eval_file).drop(excluded_cols, axis=1)
train.head()

Develop Models using DataRobotā€™s AutoPilot Serviceļƒ

We have loaded the development datasets above and are ready to train a model. First, weā€™ll define a few helper functions (not RIME-specific).

[ ]:
#@title Helper Functions (Hidden)
def get_model_score(model: dr.Model, metric: str):
    res = {}
    res['model_number'] = model.model_number
    res['model_type'] = model.model_type
    res['model'] = model
    res['sample_pct'] = model.sample_pct

    res['metric_v'] = model.metrics.get(metric, {}).get('validation')
    res['metric_cv'] = model.metrics.get(metric, {}).get('crossValidation')

    return res

def get_train_preds(model: dr.Model) -> pd.DataFrame:
    """Request and/or retrieve training predictions for a given model."""
    try:
        # Request training predictions and get job IDs
        pred_job = model.request_training_predictions(dr.enums.DATA_SUBSET.ALL)
        preds = pred_job.get_result_when_complete().get_all_as_dataframe()
        return preds
    except:
        # Retrieve training predictions if they were already requested
        train_preds = dr.TrainingPredictions.list(model.project_id)
        for train_pred in train_preds:
            if train_pred.model_id == model.id and train_pred.data_subset == 'all':
                preds = dr.TrainingPredictions.get(model.project_id, train_pred.prediction_id).get_all_as_dataframe()
                return preds

Create DataRobot Client and Projectļƒ

Before we train a model, we need to connect to our DataRobot account and create a new project (or connect to an existing one). Once the project is made, we can begin Autopilot to develop and select a model that performs well on the training data.

[ ]:
# To connect from a Zepl notebook:
# dr_client = dr.Client(token=z.getDatasource("datarobot_api")['token'] , endpoint=DATAROBOT_ENDPOINT)

# To connect from a Jupyter notebook:
dr_client = dr.Client(endpoint=DATAROBOT_ENDPOINT, token=DATAROBOT_TOKEN)
[ ]:
if dr_project_id is None:
    dr_project = dr.Project.create(project_name=dr_project_name, sourcedata=train)
    dr_project.set_target(
        target=label_col,
        worker_count = '-1',
        mode=autopilot_mode
    )

    dr_project.wait_for_autopilot(verbosity=1)  # Wait for autopilot to complete
    dr_project_id = dr_project.id
else:
    dr_project = dr.Project.get(project_id=dr_project_id)
test_dr_dataset = dr_project.upload_dataset(test)
[ ]:
# Fetch models trained on at least 60% of the uploaded dataset
models = dr_project.get_models(
    search_params={
        'sample_pct__gt': 60,
    }
)
# Order by AUC computed during cross-validation
models = sorted(models, key=lambda model: get_model_score(model, 'AUC')['metric_cv'] or -1, reverse=True)
# Choose model with highest training AUC to evaluate on RIME
model = models[0]

Compute Model Predictionsļƒ

Now that weā€™ve trained the model with Autopilot, letā€™s prepare the data and model for RIME to see how it can provide us with more in-depth information and recommendations for making our model as robust as possible.

First, weā€™ll compute the model predictions for the evaluation dataset to speed up our RIME run.

[ ]:
pred_col_name = "pred"
predict_job = model.request_predictions(test_dr_dataset.id)
predictions = predict_job.get_result_when_complete()
test_with_preds = test.copy()
test_with_preds[pred_col_name] = predictions['positive_probability']
train_with_preds = train.copy()
train_with_preds[pred_col_name] = get_train_preds(model)['class_1.0']

Next, weā€™ll deploy the model to facilitate a loose integration with RIME.

[ ]:
# Deploy model
def deploy_model(model: dr.Model) -> dr.Deployment:
    """Create a deployed model endpoint."""
    deployment = dr.Deployment.create_from_learning_model(
      model_id=model.id, label="Fraud Detection Deployment",
      description="Deployed with DataRobot client")

    # View deployment stats
    service_stats = deployment.get_service_stats()
    print(service_stats.metrics)
    return deployment

deployment = deploy_model(model)

Connecting to RIMEļƒ

Now for the fun. To connect to RIME, we use the API key and cluster URL specified above. You can generate a new API key within the ā€˜Workspace settingsā€™ page on your RIME clusterā€™s website.

[ ]:
from rime_sdk import Client, Project, Job

rime_client = Client(RIME_CLUSTER_URL, api_key=RIME_API_TOKEN)
rime_project = rime_client.create_project(name='DataRobot Demo',
                                          description='Create an e2e RIME Demo using DataRobot.')

Define Helper Functionsļƒ

Below, define functions to assist us in: - creating a model.py file that connects with the model endpoint deployed above. - uploading the model and data to the RIME cluster. - configuring the RIME run and initializing stress testing.

[ ]:
#@title Helper Functions
def get_model_file_contents(deployment: dr.Deployment, api_token: str) -> str:
    """Return the stringified model.py file."""
    api_url = deployment.default_prediction_server["url"]
    deployment_id = deployment.id
    file_str = """\"\"\"Template for how you can use RIME for a model hosted on DataRobot.

We expect this file to contain a `predict_dict` function that takes in a mapping from
feature name to feature value. This corresponds to one row in the dataset. This
method should return a score between 0 and 1.


This specific file implements this assuming that 1) your model is hosted
on DataRobot, and 2), that your machine is authenticated with Google Cloud,
and 3) that you have the requests library installed.

\"\"\"

import time

import numpy as np
import pandas as pd
import requests


# Step 1: Define endpoint variables.
"""
    file_str += f'API_URL = "{api_url}/predApi/v1.0/deployments"\n'
    file_str += f"API_KEY = '{api_token}'\n"
    file_str += f"DEPLOYMENT_ID = '{deployment_id}'\n"
    file_str += """
URL = API_URL + f"/{DEPLOYMENT_ID}/predictions"
HEADERS = {
        'Content-Type': 'application/json; charset=UTF-8',
        'Authorization': 'Bearer {}'.format(API_KEY),
    }

MAX_PREDICTION_FILE_SIZE_BYTES = 52428800  # 50 MB


def predict_df(df: pd.DataFrame) -> np.ndarray:
    \"\"\"Return array of probabilities assigned to the positive class.\"\"\"
    data = pd.DataFrame.to_json(df, orient="records")
    # breakpoint()
    headers = {
        "Content-Type": "application/json; charset=UTF-8",
        "Authorization": "Bearer {}".format(API_KEY),
    }

    # Make API request for predictions
    success = False
    while not success:
        predictions_response = requests.post(URL, data=data, headers=headers,)
        # Make sure we are not running into a 429 (too many requests) error
        if predictions_response.status_code == 429:
            time.sleep(int(predictions_response.headers["Retry-After"]))
        else:
            success = True
    # Get response data
    res = predictions_response.json()["data"]
    # Get the prediction for the case where label == 1
    # NOTE: this is only for binary classification
    preds = []
    for pred in res:
        for val in pred["predictionValues"]:
            if val["label"] == 1:
                preds.append(val["value"])
                break
        else:
            raise ValueError(
                f"No prediction for input row {pred['rowId']} and label == 1"
            )
    return np.array(preds)
    """
    return file_str


def upload_model(
    deployment: dr.Deployment, api_token: str, client: Client
) -> str:
    """Upload trained model to RIME cluster and return the resultant model directory path."""
    upload_path = "datarobot_demo"
    with TemporaryDirectory() as d:
        _dir = Path(d)
        extras_dir = _dir / "model_extras"
        extras_dir.mkdir()
        with open(_dir / "model.py", "w") as f:
            f.write(get_model_file_contents(deployment, api_token))
        uploaded_path = client.upload_directory(d, upload_path=upload_path)
    return uploaded_path + "/model.py"


def upload_dataset_file(client: Client, df: pd.DataFrame, split: str) -> str:
    """Upload dataframe to RIME cluster."""
    upload_path = "datarobot_demo"
    with TemporaryDirectory() as d:
        f = Path(d) / f"{split}_data.csv"
        df.to_csv(f, index=False)
        uploaded_name = client.upload_file(f.resolve(), upload_path=upload_path)
    return uploaded_name


def prepare_test_config(
    rime_ref_path: str,
    rime_eval_path: str,
    rime_model_path: str,
    net_name: str,
    label_col: str,
    pred_col: Optional[str] = None,
) -> dict:
    """Prepare a test config from the given paths."""
    test_config = {
        "run_name": f"{net_name} DataRobot Experiment",
        "data_info": {"label_col": label_col},
        "model_info": {},
        "model_task": "Binary Classification",
    }
    if pred_col is not None:
        test_config["data_info"]["pred_col"] = pred_col
    test_config["data_info"]["ref_path"] = rime_ref_path
    test_config["data_info"]["eval_path"] = rime_eval_path
    test_config["model_info"]["path"] = rime_model_path
    return test_config


def run_rime_stress_tests(
    test_config: dict, client: Client, project: Project
) -> Job:
    """Run the stress tests for RIME."""
    stress_test_job = client.start_stress_test(
        test_config, project_id=project.project_id
    )
    stress_test_job_status = stress_test_job.get_status(
        verbose=True, wait_until_finish=True
    )
    if stress_test_job_status["status"] == "JOB_STATUS_SUCCEEDED":
        return stress_test_job
    raise Exception(f"Stress test job run failing. {stress_test_job_status}")

Upload Data + Modelļƒ

Below, upload data and the model file, then kick off a stress testing run.

[ ]:
net_name = "Fraud Detection Model"
rime_ref_path = upload_dataset_file(rime_client, train_with_preds, "ref")
rime_eval_path = upload_dataset_file(rime_client, test_with_preds, "eval")
rime_model_path = upload_model(deployment, DATAROBOT_TOKEN, rime_client)
test_config = prepare_test_config(rime_ref_path, rime_eval_path, rime_model_path, net_name, label_col, pred_col=pred_col_name)
stress_job = run_rime_stress_tests(test_config, rime_client, rime_project)

Stress Test Resultsļƒ

Stress stest are grouped into categories that measure various aspects of model robustness (model behavior, distribution drift, abnormal input, transformations, adversarial attacks, data cleanliness). Suggestions to improve your model are aggregated on the category level as well. Tests are ranked by default by a shared severity metric. Clicking on an individual test surfaces more detailed information.

You can view the detailed results in the UI by running the below cell and redirecting to the generated link. This page shows granular results for a given AI Stress Test run.

[ ]:
test_run = stress_job.get_test_run()
test_run

Analyzing the Resultsļƒ

Navigate the link printed in the above code block to identify issues with the current model.

Programmatically Querying the Resultsļƒ

RIME not only provides you with an intuitive UI to visualize and explore these results, but also allows you to programmatically query these results. This allows you to integrate with any MLOps pipeline, log results to experiment management tools like MLFlow, bring automated decision making to your ML practices, or store these results for future references.

Run the below cell to programmatically query the results. The results are outputed as a pandas dataframe.

Access results at the a test run overview level

[ ]:
test_run_result = test_run.get_result_df()
test_run_result.to_csv("Fraud_Test_Run_Results.csv")
test_run_result

Access detailed test results at each individual test cases level.

[ ]:
test_case_result = test_run.get_test_cases_df(show_test_case_metrics=True)
test_case_result.to_csv("Fraud_Test_Case_Results.csv")
test_case_result

Deploy to Production and Create the AI Firewallļƒ

Once you have identified the best stress test run, you can deploy the associated model wrapped with the AI Firewall. The AI Firewall operates on both a datapoint and batch level. It automatically protects your model in real-time from ā€œbadā€ incoming data and also alerts on statistically significant distributional drift.

In this scenario, the data scientist is short on time and decided to deploy the existing model to production. The data scientist also creates and wraps a firewall around the model. The AI Firewall is automatically configured based on the failures identified by AI Stress testing to protect the tested model in Production.

[ ]:
firewall = rime_project.create_firewall(name="DataRobot Firewall", bin_size='day', test_run_id=test_run.test_run_id)

Uploading a Batch of Production Data & Model Predictions to Firewallļƒ

The fraud detection model has been in production for 30 days. Production data and model predictions have been collected and stored for the past 30 days. Now, we will use Firewall to track how the model performed across the last 30 days.

Upload an Incremental Batch of Data

[ ]:
# Fetch the production data
TIMESTAMP_COL = "timestamp"
incremental_df = pd.read_csv(incremental_file).drop(excluded_cols, axis=1)

# Group by week to simulate running weekly batches
# This isn't a requirement, as RIME can perform the grouping itself
timestamps_dt = pd.DatetimeIndex(incremental_df[TIMESTAMP_COL])
for name, group in incremental_df[TIMESTAMP_COL].groupby(timestamps_dt.to_period("W-SUN").to_timestamp()):
    split_name = name.split(' ')[0]
    print(f"Running batch for week {split_name}")
    batch_df = incremental_df.loc[group.index]
    # Upload to datarobot to make predictions. In a real deployment,
    # this step would have been done already
    prod_dr_dataset = dr_project.upload_dataset(batch_df)
    predict_job = model.request_predictions(prod_dr_dataset.id)
    predictions = predict_job.get_result_when_complete()
    batch_df[pred_col_name] = predictions['positive_probability']
    # Upload production data with preds to the RIME cluster
    rime_incremental_path = upload_dataset_file(rime_client, batch_df, split_name)
    incremental_config = {
        "eval_path": rime_incremental_path,
        "timestamp_col": TIMESTAMP_COL
    }
    ct_job = firewall.start_continuous_test(test_run_config=incremental_config, disable_firewall_events=False)
    ct_job.get_status(verbose=True, wait_until_finish=True)

Wait for a couple minutes and your results will appear in the UI.

Firewall Overviewļƒ

[ ]:
firewall

The Overview page is the mission control for your modelā€™s production deployment health. In it, you can see the status of firewall events, get notified when model performance degrades, and see the underlying causes of failure.

Firewall CT Resultsļƒ

The AI Firewallā€™s Continuous Tests operate at the batch level and provide a mechanism to monitor the health of ML deployments in production. They allow the user to understand when errors begin to occur and surface the underlying drivers of such errors.

You can explore the results in the UI by running the below cell and redirecting to the generated link.

Analyzing CT Resultsļƒ

Navigate to the ā€œContinuous Testsā€ tab to explore various test metrics over the past month. This view captures some important insights, such as:

  • Abnormality rate increases - By changing the ā€œMetricā€ to ā€œAbnormality Rateā€, we can see that very few abnormal inputs are seen at the outset of the month. By monthā€™s end, the abnormality rate has shot up to 28.6%

  • Prediction Percentiles dropped over time - By changing the metric to ā€œPrediciton Percentielsā€, we can see that the 50% at the beginning of the month was 0.7, but it dropped to 0.538 by monthā€™s end.

  • Prediction drift increases over time When evaluating the Prediction Drift, we can see it has increased over time. On 08/01, when the model was deployed, PSI was 0.54. By 08/29, PSI had increased to 2.17.

Firewall Realtime Events Resultsļƒ

The AI Firewallā€™s Realtime Events tab operates at the datapoint-level to simultaneously alert and protect your model against issues in real-time. It tracks and surfaces the datapoints that were flagged when entering the AI system.

We can see that for the flagged datapoints in the latest time bucket, the ā€œbrowser_versionā€ feature is failing. We may dig in further by clicking on a flagged row to see the reason it was flagged. In this case, there was a new feature value for ā€œbrowser_versionā€ that the model had not encountered during the model development phase.

Whatā€™s Next?ļƒ

Try RIME on your own data and models. You can use this tutorial to help you started!