Scheduled Continuous Testing

Instantiate RIME client and create a new project

Let’s begin by creating a new client connection to our Robust Intelligence cluster.

[ ]:
from rime_sdk import Client

API_TOKEN = '' # PASTE API TOKEN
CLUSTER_URL = '' # PASTE DEDICATED DOMAIN OF RIME SERVICE (e.g., rime.stable.rbst.io)
AGENT_ID = '' # PASTE AGENT_ID IF USING AN AGENT THAT IS NOT THE DEFAULT

client = Client(CLUSTER_URL, API_TOKEN)
[ ]:
description = (
    "Create a Continuous Test and set up scheduling."
    " Demonstration uses a tabular binary classification dataset"
    " and model that simulates credit card fraud detection."
)
project = client.create_project(
    "Scheduled Continuous Testing Demo",
    description,
    "MODEL_TASK_BINARY_CLASSIFICATION"
)
[ ]:
project_id = project.project_id

# You can always retrieve a project when re-running the notebook by using its id
project = client.get_project(project_id)

Create a custom loader function to retrieve data from S3

There are many ways for Continuous Testing (CT) to load in your data. In this example, we’ll write a Python function to generate data and store the function on S3.

The Python function will take in the following parameters:

  • start_time: Required for scheduled CT. Marks the start of the time window for which the function should return data.

  • end_time: Required for scheduled CT. Marks the end of the time window for which the function should return data.

  • ref_data: Custom parameter for this function (you can add your own custom parameters and let Robust Intelligence know about them when you register your dataset). In this notebook, the function returns the reference dataset when this boolean is true and the evaluation dataset when it is false.

  • preds_only: Another custom parameter for this function. In this notebook, the function returns only the predictions when this boolean is true and the full data when it is false.

We’ll put this function into a .py file and upload it to an S3 bucket. Then, later in the notebook, we will set up our project to download and call this function to generate data to use with CT runs.

[ ]:
"""Custom loader for fraud datasets from S3."""
from datetime import datetime

import pandas as pd
import subprocess
import sys

def custom_data_loader_func(
    start_time: datetime, # Note: Required for scheduled CT
    end_time: datetime, # Note: Required for scheduled CT
    ref_data: bool,
    preds_only: bool,
) -> pd.DataFrame:
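    # Install the public examples package, which provides the download helper.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "https://github.com/RobustIntelligence/ri-public-examples/archive/master.zip"])

    # Download the fraud example files into the local "fraud" directory.
    from ri_public_examples.download_files import download_files
    download_files('tabular-2.0/fraud', 'fraud')

    # Load either the predictions or the full data.
    ct_data = (pd.read_csv("fraud/data/fraud_incremental_preds.csv")
               if preds_only
               else pd.read_csv("fraud/data/fraud_incremental.csv"))

    # Note: this demo does not use start_time or end_time and always reads
    # the same files. The first half of the rows serves as the reference
    # dataset and the second half as the evaluation dataset.
    if ref_data:
        return ct_data[:len(ct_data)//2]
    else:
        return ct_data[len(ct_data)//2:]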
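
The demo loader accepts start_time and end_time but never uses them, because it always reads the same static files. A loader for live data would typically use the two arguments to select rows. The following is a minimal sketch of that selection using only the standard library, with rows represented as dictionaries. The helper name filter_rows_to_window is hypothetical and not part of rime_sdk, and the half-open window [start_time, end_time) is an assumption; check how your deployment treats the window boundaries before relying on it.

```python
from datetime import datetime
from typing import Dict, List


def filter_rows_to_window(
    rows: List[Dict[str, str]],
    start_time: datetime,
    end_time: datetime,
    timestamp_col: str = "timestamp",
) -> List[Dict[str, str]]:
    """Keep rows whose timestamp falls in [start_time, end_time).

    Hypothetical helper: assumes ISO-8601 timestamps and a half-open window.
    """
    selected = []
    for row in rows:
        ts = datetime.fromisoformat(row[timestamp_col])
        if start_time <= ts < end_time:
            selected.append(row)
    return selected


# Made-up rows for illustration only.
rows = [
    {"timestamp": "2023-01-01 08:00:00", "amount": "12.50"},
    {"timestamp": "2023-01-02 00:00:00", "amount": "99.00"},
    {"timestamp": "2023-01-02 13:30:00", "amount": "7.25"},
]
window = filter_rows_to_window(rows, datetime(2023, 1, 1), datetime(2023, 1, 2))
```

With a pandas DataFrame, the same selection could be expressed as a boolean mask on the timestamp column.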

Create an integration to S3

We need to set up an S3 integration with credential information so that the data loading Python file can be downloaded from our S3 bucket. You can find additional information on how to create integrations in the Robust Intelligence documentation.

[ ]:
WORKSPACE_ID = '' # PASTE WORKSPACE_ID
AWS_ACCESS_KEY_ID = '' # PASTE AWS ACCESS KEY ID WITH ACCESS TO YOUR S3 BUCKET
AWS_SECRET_ACCESS_KEY = '' # PASTE AWS SECRET ACCESS KEY WITH ACCESS TO YOUR S3 BUCKET

S3_INTEGRATION_ID = client.create_integration(
    workspace_id=WORKSPACE_ID,
    name="s3 integration",
    integration_type="INTEGRATION_TYPE_AWS_ACCESS_KEY",
    integration_schema=[
        {
            "name": "AWS_ACCESS_KEY_ID",
            "sensitivity": "VARIABLE_SENSITIVITY_PUBLIC",
            "value": AWS_ACCESS_KEY_ID,
        },
        {
            "name": "AWS_SECRET_ACCESS_KEY",
            "sensitivity": "VARIABLE_SENSITIVITY_WORKSPACE_SECRET",
            "value": AWS_SECRET_ACCESS_KEY,
        },
    ],
)

Register datasets with the project

To start a CT run, we need to register our model (optional), our datasets, and our predictions (required if no model is specified) with the project. For dataset registration, we need to specify:

  • the S3 path to the Python file

  • the name of the data loading function in the Python file

  • the list of custom parameters required by the function (e.g., ref_data, preds_only)

[ ]:
from datetime import datetime

dt = str(datetime.now())

# Note: models and datasets need to have unique names.
model_id = project.register_model(f"fraud_model_{dt}", None, agent_id=AGENT_ID)

ref_data_id = project.register_dataset(
    name=f"fraud_reference_dataset{dt}",
    data_config={
        "connection_info": {"data_loading": {
            "path": "s3://pranay-scheduled-ct-test/s3_fraud_custom_loader.py",
            "load_func_name": "custom_data_loader_func",
            "loader_kwargs_json": "{\"ref_data\": true, \"preds_only\": false}",
        }},
        "data_params": {
            "label_col": "label",
            "timestamp_col": "timestamp",
        },
    },
    integration_id=S3_INTEGRATION_ID,
    agent_id=AGENT_ID,
)

project.register_predictions(
    dataset_id=ref_data_id,
    model_id=model_id,
    pred_config={
        "connection_info": {"data_loading": {
            "path": "s3://pranay-scheduled-ct-test/s3_fraud_custom_loader.py",
            "load_func_name": "custom_data_loader_func",
            "loader_kwargs_json": "{\"ref_data\": true, \"preds_only\": true}",
        }},
        "pred_params": {
            "pred_col": "preds",
        },
    },
    integration_id=S3_INTEGRATION_ID,
    agent_id=AGENT_ID,
)
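
The names above get their uniqueness from str(datetime.now()), which produces a suffix containing spaces and colons. If you prefer compact names, one option is sketched below. The helper unique_name is hypothetical and not part of rime_sdk, and this notebook does not state whether Robust Intelligence restricts the characters allowed in names, so treat this as a convenience rather than a requirement.

```python
import uuid
from datetime import datetime, timezone


def unique_name(prefix: str) -> str:
    """Return prefix plus a UTC timestamp and a short random suffix.

    Hypothetical helper: the random suffix keeps names distinct even when
    two names are generated within the same second.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    return f"{prefix}_{stamp}_{uuid.uuid4().hex[:8]}"


model_name = unique_name("fraud_model")
dataset_name = unique_name("fraud_reference_dataset")
```

The resulting strings could be passed as the name arguments in the registration calls in place of the f-strings.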
[ ]:
eval_data_id = project.register_dataset(
    name=f"fraud_evaluation_dataset{dt}",
    data_config={
        "connection_info": {"data_loading": {
            "path": "s3://pranay-scheduled-ct-test/s3_fraud_custom_loader.py",
            "load_func_name": "custom_data_loader_func",
            "loader_kwargs_json": "{\"ref_data\": false, \"preds_only\": false}",
        }},
        "data_params": {
            "label_col": "label",
            "timestamp_col": "timestamp",
        },
    },
    integration_id=S3_INTEGRATION_ID,
    agent_id=AGENT_ID,
)

project.register_predictions(
    dataset_id=eval_data_id,
    model_id=model_id,
    pred_config={
        "connection_info": {"data_loading": {
            "path": "s3://pranay-scheduled-ct-test/s3_fraud_custom_loader.py",
            "load_func_name": "custom_data_loader_func",
            "loader_kwargs_json": "{\"ref_data\": false, \"preds_only\": true}",
        }},
        "pred_params": {
            "pred_col": "preds",
        },
    },
    integration_id=S3_INTEGRATION_ID,
    agent_id=AGENT_ID,
)
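
The registration calls above write loader_kwargs_json by hand with escaped quotes, which is easy to mistype. As a sketch, the same strings can be produced with json.dumps from the standard library. The helpers loader_kwargs and data_loading_info are hypothetical names introduced here for illustration, and the bucket in the example path is a placeholder.

```python
import json


def loader_kwargs(ref_data: bool, preds_only: bool) -> str:
    """Serialize the custom loader parameters to a JSON string."""
    return json.dumps({"ref_data": ref_data, "preds_only": preds_only})


def data_loading_info(path: str, ref_data: bool, preds_only: bool) -> dict:
    """Build the connection_info dictionary used in this notebook."""
    return {
        "data_loading": {
            "path": path,
            "load_func_name": "custom_data_loader_func",
            "loader_kwargs_json": loader_kwargs(ref_data, preds_only),
        }
    }


# Placeholder path; substitute the S3 location of your own loader file.
info = data_loading_info(
    "s3://example-bucket/loader.py", ref_data=True, preds_only=False
)
kwargs_json = info["data_loading"]["loader_kwargs_json"]
```

Here kwargs_json holds the same text as the hand-written string for the reference dataset, and the returned dictionary can be used as the "connection_info" value of a data_config or pred_config.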

Test everything out with a manual CT run

We don’t want to wait until a scheduled CT run in the future to surface small mistakes (e.g., a typo in the S3 path). We can create a new CT instance and start a manual run to verify that our configuration works as intended before activating scheduling.

NOTE: Data needs to exist and be returned for every time bin. If there is no data for a bin, an error will be returned.
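
Because an empty bin causes an error, it can help to check your timestamps for gaps before starting a run. The sketch below uses only the standard library and reports the start of every bin that contains no rows. The helper find_empty_bins is hypothetical and not part of rime_sdk, and it assumes bins are aligned to the start argument; how Robust Intelligence aligns bins is not specified in this notebook.

```python
from datetime import datetime, timedelta
from typing import Iterable, List


def find_empty_bins(
    timestamps: Iterable[datetime],
    start: datetime,
    end: datetime,
    bin_size: timedelta,
) -> List[datetime]:
    """Return the start of every bin in [start, end) with no timestamp.

    Hypothetical helper: bins are assumed to be aligned to `start`.
    """
    span = end - start
    n_bins = -(-span // bin_size)  # ceiling division on timedeltas
    counts = [0] * n_bins
    for ts in timestamps:
        if start <= ts < end:
            counts[(ts - start) // bin_size] += 1
    return [start + i * bin_size for i, c in enumerate(counts) if c == 0]


# Made-up timestamps: data on January 1 and 3, nothing on January 2.
timestamps = [datetime(2023, 1, 1, 8), datetime(2023, 1, 3, 9)]
empty = find_empty_bins(
    timestamps, datetime(2023, 1, 1), datetime(2023, 1, 4), timedelta(days=1)
)
```

In this example, empty contains a single entry for January 2, the one day without data.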

[ ]:
from datetime import timedelta

ct = project.create_ct(model_id, ref_data_id, bin_size=timedelta(days=1))

# If you are re-running this notebook and have already created CT for the project,
# you can retrieve and update the CT object instead.
# ct = project.get_ct()
# ct.update_ct(ref_data_id=ref_data_id)
[ ]:
ct_job = ct.start_continuous_test(eval_data_id, override_existing_bins=True, agent_id=AGENT_ID)
ct_job.get_status(verbose=True, wait_until_finish=True)

Activate Scheduled Continuous Testing

After verifying that CT can successfully load our data, run tests, and push the results to the Robust Intelligence UI, we are ready to activate scheduled CT.

[ ]:
ct.activate_ct_scheduling(
    data_info={
        "connection_info": {"data_loading": {
            "path": "s3://pranay-scheduled-ct-test/s3_fraud_custom_loader_scheduled_ct.py",
            "load_func_name": "custom_data_loader_func",
            "loader_kwargs_json": "{\"ref_data\": false, \"preds_only\": false}",
        }},
        "data_params": {
            "label_col": "label",
            "timestamp_col": "timestamp",
        },
    },
    data_integration_id=S3_INTEGRATION_ID,
    pred_integration_id=S3_INTEGRATION_ID,
    pred_info={
        "connection_info": {"data_loading": {
            "path": "s3://pranay-scheduled-ct-test/s3_fraud_custom_loader_scheduled_ct.py",
            "load_func_name": "custom_data_loader_func",
            "loader_kwargs_json": "{\"ref_data\": false, \"preds_only\": true}",
        }},
        "pred_params": {
            "pred_col": "preds",
        },
    },
)

Some additional methods to work with Scheduled CT

# Find the schedule for CT
ct.get_scheduled_ct_info()

# Deactivate the schedule for CT
ct.deactivate_ct_scheduling()

# Update the reference dataset for a scheduled Continuous Test
# (new_ref_data_id is the ID of a newly registered reference dataset)
ct.update_ct(ref_data_id=new_ref_data_id)