Configuring your Scheduled Continuous Test

Schedules for Continuous Tests are configured through the SDK or the UI. Scheduling allows Robust Intelligence to ‘pull’ evaluation data into the platform at a regular cadence.

See the Scheduled CT How-To Guide for instructions on activating and deactivating scheduling for Continuous Tests.

Arguments

  • data_info: Dict, required

    The data info of the evaluation data for scheduled CT. data_info.data_params must include a timestamp_col for Continuous Testing.

  • data_integration_id: string or null, default = null

    The integration id of the evaluation data for scheduled CT.

  • pred_integration_id: string or null, default = null

    The integration id of the evaluation prediction for scheduled CT.

  • pred_info: Dict or null, default = null

    The prediction info of the evaluation data for scheduled CT. pred_info.pred_params must include a timestamp_col for Continuous Testing.

  • rolling_window_duration: datetime.timedelta or null, default = null

    The length of the rolling window to use as a reference dataset in scheduled runs.
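
Taken together, these arguments map onto an activation call with the following overall shape. The sections below provide filled-in templates for each data location type; the empty dictionaries here are placeholders only.

from datetime import timedelta

# Skeleton of an activation call; see the templates below for
# fully specified data_info and pred_info dictionaries.
firewall.activate_ct_scheduling(
    data_info={},   # must include data_params with a timestamp_col
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info={},   # must include pred_params with a timestamp_col
    rolling_window_duration=timedelta(days=1),  # optional
)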

Templates for Data Location Types

Data Collector

The Data Collector requires you to specify connection_info.data_collector.data_stream_id in both data_info and pred_info.

# Arguments for activating a CT
# Schedule with the Data Collector
firewall.activate_ct_scheduling(
    data_info={
        "connection_info": {
            "data_collector": {
                "data_stream_id": "<YOUR_DATA_STREAM_ID>",
            },
        },
        "data_params": {
            "label_col": "is_fraud",
            "timestamp_col": "date",
        },
    },
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info={
        "connection_info": {
            "data_collector": {
                "data_stream_id": "<YOUR_PREDICTION_STREAM_ID>",
            },
        },
        "pred_params": {
            "label_col": "is_fraud",
            "timestamp_col": "date",
        },
    },
)

Databricks

Delta Lake on Databricks requires you to specify connection_info.databricks.table_name in both data_info and pred_info. See the section on integrations for instructions on configuring Delta Lake.

from datetime import timedelta
firewall.activate_ct_scheduling(
    data_info={
        "connection_info": {
            "databricks": {
                "table_name": "hive_metastore.default.system_test_ref",
            }
        },
        "data_params": {
            "label_col": "is_fraud",
            "timestamp_col": "date",
        }
    },
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info={
        "connection_info": {
            "databricks": {
                "table_name": "hive_metastore.default.system_test_ref_pred",
            }
        },
        "pred_params": {
            "pred_col": "is_fraud_preds",
            "timestamp_col": "date",
        }
    },
    rolling_window_duration=timedelta(hours=1),
)
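
Before activating the schedule, it can be helpful to confirm that the referenced tables exist and contain the columns named in data_params and pred_params. The sketch below is an optional check, not part of the activation call; it assumes the databricks-sql-connector package is installed and that your Databricks credentials are available locally as environment variables.

import os

from databricks import sql

# Optional sanity check: confirm the evaluation table contains the
# timestamp and label columns referenced in data_params.
connection = sql.connect(
    server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
    http_path=os.environ["DATABRICKS_HTTP_PATH"],
    access_token=os.environ["DATABRICKS_ACCESS_TOKEN"],
)
with connection.cursor() as cursor:
    cursor.execute(
        "SELECT date, is_fraud FROM hive_metastore.default.system_test_ref LIMIT 5"
    )
    print(cursor.fetchall())
connection.close()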

Custom Loader

Our custom loader integration is designed to integrate with any data location. Configuring the custom loader requires you to specify connection_info.custom_location in data_info and pred_info. The custom_location field includes the path to the loading script and the name of the function that does the loading. You can specify additional arguments in loader_kwargs_json. For locations that require access credentials such as secrets or tokens, provide the values of those secrets as environment variables. For scheduled CT, the load function must accept the parameters start_time and end_time, which are Python datetime.datetime objects specifying the window of data to load (see the loader requirements below).

from datetime import timedelta

# Arguments for activating a CT
# Schedule with Custom Loader
firewall.activate_ct_scheduling(
    data_info={
        "connection_info": {
            "custom_location": {
                "path": "s3://bucket/path/to/loader.py",
                "load_func_name": "custom_data_loader_func",
                "loader_kwargs_json": "{\"some_param\": 5}",
            },
        },
        "data_params": {
            "label_col": "is_fraud",
            "timestamp_col": "date",
        }
    },
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info={
        "connection_info": {
            "custom_location": {
                "path": "s3://bucket/path/to/loader.py",
                "load_func_name": "custom_prediction_loader_func",
                "loader_kwargs_json": "{\"some_param\": 5}",
            },
        },
        "pred_params": {
            "pred_col": "is_fraud_preds",
            "timestamp_col": "date",
        }
    },
    rolling_window_duration=timedelta(hours=1),
)

For scheduled CT, the loader function must accept the parameters start_time and end_time, which are Python datetime.datetime objects; these arguments are not required outside of scheduled CT. Type hints must be specified for the loader function, and the return type must be either pd.DataFrame or an Iterable[pd.DataFrame]. The loader function may use integration variables stored in the corresponding data_integration_id/pred_integration_id; these are available as environment variables. With the example above, the following could be valid contents of "s3://bucket/path/to/loader.py". This example retrieves all data entries between the given start and end times from a Delta Lake table.

"""Custom loader file with Delta Lake."""
import os
from datetime import datetime

import pandas as pd
from databricks import sql


def custom_data_loader_func(
    start_time: datetime, end_time: datetime, some_param: int
) -> pd.DataFrame:
  """Load the data."""
  connection = sql.connect(
    server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
    http_path=os.environ["DATABRICKS_HTTP_PATH"],
    access_token=os.environ["DATABRICKS_ACCESS_TOKEN"],
  )
  # Select all rows in the [start_time, end_time) window.
  query = (
    f"SELECT * FROM hive_metastore.default.my_table WHERE "
    f"timestamp >= '{start_time}' AND timestamp < '{end_time}'"
  )
  with connection.cursor() as cursor:
    cursor.execute(query)
    results = cursor.fetchall_arrow()
    return results.to_pandas()
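
The return type may also be an Iterable[pd.DataFrame], which can be useful when a window contains more rows than comfortably fit in memory at once. Below is a minimal sketch of a chunked variant of the loader above; it assumes the same imports and environment variables as the file above, and the 10,000-row batch size is an arbitrary illustration.

from typing import Iterable  # in addition to the imports above


def custom_data_loader_chunked(
    start_time: datetime, end_time: datetime, some_param: int
) -> Iterable[pd.DataFrame]:
  """Load the data one batch at a time."""
  connection = sql.connect(
    server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
    http_path=os.environ["DATABRICKS_HTTP_PATH"],
    access_token=os.environ["DATABRICKS_ACCESS_TOKEN"],
  )
  query = (
    f"SELECT * FROM hive_metastore.default.my_table WHERE "
    f"timestamp >= '{start_time}' AND timestamp < '{end_time}'"
  )
  with connection.cursor() as cursor:
    cursor.execute(query)
    while True:
      batch = cursor.fetchmany_arrow(10000)
      if batch.num_rows == 0:
        break
      yield batch.to_pandas()

To use a variant like this, set load_func_name in custom_location to the corresponding function name. Either loader can be sanity-checked locally by calling it with two datetime values and confirming the returned frame(s) contain the columns referenced in data_params and pred_params.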

For data sources other than Databricks that require authentication, you may use our [custom integrations](../../administration/configuring_workspaces/integrations/configuring_integrations.md#custom-integrations) feature to store arbitrary credentials as environment variables.

Templates for Configuring Data Params

The dictionary below outlines all the keys you can specify in data_params. If a key is not provided, its default is taken from the reference dataset. For more information about these types, see our General Tabular Parameters section.

data_params = {
    "label_col": "",
    "pred_col": "",
    "timestamp_col": "",
    "class_names": [],
    "ranking_info": {
        "query_col": "",
        "nqueries": None,  # int or None (default). If None, all queries are used.
        "nrows_per_query": None,  # int or None (default). If None, all rows are used.
        "drop_query_id": True,
    },
    "nrows": None,  # int or None (default). If None, all rows are used.
    "nrows_per_time_bin": None,  # int or None (default). If None, all rows are used.
    "sample": True,
    "categorical_features": [],
    "protected_features": [],
    "features_not_in_model": [],
    "text_features": [],
    "image_features": [],
    "intersections": {
        "features": [],
    },
    "loading_kwargs": "",
    "feature_type_path": "",
    "pred_path": "",
    "image_load_path": "",
}
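
For example, a minimal data_params for the fraud dataset used in the templates above might look like the following; the categorical feature names here are purely illustrative.

data_params = {
    "label_col": "is_fraud",
    "timestamp_col": "date",  # required for Continuous Testing
    # Illustrative feature names; replace with columns from your dataset.
    "categorical_features": ["merchant_category", "card_type"],
    "sample": True,
}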

Templates for Configuring Reference Sets

When specifying a reference dataset, you can either use the default or switch to a rolling window.

To choose the default value, you don’t need to specify rolling_window_duration:

# Fill in the dictionaries
# with the appropriate keys as above
data_info_dict = {}
prediction_info_dict = {}

# General Arguments for Activating a Schedule
# if using a default reference set
firewall.activate_ct_scheduling(
    data_info=data_info_dict,
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info=prediction_info_dict
)

To choose a rolling window:

from datetime import timedelta
# Fill in the dictionaries
# with the appropriate keys as above
data_info_dict = {}
prediction_info_dict = {}

rolling_window_duration = timedelta(days=1)
# General Arguments for Activating a Schedule
# if specifying a reference set with a rolling window
firewall.activate_ct_scheduling(
    data_info=data_info_dict,
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info=prediction_info_dict,
    rolling_window_duration=rolling_window_duration,
)