RI Fraud Classification Walkthrough
You are a data scientist at a Payment Processing Company. The data science team has been tasked with implementing a Fraud Detection model and monitoring how that model performs over time. The performance of this fraud detection model directly impacts the costs of the company. In order to ensure the data science team develops the best model and the performance of this model doesn’t degrade over time, the VP of Data Science purchases the RIME platform.
In this Notebook Walkthrough, we will walk through two of RIME’s core products: AI Stress Testing and AI Continuous Testing.
AI Stress Testing is used in the model development stage to test the developed model. RIME goes beyond optimizing for basic performance metrics such as accuracy and automatically discovers the model’s weaknesses.
AI Continuous Testing is used after the model is deployed in production. Using AI Continuous Testing, you can automate the monitoring, discovery and remediation of issues that occur post-deployment.
Install Dependencies, Import Libraries and Download Data
Run the cells below to install the RIME SDK, import the analysis libraries, and download the example data.
[ ]:
!pip install rime-sdk &> /dev/null
import pandas as pd
from pathlib import Path
from rime_sdk import Client
[ ]:
!pip install https://github.com/RobustIntelligence/ri-public-examples/archive/master.zip
from ri_public_examples.download_files import download_files
download_files('tabular-2.0/fraud', 'fraud')
Establish the RIME Client
To get started, provide the API credentials and the base domain/address of the RIME service. You can generate and copy an API token from the API Access Tokens Page under Workspace settings. For the domain/address of the RIME service, contact your admin.
[ ]:
API_TOKEN = '' # PASTE API_KEY
CLUSTER_URL = '' # PASTE DEDICATED DOMAIN OF RIME SERVICE (eg: rime.stable.rbst.io)
[ ]:
client = Client(CLUSTER_URL, API_TOKEN)
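Pasting a token directly into a notebook makes it easy to leak when the notebook is shared. A minimal sketch of one alternative is to read both values from environment variables. The variable names `RIME_API_TOKEN` and `RIME_CLUSTER_URL` and the helper `load_rime_credentials` are hypothetical and are not part of the RIME SDK.

```python
import os
from typing import Mapping, Tuple


def load_rime_credentials(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (cluster_url, api_token), failing early if either is missing."""
    # Hypothetical variable names; use whatever your team standardizes on.
    cluster_url = env.get("RIME_CLUSTER_URL", "").strip()
    api_token = env.get("RIME_API_TOKEN", "").strip()
    missing = [
        name
        for name, value in (
            ("RIME_CLUSTER_URL", cluster_url),
            ("RIME_API_TOKEN", api_token),
        )
        if not value
    ]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    return cluster_url, api_token
```

With this helper, `CLUSTER_URL, API_TOKEN = load_rime_credentials()` could replace the two pasted constants before the `Client` is constructed.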
Create a New Project
You can create projects in RIME to organize your test runs. Each project represents a workspace for a given machine learning task. It can contain multiple candidate models, but should only contain one promoted production model.
[ ]:
description = (
"Run Stress Testing, Continuous Testing and AI Firewall on a tabular"
" binary classification model and dataset. Demonstration uses a"
" dataset that simulates credit card fraud detection."
)
project = client.create_project(
name='Tabular Binary Classification Demo',
description=description,
model_task='MODEL_TASK_BINARY_CLASSIFICATION'
)
Go back to the UI to see the new project, named “Tabular Binary Classification Demo”.
Uploading the Model + Datasets
Let’s first take a look at what the dataset looks like:
[ ]:
df = pd.read_csv(Path('fraud/data/fraud_ref.csv'))
df.head()
For this demo, we are going to use a pretrained CatBoostClassifier Model.
The model predicts whether a particular transaction is fraud or not fraud.
The model makes use of the following features:
category
card_type
card_company
transaction_amount
browser_version
city
country
We now want to kick off RIME Stress Tests that will help us evaluate the model in further depth, beyond basic performance metrics like accuracy, precision, and recall. To do this, we will upload the pre-trained model, the reference dataset the model was trained on, and the evaluation dataset the model was evaluated on to an S3 bucket that RIME can access.
[ ]:
upload_path = "ri_public_examples_fraud"
model_s3_dir = client.upload_directory(
Path('fraud/models'), upload_path=upload_path
)
model_s3_path = model_s3_dir + "/fraud_model.py"
ref_s3_path = client.upload_file(
Path('fraud/data/fraud_ref.csv'), upload_path=upload_path
)
eval_s3_path = client.upload_file(
Path('fraud/data/fraud_eval.csv'), upload_path=upload_path
)
ref_preds_s3_path = client.upload_file(
Path("fraud/data/fraud_ref_preds.csv"), upload_path=upload_path
)
eval_preds_s3_path = client.upload_file(
Path("fraud/data/fraud_eval_preds.csv"), upload_path=upload_path
)
Once the data and model are uploaded to S3, we can register them with RIME. Once they’re registered, we can refer to these resources using their RIME-generated IDs.
[ ]:
from datetime import datetime
dt = str(datetime.now())
# Note: models and datasets need to have unique names.
model_id = project.register_model_from_path(f"model_{dt}", model_s3_path)
ref_dataset_id = project.register_dataset_from_file(
f"ref_dataset_{dt}", ref_s3_path, data_params={"label_col": "label"}
)
eval_dataset_id = project.register_dataset_from_file(
f"eval_dataset_{dt}", eval_s3_path, data_params={"label_col": "label"}
)
project.register_predictions_from_file(
ref_dataset_id, model_id, ref_preds_s3_path
)
project.register_predictions_from_file(
eval_dataset_id, model_id, eval_preds_s3_path
)
Running a Stress Test
AI Stress Tests allow you to test your data and model before deployment. They are a comprehensive suite of hundreds of tests that automatically identify implicit assumptions and weaknesses of pre-production models. Each stress test is run on a single model and its associated reference and evaluation datasets.
Below is a sample configuration showing how to set up and run a RIME Stress Test.
[ ]:
stress_test_config = {
    "run_name": "Onboarding Stress Test Run",
    "data_info": {
        "ref_dataset_id": ref_dataset_id,
        "eval_dataset_id": eval_dataset_id,
    },
    "model_id": model_id
}
stress_job = client.start_stress_test(
    stress_test_config, project.project_id
)
stress_job.get_status(verbose=True, wait_until_finish=True)
Wait for a couple of minutes and your results will appear in the UI.
Stress Test Results
Stress tests are grouped into categories that measure various aspects of model robustness (model behavior, distribution drift, abnormal input, transformations, adversarial attacks, data cleanliness). Suggestions to improve your model are aggregated on the category level as well. Tests are ranked by default by a shared severity metric. Clicking on an individual test surfaces more detailed information.
You can view the detailed results in the UI by running the cell below and following the generated link. This page shows granular results for a given AI Stress Test run.
[ ]:
test_run = stress_job.get_test_run()
test_run
Analyzing the Results
Below you can see a snapshot of the results.
Here are the results of the Subset Performance tests. These tests can be thought of as more detailed performance tests that identify subsets on which the model underperforms. They help ensure that the model works equally well across different groups.
Below we are exploring the “Subset Recall” test cases for the feature “City”. We can see that even though the model has a Recall of 0.81, it performs poorly on certain cities like Sao Paolo and Rio De Janeiro.
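To make the idea of subset recall concrete, here is a dependency-free sketch that computes recall per subset and overall. It is not RIME's implementation; the helper `recall_by_subset` and the toy rows are made up for illustration and do not come from the fraud dataset.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def recall_by_subset(rows: Iterable[Tuple[str, int, int]]) -> Dict[str, float]:
    """Compute recall per subset from (subset, label, prediction) triples.

    Recall = true positives / actual positives. Subsets with no positive
    labels are omitted because recall is undefined for them.
    """
    true_pos: Dict[str, int] = defaultdict(int)
    actual_pos: Dict[str, int] = defaultdict(int)
    for subset, label, pred in rows:
        if label == 1:
            actual_pos[subset] += 1
            if pred == 1:
                true_pos[subset] += 1
    return {s: true_pos[s] / n for s, n in actual_pos.items()}


# Toy data: subset name, true label, predicted label.
rows = [
    ("A", 1, 1), ("A", 1, 1), ("A", 1, 1), ("A", 1, 0), ("A", 0, 0),
    ("B", 1, 0), ("B", 1, 0), ("B", 1, 1), ("B", 0, 0),
]
per_subset = recall_by_subset(rows)
overall = recall_by_subset(("all", y, p) for _, y, p in rows)["all"]
```

In this toy example the overall recall hides a large gap between the two subsets, which is the kind of pattern the Subset Recall test is meant to surface.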
Programmatically Querying the Results
RIME not only provides an intuitive UI to visualize and explore these results, but also lets you query them programmatically. This allows customers to integrate with their MLOps pipeline, log results to experiment management tools like MLflow, bring automated decision making to their ML practices, or store these results for future reference.
Run the cells below to programmatically query the results. The results are returned as pandas DataFrames.
Access results at the test run overview level.
[ ]:
test_run_result = test_run.get_result_df()
test_run_result
Access detailed results at the level of individual test cases.
[ ]:
test_case_result = test_run.get_test_cases_df()
test_case_result.head()
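As one way to bring automated decision making into a pipeline, the sketch below filters test-case records down to those that should block promotion. The column name `severity`, its values, and the helper `blocking_test_cases` are assumptions for illustration; inspect the columns of `test_case_result` in your own run before relying on any of them.

```python
from typing import Dict, Iterable, List


def blocking_test_cases(
    records: Iterable[Dict[str, str]],
    severity_key: str = "severity",
    blocking_levels: Iterable[str] = ("HIGH",),
) -> List[Dict[str, str]]:
    """Return the test cases whose severity should block promotion."""
    levels = {level.upper() for level in blocking_levels}
    return [r for r in records if str(r.get(severity_key, "")).upper() in levels]


# Made-up records standing in for test_case_result.to_dict("records").
records = [
    {"test_name": "Subset Recall", "severity": "High"},
    {"test_name": "Duplicate Rows", "severity": "None"},
]
blockers = blocking_test_cases(records)
```

A CI job could then fail the build whenever `blockers` is non-empty.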
Deploy to Production and Create the AI Firewall
Once you have identified the best stress test run, you can deploy the associated model and set up a RIME Firewall to run Continuous Testing in order to automatically detect “bad” incoming data and statistically significant distributional drift.
[ ]:
from datetime import timedelta
firewall = project.create_firewall(model_id, ref_dataset_id, timedelta(days=1))
Uploading a Batch of Production Data & Model Predictions to Firewall
The fraud detection model has been in production for 30 days. Production data and model predictions have been collected and stored for the past 30 days. Now, we will use Firewall to track how the model performed across the last 30 days.
Upload the Latest Batch of Production Data
[ ]:
dt = str(datetime.now())
prod_s3_path = client.upload_file(
Path('fraud/data/fraud_incremental.csv'),
upload_path=upload_path
)
prod_dataset_id = project.register_dataset_from_file(
f"prod_dataset_{dt}",
prod_s3_path,
data_params={"label_col": "label", "timestamp_col": "timestamp"}
)
prod_preds_s3_path = client.upload_file(
Path('fraud/data/fraud_incremental_preds.csv'),
upload_path=upload_path
)
project.register_predictions_from_file(
prod_dataset_id, model_id, prod_preds_s3_path
)
Run Continuous Testing over Batch of Data
[ ]:
ct_job = firewall.start_continuous_test(prod_dataset_id)
ct_job.get_status(verbose=True, wait_until_finish=True)
firewall
Wait for a couple of minutes and your results will appear in the UI.
Querying Results from the Firewall
After a firewall has been created and data has been uploaded for processing, the user can query the results throughout the entire uploaded history.
Obtain All Detection Events
[ ]:
events = [d.to_dict() for m in firewall.list_monitors() for d in m.list_detected_events()]
events_df = pd.DataFrame(events).drop(["id", "project_id", "firewall_id", "event_object_id", "description_html", "last_update_time"], axis=1)
events_df.head()
Firewall Overview
The Overview page is the mission control for your model’s production deployment health. In it, you can see the status of firewall events, get notified when model performance degrades, and see the underlying causes of failure.
Firewall CT Results
The AI Firewall’s Continuous Tests operate at the batch level and provide a mechanism to monitor the health of ML deployments in production. They allow the user to understand when errors begin to occur and surface the underlying drivers of such errors.
You can explore the results in the UI by running the cell below and following the generated link.
[ ]:
firewall
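The batch-level view described above can be sketched without the platform: group timestamped rows into one-day bins and compute a metric per bin. This is an illustration only, assuming calendar-day bins; the helper `daily_recall` and the sample rows (including the year) are made up.

```python
from collections import defaultdict
from datetime import date, datetime
from typing import Dict, Iterable, List, Tuple


def daily_recall(rows: Iterable[Tuple[datetime, int, int]]) -> Dict[date, float]:
    """Group (timestamp, label, prediction) rows by day and compute recall."""
    # Per day: [true positives, actual positives].
    counts: Dict[date, List[int]] = defaultdict(lambda: [0, 0])
    for ts, label, pred in rows:
        if label == 1:
            day_counts = counts[ts.date()]
            day_counts[1] += 1
            day_counts[0] += int(pred == 1)
    return {day: tp / pos for day, (tp, pos) in sorted(counts.items())}


rows = [
    (datetime(2022, 8, 1, 9), 1, 1),
    (datetime(2022, 8, 1, 17), 1, 1),
    (datetime(2022, 8, 2, 10), 1, 1),
    (datetime(2022, 8, 2, 11), 1, 0),
    (datetime(2022, 8, 2, 12), 0, 0),
]
recall_per_day = daily_recall(rows)
```

Plotting `recall_per_day` against the day gives a small-scale version of the performance-over-time chart discussed next.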
Analyzing CT Results
Model performance decreases over time - In the below image, we can see that Recall (model performance) has decreased since the model was first deployed. On 08/01, when the model was deployed, Recall was 0.927. By 08/30, Recall had dropped to 0.668.
Prediction drift increases over time - In the below image, we can see that Prediction Drift has increased since the model was first deployed. On 08/01, when the model was deployed, PSI was 0.119. By 08/30, PSI had increased to 1.96.
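PSI (Population Stability Index) compares the binned distribution of a reference sample with that of a production sample: for each bin with reference proportion e and production proportion a, it sums (a - e) * ln(a / e). The sketch below is a textbook version with equal-width bins on scores in [0, 1]. RIME's exact binning and smoothing are not described here, so the bin count and the `eps` floor are assumptions.

```python
import math
from typing import List, Sequence


def population_stability_index(
    expected: Sequence[float],
    actual: Sequence[float],
    n_bins: int = 10,
    eps: float = 1e-6,
) -> float:
    """PSI between two samples of scores in [0, 1], using equal-width bins."""

    def proportions(values: Sequence[float]) -> List[float]:
        counts = [0] * n_bins
        for v in values:
            # Clamp so that a score of exactly 1.0 falls in the last bin.
            counts[min(int(v * n_bins), n_bins - 1)] += 1
        # The eps floor keeps empty bins from producing log(0) or division by zero.
        return [max(c / len(values), eps) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


# Made-up scores: 10% high-risk predictions at reference time, 40% in production.
ref_scores = [0.05] * 90 + [0.95] * 10
prod_scores = [0.05] * 60 + [0.95] * 40
psi = population_stability_index(ref_scores, prod_scores)
```

Identical samples give a PSI of 0, and the value grows as the production distribution moves away from the reference.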
Appendix
Uploading a Model to RIME
To be able to run certain tests, RIME needs query access to your model. To give RIME access, you’ll need to write a Python file that implements the predict_df(df: pd.DataFrame) -> np.ndarray function, and upload that file (and any objects that it loads) to the platform. Here we provide an example model file, show you how to upload this file and the relevant model artifacts, and show you how to configure stress tests to use this model.
[ ]:
%%writefile fraud/models/fraud_model.py
from pathlib import Path
from catboost import CatBoostClassifier
import numpy as np
import pandas as pd
mod = CatBoostClassifier()
# Load our CatBoost model. Note that this requires us to
# upload fraud_model.catb along with this Python file,
# in the same directory.
mod.load_model(Path(__file__).parent / "fraud_model.catb")
cat_cols = [
    'category',
    'card_type',
    'card_company',
    'city',
    'browser_version',
    'country',
]


def predict_df(df: pd.DataFrame) -> np.ndarray:
    """Given data as a pandas DataFrame, return probabilities as a NumPy array."""
    for col in cat_cols:
        df[col] = df[col].astype(object)
    # For binary classification we expect a one-dimensional
    # array for the probabilities, where the score for each datapoint
    # is the probability that the label is 1.
    return mod.predict_proba(df.fillna(0))[:, 1]
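Before uploading a model file, it can help to check locally that the scores follow the contract described in the comments above: one probability per input row, each in [0, 1]. The helper `check_binary_scores` below is a hypothetical sanity check, not part of RIME; it accepts any one-dimensional sequence, such as a list or a 1-D NumPy array.

```python
from typing import Sequence


def check_binary_scores(scores: Sequence[float], n_rows: int) -> None:
    """Raise ValueError if scores are not one probability per input row."""
    if len(scores) != n_rows:
        raise ValueError(f"Expected {n_rows} scores, got {len(scores)}")
    for i, score in enumerate(scores):
        try:
            value = float(score)
        except (TypeError, ValueError):
            # A nested row here usually means a 2-D array was returned.
            raise ValueError(
                f"Score {i} is not a scalar; expected a 1-D output"
            ) from None
        # NaN fails this comparison too, so it is rejected as well.
        if not 0.0 <= value <= 1.0:
            raise ValueError(f"Score {i} is {value}, outside [0, 1]")
```

Running it on the output of `predict_df` for a handful of rows catches the most common mistake, which is returning the full two-column `predict_proba` output instead of the positive-class column.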
[ ]:
# Note that 'fraud/models' directory already contains the 'fraud_model.catb' file.
appendix_model_dir = client.upload_directory(
Path('fraud/models'), upload_path=upload_path
)
appendix_model_path = appendix_model_dir + "/fraud_model.py"
appendix_model_id = project.register_model_from_path(
f"appendix_model_{dt}", appendix_model_path
)
stress_test_with_model_config = {
    "run_name": "Uploaded Model Example",
    "data_info": {
        "ref_dataset_id": ref_dataset_id,
        "eval_dataset_id": eval_dataset_id,
    },
    "model_id": appendix_model_id
}
stress_job = client.start_stress_test(
    stress_test_with_model_config, project.project_id
)
stress_job.get_status(verbose=True, wait_until_finish=True)
Running a Custom Test
With RIME you can write your own custom tests to encode any domain-specific validation you want to perform. These tests run and are uploaded to the platform just like the built-in tests. To run a custom test, you need to implement a specific interface in Python, upload the file to the platform, and point to it in your configuration. Below we provide a simple example of a custom test that warns when the reference dataset has more rows than the evaluation dataset by more than a configurable threshold (delta).
[ ]:
%%writefile fraud/custom_test.py
"""Custom test batch runner."""
from typing import List, Tuple
from rime.core.schema import (
CustomConfig,
ImportanceLevel,
Status,
TableColumn,
TestBatchResult,
TestOutput,
)
from rime.core.test import BaseTest, TestExtraInfo
from rime.core.stress_tests.batch_runner import DataTestBatchRunner
from rime.core.stress_tests.schema.result import DataTestBatchResult, DataTestOutput
from rime.core.profiler.run_containers import RunContainer
# Signature should not be changed.
class CustomTest(BaseTest):
    def __init__(self, delta: int = 0):
        """Initialize with a delta between n_rows ref and eval."""
        super().__init__()
        self.delta = delta

    # Signature should not be changed.
    def run(
        self,
        run_container: RunContainer,
        silent_errors: bool = False
    ) -> Tuple[TestOutput, TestExtraInfo]:
        ref_data_size = len(run_container.ref_data.df)
        eval_data_size = len(run_container.eval_data.df)
        # Warn only when the reference set is larger than the
        # evaluation set by more than delta rows.
        if ref_data_size > eval_data_size + self.delta:
            status = Status.WARNING
            severity = ImportanceLevel.HIGH
        else:
            status = Status.PASS
            severity = ImportanceLevel.NONE
        test_output = DataTestOutput(
            self.id, status, {"Severity": severity}, severity, [],
        )
        return test_output, TestExtraInfo(severity)


# Signature should not be changed.
class CustomBatchRunner(DataTestBatchRunner):
    """TestBatchRunner for the CustomTest."""

    # Signature should not be changed.
    @classmethod
    def _from_config(
        cls, run_container: RunContainer, config: CustomConfig, category: str
    ) -> "DataTestBatchRunner":
        if config.params is None:
            delta = 0
        else:
            delta = config.params["delta"]
        tests = [CustomTest(delta=delta)]
        return cls(tests, category)

    # Signature should not be changed.
    def _outputs_to_batch_res(
        self,
        run_container: RunContainer,
        outputs: List[DataTestOutput],
        extra_infos: List[dict],
        duration: float,
        return_extra_infos: bool,
    ) -> TestBatchResult:
        long_description_tabs = [
            {"title": "Description", "contents": self.long_description},
            {"title": "Why it Matters", "contents": "Explain why this test matters."},
            {
                "title": "Configuration",
                "contents": "Explain how this test is configured."
            },
            {
                "title": "Example",
                "contents": "Include an example of how this test works."
            },
        ]
        return DataTestBatchResult(
            self.type,
            self.description,
            long_description_tabs,
            self.category,
            outputs,
            [],
            duration,
            extra_infos,
            [TableColumn("Severity")],
            outputs[0].severity,
            show_in_test_comparisons=False,
        )

    # Signature should not be changed.
    @property
    def description(self) -> str:
        return "This is a custom test"

    # Signature should not be changed.
    @property
    def long_description(self) -> str:
        return "This is a long description of a custom test."

    # Signature should not be changed.
    @property
    def type(self) -> str:
        return "Example Custom Test"
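The decision rule inside `CustomTest.run` can be isolated as a plain function, which makes it easy to unit test without the RIME runtime. The sketch below mirrors that comparison and, for contrast, shows a hypothetical symmetric variant that flags a large gap in either direction. Both function names are illustrative, and the returned strings stand in for the `Status` enum values.

```python
def row_count_status(ref_rows: int, eval_rows: int, delta: int = 0) -> str:
    """Mirror the one-sided comparison used in CustomTest.run."""
    return "WARNING" if ref_rows > eval_rows + delta else "PASS"


def symmetric_row_count_status(ref_rows: int, eval_rows: int, delta: int = 0) -> str:
    """Hypothetical variant: warn when the sizes differ by more than delta."""
    return "WARNING" if abs(ref_rows - eval_rows) > delta else "PASS"
```

Note that the one-sided rule passes when the evaluation set is much larger than the reference set; the symmetric variant is the one to use if that case should also be flagged.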
[ ]:
custom_test_path = client.upload_file("fraud/custom_test.py", upload_path=upload_path)
[ ]:
stress_test_with_model_config = {
    "run_name": "Custom Test Example",
    "data_info": {
        "ref_dataset_id": ref_dataset_id,
        "eval_dataset_id": eval_dataset_id,
    },
    "test_suite_config": {
        "custom_tests": [
            {
                "custom_test_category": "Data Cleanliness",
                "test_path": custom_test_path
            }
        ],
    },
    "model_id": model_id
}
stress_job = client.start_stress_test(
    stress_test_with_model_config, project.project_id
)
stress_job.get_status(verbose=True, wait_until_finish=True)
[ ]:
stress_job.get_test_run()