Using a Data Collector for Scheduled Continuous Testing
In this notebook walkthrough, we will show how to create an AI Firewall, log datapoints to a Data Collector, and schedule automatic tests with it. The Data Collector can log datapoints in both production and batch settings, and it can be coupled with a scheduler to automatically run Continuous Tests on incoming data with no extra configuration. We will also show you how to kick off manual testing runs with the collector.
Latest Colab version of this notebook available here
Create a Firewall and Schedule Tests
After specifying the required credentials, run the cells below to install libraries, run stress testing, and deploy an AI Firewall with a Data Collector. For a more fine-grained walkthrough of this code, please reference our Fraud Onboarding Walkthrough.
[ ]:
API_TOKEN = '' # PASTE API_KEY
CLUSTER_URL = '' # PASTE DEDICATED DOMAIN OF RIME SERVICE (eg: rime.stable.rbst.io)
[ ]:
# See the Fraud Onboarding Walkthrough for more information on the following code
[ ]:
!pip install rime-sdk &> /dev/null
!pip install https://github.com/RobustIntelligence/ri-public-examples/archive/master.zip
[ ]:
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List
import pandas as pd
from ri_public_examples.download_files import download_files
from rime_sdk import Client
[ ]:
rime_client = Client(CLUSTER_URL, API_TOKEN)
[ ]:
description = (
"Create an AI Firewall, log data directly to the Robust Intelligence platform,"
" and schedule automated Continuous Testing of your deployed model. All using"
" the RIME SDK. Demonstration uses a tabular binary classification dataset"
" and model that simulates credit card fraud detection."
)
project = rime_client.create_project(
name='AI Firewall Data Collector Demo',
description=description,
)
[ ]:
# Fetch example data
download_files('tabular/fraud', 'fraud')
[ ]:
# Upload data to cluster storage
upload_path = "ri_public_examples_fraud"
rime_training_path = rime_client.upload_file(Path('fraud/data/fraud_ref.csv'), upload_path=upload_path)
rime_testing_path = rime_client.upload_file(Path('fraud/data/fraud_eval.csv'), upload_path=upload_path)
fraud_model_path = rime_client.upload_directory(Path('fraud/models'), upload_path=upload_path)
fraud_model_path = fraud_model_path + "/fraud_model.py"
[ ]:
# Run Stress Tests
stress_test_config = {
"run_name": "Fraud Model",
"data_info": {
"pred_col": "preds",
"label_col": "label",
"ref_path": rime_training_path,
"eval_path": rime_testing_path
},
"model_info": {
"path": fraud_model_path
},
"model_task":"Binary Classification"
}
stress_job = rime_client.start_stress_test(test_run_config=stress_test_config, project_id=project.project_id)
stress_job.get_status(verbose=True, wait_until_finish=True)
stress_test_run = stress_job.get_test_run()
# You can view the test run results in the provided link
stress_test_run
Upload Continuous Testing Data to the Data Collector
The following function simulates a batch upload of production data to the Data Collector in a real-time setting.
[ ]:
from rime_sdk.data_collector import DataCollector
# Load the data to be used to simulate multiple days of production
continuous_testing_data = pd.read_csv("fraud/data/fraud_incremental.csv")
def log_datapoints(
    data_collector: DataCollector,
    df: pd.DataFrame,
    pred_col: str,
    label_col: str,
    timestamp_col: str,
    keep_timestamps: bool = False,
) -> None:
    """Log a DataFrame of datapoints to the Data Collector."""
    preds = list(df[pred_col])
    labels = list(df[label_col])
    timestamps = None
    if keep_timestamps:
        timestamps = list(df[timestamp_col])
    # Log only the feature columns; predictions, labels, and timestamps
    # are passed separately below
    df_dict = df.drop(columns=[pred_col, label_col, timestamp_col]).to_dict("records")
    data_collector.log_datapoints(
        df_dict, preds=preds, labels=labels, timestamps=timestamps
    )
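As a minimal sketch of what the helper above prepares (using a hypothetical two-row frame with a single `amount` feature, not the real fraud schema), the DataFrame is split into feature records plus parallel prediction and label lists before logging:

```python
import pandas as pd

# Hypothetical two-row frame mirroring the expected column layout
df = pd.DataFrame({
    "amount": [12.5, 80.0],
    "preds": [0.1, 0.9],
    "label": [0, 1],
    "timestamp": ["2018-08-01", "2018-08-02"],
})

preds = list(df["preds"])
labels = list(df["label"])
# Feature records only: drop prediction, label, and timestamp columns
records = df.drop(columns=["preds", "label", "timestamp"]).to_dict("records")
print(records)  # [{'amount': 12.5}, {'amount': 80.0}]
```

Each entry in `records` is one datapoint's feature dictionary, aligned by position with `preds` and `labels`.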
You can see the current Firewall status in the UI by clicking the link below. Events will populate once you run the subsequent cell.
[ ]:
# Create an AI Firewall
firewall = project.create_firewall(name="Data Collector Scheduled CT Firewall", bin_size="hour", test_run_id=stress_test_run.test_run_id)
firewall
Activate the Schedule and Use the Data Collector
The scheduler can be configured to run according to the bin size of the data (every hour, every day, etc.). Configure a schedule associated with the firewall to run on data from the Data Collector, then get the collector and start logging datapoints. We will attach real-time timestamps to these datapoints to simulate a production setting; the Data Collector does this by default when no timestamps are attached.
[ ]:
# Activate a Schedule for the Data Collector
firewall.activate_ct_schedule(location_type="data_collector")
[ ]:
# Get the Data Collector
collector = firewall.get_data_collector()
[ ]:
PRED_COL = "preds"
LABEL_COL = "label"
TIMESTAMP_COL = "timestamp"
# Log datapoints to the collector
log_datapoints(collector, continuous_testing_data, pred_col=PRED_COL, label_col=LABEL_COL, timestamp_col=TIMESTAMP_COL)
Now that you’ve logged datapoints to the data collector, the scheduler should display results within an hour.
Create a Firewall and Test Data Manually
You can also use the Data Collector in manual Continuous Testing runs without depending on the scheduler.
[ ]:
project = rime_client.create_project(name='Firewall Data Collector Manual Configuration Demo', description='This is an Onboarding Demo')
[ ]:
stress_job = rime_client.start_stress_test(test_run_config=stress_test_config, project_id=project.project_id)
stress_job.get_status(verbose=True, wait_until_finish=True)
stress_test_run = stress_job.get_test_run()
[ ]:
# Create an AI Firewall
firewall = project.create_firewall(name="Data Collector Manual Firewall", bin_size="day", test_run_id=stress_test_run.test_run_id)
firewall
For this example, we will keep the original timestamps associated with the data. The Data Collector has a TTL of 1 year, so we will need to shift the timestamps into the current year first.
[ ]:
import datetime
continuous_testing_data = pd.read_csv("fraud/data/fraud_incremental.csv")
curr_year = datetime.datetime.now().year
continuous_testing_data["timestamp"] = continuous_testing_data["timestamp"].str.replace('2018',f"{curr_year}")
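On a small hypothetical sample, the year shift above rewrites the literal year inside each timestamp string:

```python
import datetime
import pandas as pd

curr_year = datetime.datetime.now().year
# Hypothetical sample timestamps in the same "%Y-%m-%d" format
stamps = pd.Series(["2018-08-01", "2018-08-15"])
shifted = stamps.str.replace("2018", f"{curr_year}")
print(shifted.tolist())
```

This keeps month and day intact while moving the data inside the collector's one-year TTL window.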
[ ]:
time_format = "%Y-%m-%d"
earliest_timestamp = datetime.datetime.strptime(continuous_testing_data["timestamp"].min(), time_format)
latest_timestamp = datetime.datetime.strptime(continuous_testing_data["timestamp"].max(), time_format)
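Taking `.min()` and `.max()` on the raw string column works here because `"%Y-%m-%d"` dates sort lexicographically in chronological order; a stdlib-only sketch (with hypothetical sample dates):

```python
import datetime

time_format = "%Y-%m-%d"
# ISO-style date strings sort lexicographically, so string min/max
# agrees with chronological min/max
stamps = ["2018-08-03", "2018-08-01", "2018-08-15"]
earliest = datetime.datetime.strptime(min(stamps), time_format)
latest = datetime.datetime.strptime(max(stamps), time_format)
print(earliest.date(), latest.date())  # 2018-08-01 2018-08-15
```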
[ ]:
# Get the Data Collector
collector = firewall.get_data_collector()
[ ]:
PRED_COL = "preds"
LABEL_COL = "label"
TIMESTAMP_COL = "timestamp"
# Log datapoints to the collector
log_datapoints(collector, continuous_testing_data, pred_col=PRED_COL, label_col=LABEL_COL, timestamp_col=TIMESTAMP_COL, keep_timestamps=True)
Let’s run a continuous test from 08/01 to 08/15. The Continuous Testing config expects the start time and end time in unix seconds, so we need to do a quick conversion below and start the test. You can review the results in the UI via the firewall link below.
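The conversion is just `int(dt.timestamp())`. Note that `timestamp()` interprets naive datetimes in local time, so the exact integers vary by timezone, but the span between start and end does not (a sketch with a hypothetical year):

```python
import datetime

start = datetime.datetime(2023, 8, 1)   # hypothetical start date
end = datetime.datetime(2023, 8, 15)    # hypothetical end date

start_s = int(start.timestamp())
end_s = int(end.timestamp())
# The two values are exactly 14 days apart in seconds
print(end_s - start_s)
```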
[ ]:
request_end_date = datetime.datetime(curr_year, 8, 15)
incremental_config = {
"eval_data_info": {
"type": "data_collector",
"start_time": int(earliest_timestamp.timestamp()),
"end_time": int(request_end_date.timestamp())
}
}
ct_job = firewall.start_continuous_test(incremental_config)
ct_job.get_status(verbose=True, wait_until_finish=True)
firewall
Update Reference Set
Now suppose that we have updated our model by retraining on new data from the first half of August. We want to update our deployed Firewall to reflect the new reference dataset.
Rather than uploading a new reference file, it is even easier to adapt all of the tests and configuration parameters by updating the firewall based on the data stored in the collector during that time period.
[ ]:
# Update the reference set by setting the reference time period to the first half of August
firewall.update_scheduled_ct_info(location_type="data_collector", reference_set_time_bin=(earliest_timestamp, request_end_date))
[ ]:
# We can see that 'refDataInfo' (the reference set) now contains the
# reference period we set, expressed in unix seconds
firewall.get_config()
Let’s kick off another continuous test run for the second half of August, now that we’ve changed our baseline data.
[ ]:
# Set the new request start date to 08/16
new_start_date = request_end_date + datetime.timedelta(days=1)
new_incremental_config = {
"eval_data_info": {
"type": "data_collector",
"start_time": int(new_start_date.timestamp()),
"end_time": int(latest_timestamp.timestamp())
}
}
ct_job = firewall.start_continuous_test(new_incremental_config)
ct_job.get_status(verbose=True, wait_until_finish=True)
firewall