Custom Integrations

Robust Intelligence provides a framework to load data from arbitrary sources. This is helpful for integrations Robust Intelligence doesn’t support natively.

Configure Authentication for a Custom Integration

  1. Follow steps 1 through 5 of the section Add an integration through the Robust Intelligence web UI.

  2. From the Integration Type drop-down, select Custom.

  3. Type a name for the integration in Name.

  4. Type the configuration information for the custom integration as key/value pairs and select a sensitivity for each pair. Sensitivity levels include:

    • Not sensitive

    • Workspace level

    • Members level

See Configuring Integrations for an explanation of sensitivity levels.
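
For example, the S3 loader later on this page reads two secrets from its integration: an ACCESS_KEY pair and a SECRET_ACCESS_KEY pair, both of which would typically be marked sensitive. (The key names are this example's convention, not a requirement.)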

Note the ID (UUID) of your saved integration. You will need it when you load datasets for your tests.

Use a Custom Integration

Robust Intelligence can load data from arbitrary sources defined in a Python file for use in a stress test or continuous test. The custom loader feature passes the authentication secrets stored in a custom integration, as key/value pairs, to the integration_dict parameter of your custom data loader function. (See also the custom loader example, which is part of the Scheduled Continuous Testing example notebook.)

Below, find an example for your test type:

Stress test example with a custom data integration

In this example, we’ll create a custom data loader, custom_news_loader.py, that loads our reference and evaluation data sets. We’ll then use the Robust Intelligence SDK to register datasets that use the custom data loader, and finally we’ll run a stress test with them.

Custom data loader example

The example custom data loader, custom_news_loader.py, specifies the data loading logic. This example parses data from an Amazon S3 bucket, but a custom data loader can be written to handle data from any source.

"""Data loader file for news file."""
import boto3
from datetime import datetime
import pandas as pd

BUCKET_NAME = "rime-datasets"

def get_news_data(
    start_time: datetime, # REQUIRED
    end_time: datetime, # REQUIRED
    integration_dict: dict,
) -> pd.DataFrame:
    start_time = datetime.fromtimestamp(start_time)
    end_time = datetime.fromtimestamp(end_time)

    master_df = pd.DataFrame()
    s3 = boto3.resource(
        's3',
        aws_access_key_id=integration_dict['ACCESS_KEY'],
        aws_secret_access_key=integration_dict['SECRET_ACCESS_KEY'],
    )
    my_bucket = s3.Bucket(BUCKET_NAME)
    for object_summary in my_bucket.objects.filter(Prefix="custom-loader-news/"):
        if ".csv" in object_summary.key:
            date_str = object_summary.key.split("/")[1].replace(".csv", "")
            date_str = date_str.replace("day_", "")
            file_time = datetime.strptime(date_str, "%Y-%m-%d")
            if start_time <= file_time <= end_time:
                obj = s3.Object(BUCKET_NAME, object_summary.key)
                curr_df = pd.read_csv(obj.get()["Body"])
                master_df = pd.concat([master_df, curr_df], ignore_index=True)
    return master_df
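
Before uploading the loader, you can sanity-check it locally by calling it the same way Robust Intelligence will. This is a minimal sketch; the secret values are placeholders:

from custom_news_loader import get_news_data

df = get_news_data(
    start_time=0,  # epoch seconds, as passed via loader_kwargs_json
    end_time=1758608114,
    integration_dict={
        "ACCESS_KEY": "<your-access-key-id>",  # placeholder
        "SECRET_ACCESS_KEY": "<your-secret-access-key>",  # placeholder
    },
)
print(df.shape)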

Get datasets for a stress test

Below, we upload the custom data loader with the SDK function upload_file and define an example function, get_data_info_config(), that builds the data_info configuration pointing at the uploaded loader. When a test runs, Robust Intelligence invokes your data loading function, get_news_data, to load the data into the Robust Intelligence cluster for testing.

For the connection_info object, you must specify:

  • path: the path to your loader file

  • load_func_name: the name of the data loading function that your custom data loader provides

You may optionally also specify:

  • loader_kwargs_json: A JSON string of keyword arguments to pass to the data loading function

  • data_endpoint_integration_id: UUID of the saved integration with secrets required by the data loading function. See Get the UUID of a saved integration.

The DATA_LOADER_FILE argument here provides a path to your custom data loader script, such as "s3://rime-datasets/custom-loader-news/custom_news_loader.py", and the MODEL_FILE argument provides a path to your model.

uploaded_data_loader_path = client.upload_file(DATA_LOADER_FILE)

def get_data_info_config(start: int, end: int):
    # Keyword arguments forwarded to get_news_data, encoded as a JSON string.
    loader_kwargs = f'{{"start_time": {start}, "end_time": {end}}}'
    data_info = {
        "connection_info": {
            "data_loading": {
                "path": uploaded_data_loader_path,
                "load_func_name": "get_news_data",
                "loader_kwargs_json": loader_kwargs,
                "data_endpoint_integration_id": "<integration_uuid>",
            }
        },
        "data_params": {
            "timestamp_col": TIMESTAMP_COL,
        },
    }
    return data_info

Note: You will also use the client.upload_file SDK function to upload your model file.
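
For example (MODEL_FILE, like DATA_LOADER_FILE above, is a path you supply):

uploaded_model_path = client.upload_file(MODEL_FILE)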

Register the stress test artifacts

Use the get_data_info_config function to return data_info objects for the reference and evaluation data sets. Pass integer start and end times in epoch seconds, for example 0 and 1758608114. This example assumes you have uploaded your model at the uploaded_model_path referenced below.
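
For instance, the window bounds might be defined as epoch seconds (the values shown are illustrative):

REF_START, REF_END = 0, 1690000000  # reference window
EVAL_START, EVAL_END = 1690000000, 1758608114  # evaluation window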

import uuid

# Define data/model info:
model_info = {"model_path": {"path": uploaded_model_path}}
ref_info = get_data_info_config(REF_START, REF_END)
eval_info = get_data_info_config(EVAL_START, EVAL_END)

# Register model & data, using random UUIDs as unique names:
model_id = project.register_model(str(uuid.uuid4()), model_config=model_info)
ref_data_id = project.register_dataset(str(uuid.uuid4()), ref_info)
eval_data_id = project.register_dataset(str(uuid.uuid4()), eval_info)

Configure the stress test

Now that your data has been loaded and registered, use the registered data and model IDs to configure a stress test.

stress_test_config = {
    "run_name": "news classification stress test",
    "data_info": {
        "ref_dataset_id": ref_data_id,
        "eval_dataset_id": eval_data_id,
    },
    "model_id": model_id,
}

Run the stress test

Call the SDK function start_stress_test to run the test:

job = client.start_stress_test(test_run_config=stress_test_config)
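
To follow the run to completion, you can poll the returned job object. As a sketch, assuming the rime_sdk Job API (verify the exact signature against your SDK reference):

# Assumed Job API; blocks until the test run finishes.
job.get_status(verbose=True, wait_until_finish=True)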

Continuous test example with a custom data integration

Continuous tests rely on a previously established reference data source, so the data configuration does not specify one. The contents of custom_news_loader.py do not change, but the data configuration is different.

Get datasets for a continuous test

Upload the custom data loader with the SDK function upload_file, then create a function (get_data_info_config() in this example) that builds the data_info configuration pointing at the uploaded loader. When the test runs, Robust Intelligence invokes your data loading function (get_news_data in this example) to load the data into the Robust Intelligence cluster for testing.

For the connection_info object, you must specify:

  • path: the path to your loader file

  • load_func_name: the name of the data loading function that your custom data loader provides

You may optionally also specify:

  • loader_kwargs_json: A JSON string of keyword arguments to pass to the data loading function

  • data_endpoint_integration_id: UUID of the saved integration with secrets required by the data loading function. See Get the UUID of a saved integration.

The DATA_LOADER_FILE argument here provides a path to your custom data loader script, such as "s3://rime-datasets/custom-loader-news/custom_news_loader.py".

uploaded_data_loader_path = client.upload_file(DATA_LOADER_FILE)

def get_data_info_config(start: int, end: int):
    # Keyword arguments forwarded to get_news_data, encoded as a JSON string.
    loader_kwargs = f'{{"start_time": {start}, "end_time": {end}}}'
    data_info = {
        "connection_info": {
            "data_loading": {
                "path": uploaded_data_loader_path,
                "load_func_name": "get_news_data",
                "loader_kwargs_json": loader_kwargs,
                "data_endpoint_integration_id": "<integration_uuid>",
            }
        },
        "data_params": {
            "timestamp_col": TIMESTAMP_COL,
        },
    }
    return data_info

Register the dataset for the continuous test

Use the get_data_info_config function to return a data_info object for the evaluation data set. Pass integer start and end time values.

import uuid

# Define data info:
eval_info = get_data_info_config(EVAL_START, EVAL_END)

# Register data, using a random UUID as a unique name:
eval_data_id = project.register_dataset(str(uuid.uuid4()), eval_info)

Configure the continuous test

Now that your data has been loaded and registered, use the registered data ID to configure a continuous test.

incremental_config = {
    "run_name": "news classification continuous test",
    "data_info": {
        "eval_dataset_id": eval_data_id,
    }
}
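
To launch the run, the call mirrors the stress-test one. As a sketch, assuming a start_continuous_test SDK function analogous to start_stress_test (verify the name and signature for your SDK version):

# Assumed SDK call; check your rime_sdk version's reference.
job = client.start_continuous_test(test_run_config=incremental_config)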