Custom Integrations

Robust Intelligence provides a framework for loading data from arbitrary sources. This is helpful when you need data from a source that Robust Intelligence doesn't support natively.

Configuring Authentication for Custom Integrations

  1. Follow steps 1 through 5 of Adding an integration through the Robust Intelligence web UI.

  2. From the Integration Type drop-down, select Custom.

  3. Type a name for the integration in Name.

  4. Type the configuration information for the custom integration as key/value pairs and select a sensitivity level for each pair, as illustrated after this list. Sensitivity levels include:

    • Not sensitive

    • Workspace level

    • Members level
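The key/value pairs configured here are passed verbatim to your custom data loader at run time through the integration_dict parameter. As a sketch, assuming you configure keys named ACCESS_KEY and SECRET_ACCESS_KEY (the names used by the loader example below), the loader receives:

# integration_dict as received by the custom data loader (placeholder values).
integration_dict = {
    "ACCESS_KEY": "<your-access-key>",          # e.g., Workspace level
    "SECRET_ACCESS_KEY": "<your-secret-key>",   # e.g., Members level
}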

Using Custom Integrations

Robust Intelligence can load data from arbitrary sources defined in a Python file for use in a stress test or continuous test. Custom integrations may be used by the custom loader feature to provide authentication secrets as key/value pairs to the integration_dict parameter of the custom data loader function. See the example for your test type:

Stress test example with a custom data integration

In this example, we’ll create a custom data loader, custom_news_loader.py, that loads our reference and evaluation data sets. We’ll then use the Robust Intelligence SDK to create and register a data set that uses the custom data loader, and finally we’ll run a stress test with it.

Custom data loader example

The example custom data loader, custom_news_loader.py, specifies the data loading logic. This example parses data from an Amazon S3 bucket, but a custom data loader can be written to handle data from any source.

"""Data loader file for news file."""
import boto3
from datetime import datetime
import pandas as pd
​
BUCKET_NAME = "rime-datasets"
​
​
def get_news_data(start_time: int, end_time: int, integration_dict: dict) -> pd.DataFrame:
    start_time = datetime.fromtimestamp(start_time)
    end_time = datetime.fromtimestamp(end_time)
​
    master_df = pd.DataFrame()
    s3 = boto3.resource(
        's3',
        aws_access_key_id=integration_dict['ACCESS_KEY'],
        aws_secret_access_key=integration_dict['SECRET_ACCESS_KEY'],
    )
    my_bucket = s3.Bucket(BUCKET_NAME)
    for object_summary in my_bucket.objects.filter(Prefix="custom-loader-news/"):
        if ".csv" in object_summary.key:
            date_str = object_summary.key.split("/")[1].replace(".csv", "")
            date_str = date_str.replace("day_", "")
            file_time = datetime.strptime(date_str, "%Y-%m-%d")
            if start_time <= file_time <= end_time:
                obj = s3.Object(BUCKET_NAME, object_summary.key)
                curr_df = pd.read_csv(obj.get()["Body"])
                master_df = pd.concat([master_df, curr_df], ignore_index=True)
    return master_df
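To sanity-check the loader locally before uploading it, you can call the function directly with a credentials dictionary that mimics what the integration will supply (the values below are placeholders):

# Local smoke test for the loader with placeholder credentials.
df = get_news_data(
    start_time=0,
    end_time=1758608114,
    integration_dict={
        "ACCESS_KEY": "<your-access-key>",
        "SECRET_ACCESS_KEY": "<your-secret-key>",
    },
)
print(df.shape)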

Get datasets for a stress test

Below, we call the SDK function upload_file to upload the custom data loader to the Robust Intelligence cluster, then define an example function, get_data_info_config(), that builds a data_info configuration pointing at the uploaded loader and its get_news_data function, which Robust Intelligence invokes to load your data for testing.

For the connection_info object, you must specify:

  • path: the path to your loader file

  • load_func_name: the name of the data loading function that your custom data loader provides

You may optionally also specify:

  • loader_kwargs_json: any parameters to pass to the data loading function, encoded as a JSON string (see the example after this list)

  • data_endpoint_integration_id: UUID of the integration with secrets required by the data loading function
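For example, assuming the get_news_data loader above, loader_kwargs_json is a JSON object string whose keys match the loader's parameter names; the integration secrets are supplied separately through integration_dict and are not included:

import json

# Keys mirror get_news_data's start_time and end_time parameters.
loader_kwargs_json = json.dumps({"start_time": 0, "end_time": 1758608114})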

The DATA_LOADER_FILE argument here provides a path to your custom data loader script, such as "s3://rime-datasets/custom-loader-news/custom_news_loader.py", and the MODEL_FILE argument provides a path to your model.

# Upload the custom data loader to the Robust Intelligence cluster.
uploaded_data_loader_path = client.upload_file(DATA_LOADER_FILE)


def get_data_info_config(start: int, end: int):
    # Keys in loader_kwargs_json must match the loader's parameter names.
    loader_kwargs = f'{{"start_time": {start}, "end_time": {end}}}'
    data_info = {
        "connection_info": {
            "data_loading": {
                "path": uploaded_data_loader_path,
                "load_func_name": "get_news_data",
                "loader_kwargs_json": loader_kwargs,
                "data_endpoint_integration_id": "<integration_uuid>",
            }
        },
        "data_params": {
            "timestamp_col": TIMESTAMP_COL,
        },
    }
    return data_info

Note: you will also use the client.upload_file SDK function to upload your model file, which produces the uploaded_model_path used in the next step:
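uploaded_model_path = client.upload_file(MODEL_FILE)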

Register the stress test artifacts

Use the get_data_info_config function to return data_info objects for the reference and evaluation data sets, passing integer Unix-timestamp start and end values (for example, 0 and 1758608114). This example assumes you have already uploaded your model at the uploaded_model_path referenced below.
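For example, with illustrative time windows (the uuid module is used for the registration calls below):

import uuid

# Illustrative Unix-timestamp windows for the reference and evaluation sets.
REF_START, REF_END = 0, 1700000000
EVAL_START, EVAL_END = 1700000001, 1758608114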

# Define data/model info:
model_info = {"model_path": {"path": uploaded_model_path}}
ref_info = get_data_info_config(REF_START, REF_END)
eval_info = get_data_info_config(EVAL_START, EVAL_END)

# Register model & data:
model_id = project.register_model(str(uuid.uuid4()), model_config=model_info)
ref_data_id = project.register_dataset(str(uuid.uuid4()), ref_info)
eval_data_id = project.register_dataset(str(uuid.uuid4()), eval_info)

Configure the stress test

Now that your data has been loaded and registered, use the registered data and model IDs to configure a stress test.

stress_test_config = {
    "run_name": "news classification stress test",
    "data_info": {
        "ref_dataset_id": ref_data_id,
        "eval_dataset_id": eval_data_id,
    },
    "model_id": model_id,
}

Run the stress test

Call the SDK function start_stress_test to run the test:

job = client.start_stress_test(test_run_config=stress_test_config)
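Depending on your SDK version, you can block until the run completes and print its progress; the get_status call below is an assumption to verify against your SDK reference:

# Assumption: rime_sdk-style Job.get_status; check your SDK version.
job.get_status(verbose=True, wait_until_finish=True)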

Continuous test example with a custom data integration

Continuous tests rely on an established reference data set, so the data dictionary does not need to specify one. The contents of custom_news_loader.py do not change, but the data_info dictionary is different.

Get datasets for a continuous test

As in the stress test example, call the SDK function upload_file to upload the custom data loader, then create a function (here, get_data_info_config()) that builds a data_info configuration pointing at the uploaded loader and its get_news_data function.

For the connection_info object, you must specify:

  • path: the path to your loader file

  • load_func_name: the name of the data loading function that your custom data loader provides

You may optionally also specify:

  • loader_kwargs_json: any parameters to pass to the data loading function, encoded as a JSON string

  • data_endpoint_integration_id: UUID of the integration with secrets required by the data loading function

The DATA_LOADER_FILE argument here provides a path to your custom data loader script, such as "s3://rime-datasets/custom-loader-news/custom_news_loader.py".

# Upload the custom data loader to the Robust Intelligence cluster.
uploaded_data_loader_path = client.upload_file(DATA_LOADER_FILE)


def get_data_info_config(start: int, end: int):
    # Keys in loader_kwargs_json must match the loader's parameter names.
    loader_kwargs = f'{{"start_time": {start}, "end_time": {end}}}'
    data_info = {
        "connection_info": {
            "data_loading": {
                "path": uploaded_data_loader_path,
                "load_func_name": "get_news_data",
                "loader_kwargs_json": loader_kwargs,
                "data_endpoint_integration_id": "<integration_uuid>",
            }
        },
        "data_params": {
            "timestamp_col": TIMESTAMP_COL,
        },
    }
    return data_info

Register the dataset for the continuous test

Use the get_data_info_config function to return a data_info object for the evaluation data set. Pass integer start and end time values.

# Define data info:
eval_info = get_data_info_config(EVAL_START, EVAL_END)

# Register data:
eval_data_id = project.register_dataset(str(uuid.uuid4()), eval_info)

Configure the continuous test

Now that your data has been loaded and registered, use the registered data ID to configure a continuous test.

incremental_config = {
    "run_name": "news classification continuous test",
    "data_info": {
        "eval_dataset_id": eval_data_id,
    }
}
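With the evaluation data registered, start the continuous test run. The exact call depends on your SDK version; the sketch below assumes a rime_sdk-style ContinuousTest object bound to the model_id registered earlier (get_ct_for_model and start_continuous_test are assumptions to verify against your SDK reference):

# Assumption: retrieve the continuous test bound to your model, then
# kick off an incremental run with the config above.
ct = project.get_ct_for_model(model_id)
job = ct.start_continuous_test(test_run_config=incremental_config)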