Robust Intelligence SageMaker Integration Walkthrough
You are a data scientist at a Payment Processing Company. The data science team has been tasked with implementing a Fraud Detection service and monitoring how that model performs over time. The performance of this fraud detection model directly impacts the costs of the company. To ensure the data science team develops the best model and that its performance doesn't degrade over time, the VP of Data Science purchases the RIME platform.
Your team currently does all model development and serving on SageMaker's intelligent cloud and already uses all of its MLOps tooling.
In this notebook walkthrough, we will walk through two of RIME's core products, AI Stress Testing and AI Firewall, and demonstrate how they integrate with SageMaker's core offerings to help you develop and maintain more robust AI models.
AI Stress Testing is used in the model development stage. Using AI Stress Testing, you can test the developed model. RIME goes beyond simply optimizing for basic model performance like accuracy and automatically discovers the model's weaknesses.
AI Firewall is used after the model is deployed in production. Using AI Firewall, you can automate the monitoring, discovery, and remediation of issues that occur post-deployment. Additionally, it automatically flags, blocks, or imputes erroneous data in real time.
[ ]:
%pip install rime-sdk &> /dev/null
%pip install catboost
%pip install ipython &> /dev/null
%pip install https://github.com/RobustIntelligence/ri-public-examples/archive/master.zip &> /dev/null
%mkdir -p model_training
%mkdir -p trained_models
[ ]:
# Import dependencies and create the SageMaker session and client
import json
import os
import pickle
import shutil
import tarfile
import warnings
from pathlib import Path
from tempfile import TemporaryDirectory
from time import gmtime, strftime
from typing import Any, List, Optional, Tuple, Union
import boto3
import botocore
import joblib
import numpy as np
import pandas as pd
import sagemaker
from ri_public_examples.download_files import download_files
from sagemaker import ModelPackage, get_execution_role, image_uris
from sagemaker.estimator import Estimator
from sagemaker.session import ClientError, Session
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.metrics import (accuracy_score, f1_score, precision_score,
                             recall_score, roc_auc_score)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from tqdm.notebook import tqdm
warnings.filterwarnings(action='ignore', category=UserWarning, module='sklearn')
sagemaker_session = Session()
sm_client = sagemaker_session.boto_session.client("sagemaker")
Define S3 Helper Functions and Prepare Data
Before developing our model, we'll define a few utility functions to help download and prepare the example dataset.
[ ]:
# S3 / File Utilities
def s3_file_exists(file: Union[str, Path], bucket: Optional[str] = None) -> bool:
    """Return True if the object exists in the bucket (default: the session bucket)."""
    bucket = bucket or sagemaker_session.default_bucket()
    s3 = boto3.resource('s3')
    try:
        s3.Object(bucket, str(file)).load()
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            # The object does not exist.
            return False
        else:
            # Something else has gone wrong.
            raise
    return True


def get_s3_path(relative_path: Union[str, Path], bucket: Optional[str] = None) -> str:
    """Return the S3 URI of a file."""
    bucket = bucket or sagemaker_session.default_bucket()
    bucket_uri = f"s3://{bucket}/" + str(relative_path)
    return bucket_uri


def populate_s3_file(source_uri: str, file: Union[str, Path], bucket: Optional[str] = None) -> None:
    """Copy the CSV at source_uri to the S3 path."""
    bucket_uri = get_s3_path(file, bucket)
    pd.read_csv(source_uri).to_csv(bucket_uri, index=False)


def download_s3_file(object_name: str, output_path: str, bucket: Optional[str] = None) -> None:
    """Download an S3 object to a local path."""
    bucket = bucket or sagemaker_session.default_bucket()
    s3 = boto3.resource('s3')
    bucket_obj = s3.Bucket(bucket)
    bucket_obj.download_file(object_name, output_path)


def make_tarfile(output_filename: str, source_dir: str) -> None:
    """Create a tarfile of the specified source dir."""
    with tarfile.open(output_filename, "w:gz") as tar:
        tar.add(source_dir, arcname=Path(source_dir).parent.name)


def create_tarball_then_upload_to_s3(local_dir: str, target_path: str, bucket: Optional[str] = None) -> None:
    """Convert local directory to a *.tar.gz ball then upload to the specified bucket and target path."""
    bucket = bucket or sagemaker_session.default_bucket()
    s3 = boto3.resource('s3')
    bucket_obj = s3.Bucket(bucket)
    with TemporaryDirectory() as tmp_dir:
        make_tarfile(os.path.join(tmp_dir, "code.tar.gz"), local_dir)
        bucket_obj.upload_file(os.path.join(tmp_dir, "code.tar.gz"), str(Path(target_path) / "code.tar.gz"))
# Download the example data, then upload to S3 for training + evaluation
data_dir = Path("fraud_data")
path_dict = {
    "ref": data_dir / "ref.csv",
    "eval": data_dir / "eval.csv",
    "incremental": data_dir / "incremental.csv",
}
download_files("tabular/fraud", "fraud_data")
for ds_name, path in path_dict.items():
    if not s3_file_exists(path):
        populate_s3_file(
            f"fraud_data/data/fraud_{ds_name}.csv", path
        )
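The `arcname` argument passed to `tar.add` decides the paths stored inside the archive, which in turn decides where the training container will find `train.py`. The sketch below uses a throwaway directory and a hypothetical archive prefix `code` (not the value the notebook computes) to show one way to inspect the resulting layout before uploading anything to S3.

```python
import tarfile
from pathlib import Path
from tempfile import TemporaryDirectory


def list_tar_members(source_dir: str, arcname: str) -> list:
    """Archive source_dir under arcname and return the stored member names."""
    with TemporaryDirectory() as tmp_dir:
        tar_path = str(Path(tmp_dir) / "code.tar.gz")
        with tarfile.open(tar_path, "w:gz") as tar:
            tar.add(source_dir, arcname=arcname)
        with tarfile.open(tar_path, "r:gz") as tar:
            return tar.getnames()


with TemporaryDirectory() as src:
    # Hypothetical stand-ins for the files in the notebook's model_training directory.
    (Path(src) / "train.py").write_text("print('training')\n")
    (Path(src) / "requirements.txt").write_text("catboost\n")
    members = list_tar_members(src, arcname="code")

print(sorted(members))
```

Listing the members this way makes it easy to confirm that the entry point sits where the estimator expects it.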
Create SageMaker Training Files
SageMaker Estimators require a training script. In our case, the image requires a couple of additional dependencies, which will be uploaded in the tarball in a requirements.txt file.
[ ]:
%%writefile model_training/requirements.txt
catboost
scikit-learn
[ ]:
%%writefile model_training/train.py
"""Training and serving file."""
import argparse
import json
import os
import pickle
from typing import Any, List, Tuple
import joblib
import numpy as np
import pandas as pd
from catboost import CatBoostClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.metrics import (accuracy_score, f1_score, precision_score,
                             recall_score, roc_auc_score)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
def extract_feature_types(
    df: pd.DataFrame, label_col: str
) -> Tuple[List[str], List[str]]:
    """Extract categorical and continuous features (if not provided)."""
    dtypes = df.dtypes
    dtypes = dtypes[~dtypes.index.isin([label_col])]
    cat_features = list(dtypes[dtypes == object].index)
    cont_features = list(dtypes[dtypes != object].index)
    return cat_features, cont_features


def preprocess_df(
    df: pd.DataFrame, cat_features: List[str], cont_features: List[str]
) -> pd.DataFrame:
    """Apply string preprocessing to categorical features."""
    cat_df = df[cat_features].astype(str)
    cont_df = df[cont_features]
    return pd.concat([cat_df, cont_df], axis=1)


def _get_eval_metrics(actual: np.ndarray, pred: np.ndarray):
    """Compute the metrics reported on the held-out data after training."""
    acc = accuracy_score(actual, pred)
    f1 = f1_score(actual, pred)
    prec = precision_score(actual, pred)
    recall = recall_score(actual, pred)
    auc = roc_auc_score(actual, pred)
    return acc, f1, prec, recall, auc
def _get_pipeline(net_type: str, cat_features: List[str]) -> Pipeline:
    """Return pipeline for specified net_type."""
    if net_type == "catboost-classifier":
        pipeline = Pipeline(
            [("clf", CatBoostClassifier(cat_features=cat_features, verbose=False))]
        )
    elif net_type == "sgd-classifier":
        pipeline = Pipeline(
            [
                ("impute", SimpleImputer(strategy="most_frequent")),
                ("ohe", OneHotEncoder(sparse=False, handle_unknown="ignore")),
                ("normalizer", StandardScaler()),
                ("clf", SGDClassifier(loss="modified_huber")),
            ]
        )
    elif net_type == "logistic-regression-classifier":
        pipeline = Pipeline(
            [
                ("impute", SimpleImputer(strategy="most_frequent")),
                ("ohe", OneHotEncoder(sparse=False, handle_unknown="ignore")),
                ("normalizer", StandardScaler()),
                ("clf", LogisticRegression()),
            ]
        )
    else:
        raise ValueError(f"Unsupported net_type {net_type}")
    return pipeline
def train_net(
    net_type: str,
    train_x: pd.DataFrame,
    train_y: pd.Series,
    test_x: pd.DataFrame,
    test_y: pd.Series,
    label_col: str,
    **kwargs: Any,
) -> Pipeline:
    """Train specified pipeline."""
    cat_features, cont_features = extract_feature_types(train_x, label_col)
    pipeline = _get_pipeline(net_type, cat_features)
    _train_x = preprocess_df(train_x, cat_features, cont_features)
    pipeline = pipeline.fit(_train_x, train_y.values.flatten())
    _test_x = preprocess_df(test_x, cat_features, cont_features)
    preds = pipeline.predict(_test_x)
    (acc, f1, prec, recall, auc) = _get_eval_metrics(test_y, preds)
    print(f"Performance metrics for net_type={net_type}: acc={acc}, f1={f1}, prec={prec}, recall={recall}, auc={auc}")
    return pipeline
LABEL_COL = "label"
PRED_COL = "pred"
TIMESTAMP_COL = "timestamp"

if __name__ == "__main__":
    # Pass in environment variables and hyperparameters
    parser = argparse.ArgumentParser()
    parser.add_argument("--net-type", type=str, default=os.environ.get("SM_HP_NET_TYPE"))
    parser.add_argument("--sm-model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAINING"))
    args, _ = parser.parse_known_args()
    training_dir = args.train
    # Read in data. The training channel mirrors the S3 prefix populated by the
    # notebook, which holds ref.csv (reference split) and eval.csv (evaluation split).
    train = pd.read_csv(os.path.join(training_dir, "ref.csv")).drop(columns=["preds"])
    test = pd.read_csv(os.path.join(training_dir, "eval.csv")).drop(columns=["preds"])
    # Get the data to prepare for training
    train_x = train.drop([LABEL_COL], axis=1)
    test_x = test.drop([LABEL_COL], axis=1)
    train_y = train[[LABEL_COL]]
    test_y = test[[LABEL_COL]]
    cat_features, cont_features = extract_feature_types(train_x, LABEL_COL)
    pipeline = train_net(args.net_type, train_x, train_y, test_x, test_y, LABEL_COL)
    # Save model
    joblib.dump(pipeline, os.path.join(args.sm_model_dir, "model.joblib"))
    with open(os.path.join(args.sm_model_dir, "cat_features.pkl"), "wb") as f:
        pickle.dump(cat_features, f)
    with open(os.path.join(args.sm_model_dir, "cont_features.pkl"), "wb") as f:
        pickle.dump(cont_features, f)
# Model serving
def model_fn(model_dir):
    """Deserialize the fitted model and its feature lists."""
    model = joblib.load(os.path.join(model_dir, "model.joblib"))
    with open(os.path.join(model_dir, "cat_features.pkl"), "rb") as f:
        cat_features = pickle.load(f)
    with open(os.path.join(model_dir, "cont_features.pkl"), "rb") as f:
        cont_features = pickle.load(f)
    return model, cat_features, cont_features


def input_fn(request_body, request_content_type):
    """
    Parse the request.
    request_body: The body of the request sent to the model.
    request_content_type: (string) specifies the format/variable type of the request
    """
    if request_content_type == "application/json":
        request_body = json.loads(request_body)
        return request_body["Input"]
    raise ValueError("This model only supports application/json input")


def predict_fn(input_data, model_tuple) -> np.ndarray:
    """
    Return the predicted probability of the positive class.
    input_data: the value returned from input_fn above
    model_tuple: the (model, cat_features, cont_features) tuple returned from model_fn above
    """
    model, cat_features, cont_features = model_tuple
    # preprocess_df selects columns by name, so coerce the parsed JSON payload
    # (assumed to be a list of records or a column mapping) into a DataFrame.
    df = preprocess_df(pd.DataFrame(input_data), cat_features, cont_features)
    # Index for binary classification
    return model.predict_proba(df)[:, 1]


def output_fn(prediction, content_type):
    """
    Format the response.
    prediction: the returned value from predict_fn above
    content_type: the content type the endpoint expects to be returned. Ex: JSON, string
    """
    respJSON = {"Output": list(prediction)}
    return respJSON
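The serving functions in `train.py` are meant to be chained: the loaded model is reused across requests, and each request flows through `input_fn`, `predict_fn`, and `output_fn` in turn. The following is a minimal local sketch of that flow under simplifying assumptions: `handle_request` and `stub_model` are hypothetical names, and the stub replaces the fitted pipeline so the example runs without SageMaker or a trained model.

```python
import json


def input_fn(request_body: str, request_content_type: str):
    """Parse the request body; mirrors the JSON-only check in train.py."""
    if request_content_type != "application/json":
        raise ValueError("This model only supports application/json input")
    return json.loads(request_body)["Input"]


def predict_fn(input_data, model):
    """Call the model; here `model` is a hypothetical stand-in callable."""
    return model(input_data)


def output_fn(prediction, content_type: str) -> dict:
    """Wrap predictions in the same response envelope that train.py uses."""
    return {"Output": list(prediction)}


def handle_request(body: str, content_type: str, model) -> dict:
    """Chain the three hooks for a single request."""
    return output_fn(predict_fn(input_fn(body, content_type), model), content_type)


def stub_model(rows):
    """Toy rule: score 1.0 when the first value of a row exceeds 100."""
    return [1.0 if row[0] > 100 else 0.0 for row in rows]


response = handle_request(json.dumps({"Input": [[250], [12]]}), "application/json", stub_model)
print(response)
```

Running the hooks locally like this is a quick way to catch payload-shape mismatches before deploying an endpoint.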
Define SageMaker Training and Model Registry Helpers
The following functions are unrelated to RIME. They specify how the SageMaker training jobs will be launched and provide functionality to register the resultant models in the model registry.
[ ]:
def create_estimator(net_type: str, training_instance_type: str = "ml.c4.xlarge") -> Estimator:
    """Create an estimator of the specified net_type."""
    role = get_execution_role()
    output_location = get_s3_path(f"sagemaker/models/{net_type}/output")
    print(f"Training artifacts will be uploaded to: {output_location}")
    train_model_id, train_model_version, train_scope = "catboost-classification-model", "*", "training"
    train_source_uri = get_s3_path("model_training_tar/code.tar.gz")
    train_image_uri = image_uris.retrieve(
        region=None,
        framework=None,
        model_id=train_model_id,
        model_version=train_model_version,
        image_scope=train_scope,
        instance_type=training_instance_type,
    )
    # Hyperparameters are exposed to the training script as SM_HP_* environment variables
    hyperparameters = {'NET_TYPE': net_type}
    tabular_estimator = Estimator(
        role=role,
        image_uri=train_image_uri,
        source_dir=train_source_uri,
        entry_point="train.py",
        instance_count=1,
        instance_type=training_instance_type,
        max_run=360000,
        hyperparameters=hyperparameters,
        output_path=output_location,
        dependencies=['code/requirements.txt']
    )
    return tabular_estimator
def create_model_package_group(model_package_group_name: str = "RIME-Fraud-Models") -> Tuple[str, str]:
    """Return the ARN and name of the model package group, creating it if needed."""
    model_package_group_input_dict = {
        "ModelPackageGroupName" : model_package_group_name,
        "ModelPackageGroupDescription" : "Group of fraud-detection models developed for the RIME-SageMaker example."
    }
    try:
        # Look up the requested group rather than a hard-coded name
        create_model_package_group_response = sm_client.describe_model_package_group(ModelPackageGroupName=model_package_group_name)
    except ClientError:
        create_model_package_group_response = sm_client.create_model_package_group(**model_package_group_input_dict)
    return create_model_package_group_response['ModelPackageGroupArn'], model_package_group_name
def register_model(net_type: str, model_tar_uri: str, image_uri: str, model_package_group_name: str) -> str:
    """Register the trained model in the model registry and return its ARN."""
    # Specify the model source
    modelpackage_inference_specification = {
        "InferenceSpecification": {
            "Containers": [
                {
                    "Image": image_uri,
                    "ModelDataUrl": model_tar_uri
                }
            ],
            "SupportedContentTypes": [ "text/csv" ],
            "SupportedResponseMIMETypes": [ "text/csv" ],
        }
    }
    create_model_package_input_dict = {
        "ModelPackageGroupName" : model_package_group_name,
        "ModelPackageDescription" : f"Model of type {net_type} to detect fraud",
        "ModelApprovalStatus" : "Approved",
        "CustomerMetadataProperties": {
            "net_type": net_type
        }
    }
    create_model_package_input_dict.update(modelpackage_inference_specification)
    create_model_package_response = sm_client.create_model_package(**create_model_package_input_dict)
    return create_model_package_response["ModelPackageArn"]
def get_model_tar_uri(estimator: Estimator) -> str:
    """Return the trained model output."""
    return estimator.output_path + '/' + estimator.latest_training_job.name + '/output/model.tar.gz'


def train_net(net_type: str, data_dir: str, logs: bool = True, wait: bool = False) -> dict:
    """Create and fit an estimator; return it together with its net_type."""
    tabular_estimator = create_estimator(net_type)
    training_dataset_s3_path = get_s3_path(data_dir)
    timestamp_suffix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    training_job_name = f"{net_type}-{timestamp_suffix}"
    tabular_estimator.fit(
        {"training": training_dataset_s3_path}, logs=logs, wait=wait, job_name=training_job_name
    )
    return {
        'net_type': net_type,
        'estimator': tabular_estimator,
    }
def attach_to_estimator(estimator_dict: dict) -> dict:
    """Attach to training job and update dict."""
    tabular_estimator: Estimator = estimator_dict['estimator']
    net_type = estimator_dict['net_type']
    tabular_estimator.attach(tabular_estimator.latest_training_job.name)
    model_tar_uri = get_model_tar_uri(tabular_estimator)
    model_tar_file = model_tar_uri[len(f"s3://{sagemaker_session.default_bucket()}/"):]
    model_package_group_arn, model_package_group_name = create_model_package_group()
    model_package_arn = register_model(net_type, model_tar_uri, tabular_estimator.image_uri, model_package_group_name)
    estimator_dict['model_tar_uri'] = model_tar_uri
    estimator_dict['model_tar_file'] = model_tar_file
    estimator_dict['model_package_group_arn'] = model_package_group_arn
    estimator_dict['model_package_arn'] = model_package_arn
    return estimator_dict
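The helpers above compose the model artifact's S3 URI from the estimator's output path and training job name, then slice off the `s3://<bucket>/` prefix to get the object key used for download. A small sketch of the same string handling, with a guard for a mismatched bucket, might look like the following. The function names, bucket, and job name are hypothetical and chosen for illustration only.

```python
def build_model_tar_uri(output_path: str, training_job_name: str) -> str:
    """Compose the URI where the training job's model.tar.gz is expected to be written."""
    return f"{output_path.rstrip('/')}/{training_job_name}/output/model.tar.gz"


def s3_uri_to_key(s3_uri: str, bucket: str) -> str:
    """Strip the s3://<bucket>/ prefix, failing loudly if the bucket does not match."""
    prefix = f"s3://{bucket}/"
    if not s3_uri.startswith(prefix):
        raise ValueError(f"{s3_uri} is not located in bucket {bucket}")
    return s3_uri[len(prefix):]


# Hypothetical bucket and job name, for illustration only.
uri = build_model_tar_uri(
    "s3://example-bucket/sagemaker/models/sgd-classifier/output",
    "sgd-classifier-2023-01-01-00-00-00",
)
key = s3_uri_to_key(uri, "example-bucket")
print(uri)
print(key)
```

Checking the prefix explicitly avoids silently producing a wrong key when the estimator writes to a bucket other than the session default.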
Train Models
Now that we've prepared our training scripts and uploaded the data to S3, we can asynchronously train models to be evaluated by RIME.
[ ]:
# Upload the training scripts to be used by estimator
create_tarball_then_upload_to_s3('model_training', 'model_training_tar')
net_types = ["catboost-classifier", "sgd-classifier", "logistic-regression-classifier"]
estimators = []
for net_type in net_types:
    estimators.append(train_net(net_type, str(data_dir), logs=True, wait=False))
# Wait for training to complete, then update model registry
for estimator_dict in estimators:
    attach_to_estimator(estimator_dict)
# Download trained models
for estimator_dict in estimators:
    training_job_name = estimator_dict['estimator'].latest_training_job.name
    os.makedirs(f"trained_models/{training_job_name}", exist_ok=True)
    local_model_tar = "trained_models/" + training_job_name + '/model.tar.gz'
    download_s3_file(estimator_dict['model_tar_file'], local_model_tar)
    estimator_dict['local_model_tar'] = local_model_tar
Connecting to RIME
Now for the fun part. To connect to RIME, create an API key and assign it to the variable API_KEY below. You can generate a new API key within the "Workspace settings" page on your RIME cluster's website.
Additionally, copy the URL of the cluster (e.g., "rime.<cluster-name>.rime.dev") to the RIME_CLUSTER_URL variable below.
[ ]:
from rime_sdk import Client, Firewall, Job
RIME_CLUSTER_URL = "Add Cluster Here"
API_KEY = "Add API Key Here"
client = Client(RIME_CLUSTER_URL, API_KEY)
project = client.create_project(name='SageMaker Demo', description='Creating an e2e RIME Demo using SageMaker.')
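As an alternative to pasting credentials into the notebook, the cluster URL and API key could be read from environment variables so they never end up in version control. The sketch below is one way to do that. The function name and the `RIME_API_KEY` variable name are assumptions made for this example, not names defined by the RIME SDK.

```python
import os
from typing import Mapping, Tuple


def load_rime_credentials(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Read the cluster URL and API key, failing early if either is missing."""
    # Variable names are hypothetical; use whatever your environment defines.
    cluster_url = env.get("RIME_CLUSTER_URL", "").strip()
    api_key = env.get("RIME_API_KEY", "").strip()
    missing = [
        name
        for name, value in (("RIME_CLUSTER_URL", cluster_url), ("RIME_API_KEY", api_key))
        if not value
    ]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return cluster_url, api_key


# Demonstration with an in-memory mapping instead of the real environment.
example_env = {"RIME_CLUSTER_URL": "rime.example.rime.dev", "RIME_API_KEY": "not-a-real-key"}
cluster_url, api_key = load_rime_credentials(example_env)
print(cluster_url)
```

The returned pair can then be passed to `Client(...)` in place of the hard-coded strings.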
[ ]:
# Define some constants to load / work with the data
# There is no need to alter any of these
LABEL_COL = "label"
PRED_COL = "pred"
TIMESTAMP_COL = "timestamp"
# Get the data to prepare for training
train = pd.read_csv(get_s3_path(path_dict["ref"])).drop(columns=["preds"])
test = pd.read_csv(get_s3_path(path_dict["eval"])).drop(columns=["preds"])
train_x = train.drop([LABEL_COL], axis=1)
test_x = test.drop([LABEL_COL], axis=1)
train_y = train[[LABEL_COL]]
test_y = test[[LABEL_COL]]
Prepare RIME Stress Testing Helper Functions
Below, define the fraud_model.py file to be uploaded to the RIME testing cluster. Additionally, create helper functions to:
- Upload the model directory to the cluster
- Upload a dataset to the cluster
[ ]:
%%writefile fraud_model.py
"""This is an example model.py file."""
import joblib
import pickle
from pathlib import Path
import numpy as np
import pandas as pd

model_dir = Path(__file__).parent / "model_extras"
if model_dir.exists():
    with open(model_dir / "cat_features.pkl", "rb") as f:
        cat_features = pickle.load(f)
    with open(model_dir / "cont_features.pkl", "rb") as f:
        cont_features = pickle.load(f)
    pipeline = joblib.load(model_dir / "model.joblib")
else:
    raise ValueError("model files do not exist in desired format")


def preprocess_df(df: pd.DataFrame) -> pd.DataFrame:
    """Apply string preprocessing to categorical features."""
    cat_df = df[cat_features].astype(str)
    cont_df = df[cont_features]
    return pd.concat([cat_df, cont_df], axis=1)


def predict_df(df: pd.DataFrame) -> np.ndarray:
    """Return predicted probabilities."""
    df = preprocess_df(df)
    # Index for binary classification
    return pipeline.predict_proba(df)[:, 1]
[ ]:
# The following utilities help launch stress testing jobs
# and log test results
def get_model_file_contents() -> str:
    """Get the example `fraud_model.py` contents."""
    with open('fraud_model.py', 'r') as f:
        return f.read()


def upload_model_from_tarfile(
    local_model_tar: str,
    net_type: str,
    client: Client,
) -> str:
    """Upload trained model to RIME cluster and return the resultant model directory path."""
    upload_path = f"sagemaker_demo_{net_type}"
    with TemporaryDirectory() as d:
        _dir = Path(d) / net_type
        extras_dir = _dir / "model_extras"
        extras_dir.mkdir(parents=True)
        with tarfile.open(local_model_tar, "r:gz") as tar:
            tar.extractall(_dir)
        # Move the extracted artifacts next to model.py, where fraud_model.py expects them
        for file in _dir.glob("**/*"):
            if file.is_file():
                file.rename(extras_dir / file.name)
        shutil.copyfile("fraud_model.py", _dir / "model.py")
        uploaded_path = client.upload_directory(str(_dir), upload_path=upload_path)
    return uploaded_path + "/model.py"
def predict_from_tarfile(
    local_model_tar: str,
    df: pd.DataFrame
) -> np.ndarray:
    """Return predictions for model in the downloaded tarfile."""
    with TemporaryDirectory() as d:
        with tarfile.open(local_model_tar, "r:gz") as tar:
            tar.extractall(d)
        model_dir = Path(d)
        with open(model_dir / "cat_features.pkl", "rb") as f:
            cat_features = pickle.load(f)
        with open(model_dir / "cont_features.pkl", "rb") as f:
            cont_features = pickle.load(f)
        pipeline = joblib.load(model_dir / "model.joblib")
    cat_df = df[cat_features].astype(str)
    cont_df = df[cont_features]
    preprocessed_df = pd.concat([cat_df, cont_df], axis=1)
    return pipeline.predict_proba(preprocessed_df)[:, 1]
def upload_dataset_file(client: Client, df: pd.DataFrame, split: str) -> str:
    """Upload dataframe to RIME cluster."""
    upload_path = f"sagemaker_walkthrough_{split}"
    with TemporaryDirectory() as d:
        f = Path(d) / "data.csv"
        df.to_csv(f, index=False)
        uploaded_name = client.upload_file(f.resolve(), upload_path=upload_path)
    return uploaded_name


def add_preds_and_upload_dataset_file(
    local_model_tar: str, net_type: str, df: pd.DataFrame, client: Client, pred_col: str, split: str
) -> str:
    """Make predictions and upload."""
    df = df.copy()
    df[pred_col] = predict_from_tarfile(local_model_tar, df)
    dataset_split = f"{net_type}_{split}"
    return upload_dataset_file(client, df, dataset_split)
def join_rime_stress_tests(
    stress_test_job: Job
) -> dict:
    """Wait for a RIME stress test job to finish and return its final status."""
    stress_test_job_run = stress_test_job.get_status(
        verbose=True, wait_until_finish=True, poll_rate_sec=15
    )
    if stress_test_job_run["status"] == "JOB_STATUS_SUCCEEDED":
        return stress_test_job_run
    raise Exception(f"Stress test job did not succeed: {stress_test_job_run}")
def evaluate_model(net_type: str, local_model_tar: str, train: pd.DataFrame, test: pd.DataFrame) -> Job:
    """Upload the datasets and model, then launch a RIME stress test job."""
    test_config = {
        "run_name": f"{net_type} SageMaker Experiment",
        "data_info": {
            "label_col": LABEL_COL,
            "pred_col": PRED_COL,
            "ref_path": add_preds_and_upload_dataset_file(local_model_tar, net_type, train, client, PRED_COL, "ref"),
            "eval_path": add_preds_and_upload_dataset_file(local_model_tar, net_type, test, client, PRED_COL, "eval")
        },
        "model_info": {
            "path": upload_model_from_tarfile(local_model_tar, net_type, client)
        },
        "model_task": "Binary Classification"
    }
    return client.start_stress_test(
        test_config, project_id=project.project_id
    )
Train and Evaluate Models
With the models trained, upload the datasets and model files to the testing cluster and start a stress test job for each model to compare their behavior.
[ ]:
# Launch Stress Test Jobs
for estimator_dict in estimators:
    net_type, local_model_tar = estimator_dict['net_type'], estimator_dict['local_model_tar']
    estimator_dict['stress_job'] = evaluate_model(net_type, local_model_tar, train=train, test=test)
# Wait for jobs to finish
for estimator_dict in tqdm(estimators):
    estimator_dict["status"] = join_rime_stress_tests(estimator_dict["stress_job"])
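The cell above launches every job first and only then waits on each one, so the jobs run concurrently on the cluster. Waiting itself amounts to polling a status until it becomes terminal. The sketch below illustrates that generic pattern only and is not how the RIME SDK implements `get_status`: `wait_for_job` is a hypothetical helper, and every status string other than `JOB_STATUS_SUCCEEDED` is an assumption made for the example.

```python
import time
from typing import Callable

# Status strings other than JOB_STATUS_SUCCEEDED are assumptions for illustration.
TERMINAL_STATUSES = {"JOB_STATUS_SUCCEEDED", "JOB_STATUS_FAILED"}


def wait_for_job(
    get_status: Callable[[], str],
    poll_rate_sec: float = 15.0,
    max_polls: int = 240,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status until it reports a terminal status or max_polls is reached."""
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_rate_sec)
    raise TimeoutError(f"Job still running after {max_polls} polls")


# Simulated job that finishes on the third poll; sleeping is skipped for the demo.
statuses = iter(["JOB_STATUS_RUNNING", "JOB_STATUS_RUNNING", "JOB_STATUS_SUCCEEDED"])
final_status = wait_for_job(lambda: next(statuses), sleep=lambda _seconds: None)
print(final_status)
```

Bounding the number of polls keeps a stuck job from blocking the notebook indefinitely.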
Reviewing the Stress Tests
Stress tests are grouped into categories that measure various aspects of model robustness (model behavior, distribution drift, abnormal input, transformations, adversarial attacks, data cleanliness). Suggestions to improve your model are aggregated at the category level as well. By default, tests are ranked by a shared severity metric. Clicking on an individual test surfaces more detailed information.
You can view the detailed results in the UI by running the cells below and following the generated links. Each page shows granular results for a given AI Stress Test run.
Additionally, you can compare the trained models by navigating to the project page and clicking the "Compare" button.
We have also added the RIME metrics to the MLflow experiments. To view them, click the "Experiment" icon at the top right of this notebook or navigate to the appropriate experiments section using the command bar on the left.
[ ]:
estimators[0]['stress_job'].get_test_run()
[ ]:
estimators[1]['stress_job'].get_test_run()
[ ]:
estimators[2]['stress_job'].get_test_run()
Deploy to Production and Create the AI Firewall
Suppose that we wish to select the most "performant" model based on the number of RIME tests it passes. We will select and deploy the associated model wrapped with the AI Firewall. The AI Firewall operates at both the datapoint and batch level. It automatically protects your model in real time from "bad" incoming data and also alerts on statistically significant distributional drift.
In this scenario, the data scientist is short on time and decides to deploy the existing model to production. The data scientist also creates a firewall to monitor the model and data behavior. The AI Firewall is automatically configured based on the failures identified by AI Stress Testing to protect the tested model in production.
[ ]:
def select_model_with_highest_pass_rate(estimators: List[dict]) -> dict:
    """Select model naively using overall test pass rate."""
    pass_rates = []
    for run_result in estimators:
        result_df: pd.DataFrame = run_result['stress_job'].get_test_run().get_result_df()
        pass_rate = result_df['metrics.summary_counts.pass'] / result_df['metrics.summary_counts.total']
        pass_rates.append(pass_rate)
    return estimators[np.argmax(pass_rates)]


def deploy_model(model_package_arn: str, role: str, instance_type: str = 'ml.t2.medium'):
    """Deploy a model from the registry."""
    model = ModelPackage(role=role,
                         model_package_arn=model_package_arn,
                         sagemaker_session=sagemaker.Session())
    endpoint_name = "RIME-fraud-inference-pipeline-endpoint"
    return model.deploy(initial_instance_count=1, instance_type=instance_type, endpoint_name=endpoint_name)
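The selection rule is simply "highest fraction of passed tests wins". The worked example below shows the arithmetic on plain dictionaries so it can be run without a RIME cluster. The pass and total counts are invented for illustration and are not results from this walkthrough, and `pass_rate` and `select_highest_pass_rate` are hypothetical helper names.

```python
from typing import Dict, List


def pass_rate(summary_counts: Dict[str, int]) -> float:
    """Return the fraction of tests passed, treating an empty run as 0.0."""
    total = summary_counts["total"]
    return summary_counts["pass"] / total if total else 0.0


def select_highest_pass_rate(runs: List[dict]) -> dict:
    """Pick the run with the highest pass rate (first one wins on ties)."""
    return max(runs, key=lambda run: pass_rate(run["summary_counts"]))


# Invented counts, for illustration only.
example_runs = [
    {"net_type": "catboost-classifier", "summary_counts": {"pass": 80, "total": 100}},
    {"net_type": "sgd-classifier", "summary_counts": {"pass": 70, "total": 100}},
    {"net_type": "logistic-regression-classifier", "summary_counts": {"pass": 75, "total": 100}},
]
best = select_highest_pass_rate(example_runs)
print(best["net_type"], pass_rate(best["summary_counts"]))
```

A raw pass rate weights every test equally, so in practice it may be worth combining it with the severity ranking shown in the UI.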
Next, the data scientist creates the RIME Firewall to connect with the production endpoint.
[ ]:
best_model_run = select_model_with_highest_pass_rate(estimators)
production_model = best_model_run["local_model_tar"]
test_run = best_model_run['stress_job'].get_test_run()
try:
    firewall = project.create_firewall("SageMaker Firewall", bin_size="day", test_run_id=test_run.test_run_id)
except ValueError:
    print(f"Updating to target model from stress test {test_run.test_run_id}")
    firewall = project.get_firewall()
    firewall.update_firewall_stress_test_run(test_run.test_run_id)
[ ]:
# Deploy the model
# SageMaker deployment is frequently too slow (>1.5 hrs),
# so we will continue to use the local model files
do_deployment = False
if do_deployment:
    role = get_execution_role()
    instance_type = 'ml.t2.medium'
    production_model = deploy_model(best_model_run['model_package_arn'], role, instance_type=instance_type)
Uploading a Batch of Production Data & Model Predictions to Firewall
The fraud detection model has been in production for 30 days. Production data and model predictions have been collected and stored for the past 30 days. Now, we will use Firewall to track how the model performed across the last 30 days.
[ ]:
def run_firewall_on_bin(production_model_tar: str, df: pd.DataFrame, firewall: Firewall, group_split: str) -> Job:
    """Run the firewall on a bin of data."""
    # A real deployment would already have predictions associated with the data, but we will
    # make the predictions here for the sake of convenience
    incremental_config = {
        "eval_path": add_preds_and_upload_dataset_file(production_model_tar, "prod", df, client, PRED_COL, group_split),
        "timestamp_col": "timestamp"
    }
    ct_job = firewall.start_continuous_test(test_run_config=incremental_config, disable_firewall_events=False)
    ct_job.get_status(verbose=True, wait_until_finish=True, poll_rate_sec=15.0)
    return ct_job
[ ]:
# Load production data
incremental_df = pd.read_csv(get_s3_path(path_dict["incremental"])).drop(columns=["preds"])
# Group by week to simulate running weekly batches
timestamps_dt = pd.DatetimeIndex(incremental_df[TIMESTAMP_COL])
for name, group in incremental_df[TIMESTAMP_COL].groupby(timestamps_dt.to_period("W-SUN").to_timestamp()):
    # `name` is a pandas Timestamp marking the start of the week, so format it
    # as a date string instead of calling str methods on it
    group_split = name.strftime("%Y-%m-%d")
    print(f"Running batch for week {name}")
    batch_df = incremental_df.loc[group.index]
    job = run_firewall_on_bin(production_model, batch_df, firewall, group_split)
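The weekly batching above relies on pandas `W-SUN` periods, which run from Monday through Sunday and are labelled here by their start date. To make the bucketing explicit, the following sketch reproduces it with only the standard library. The helper names and the sample dates are hypothetical and serve purely as an illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, List


def week_start(ts: datetime) -> datetime:
    """Return midnight on the Monday that begins ts's Monday-to-Sunday week."""
    monday = ts - timedelta(days=ts.weekday())
    return monday.replace(hour=0, minute=0, second=0, microsecond=0)


def group_by_week(timestamps: Iterable[datetime]) -> Dict[str, List[datetime]]:
    """Bucket timestamps by the date string of their week's Monday."""
    groups: Dict[str, List[datetime]] = defaultdict(list)
    for ts in timestamps:
        groups[week_start(ts).strftime("%Y-%m-%d")].append(ts)
    return dict(groups)


# Sample dates: a Monday, the following Sunday, and the Monday after that.
sample = [datetime(2023, 1, 2, 9, 30), datetime(2023, 1, 8, 23, 59), datetime(2023, 1, 9, 0, 0)]
weekly = group_by_week(sample)
for week, members in weekly.items():
    print(week, len(members))
```

The Sunday timestamp lands in the same bucket as the preceding Monday, which matches the week boundaries used for the firewall batches.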
Firewall CT Results
The AI Firewall's Continuous Tests operate at the batch level and provide a mechanism to monitor the health of ML deployments in production. They allow the user to understand when errors begin to occur and surface the underlying drivers of such errors.
You can explore the results in the UI by running the below cell and redirecting to the generated link.
[ ]:
# Clean up any and all deployments if performed
endpoint_prefix = "RIME-fraud-inference-"
sm_client = sagemaker_session.boto_session.client("sagemaker")
for endpoint in sm_client.list_endpoints()['Endpoints']:
    if endpoint['EndpointName'].startswith(endpoint_prefix):
        sm_client.delete_endpoint(EndpointName=endpoint['EndpointName'])
for endpoint_config in sm_client.list_endpoint_configs()['EndpointConfigs']:
    # Only delete configs created by this walkthrough, not every config in the account.
    # This assumes the config shares the endpoint's name prefix.
    if endpoint_config['EndpointConfigName'].startswith(endpoint_prefix):
        sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config['EndpointConfigName'])