Configuring your Scheduled Continuous Test
You can configure schedules for Continuous Tests through the SDK or the UI. Scheduling allows Robust Intelligence to "pull" evaluation data into the platform at a regular cadence.
See the Scheduled CT How-To Guide for instructions on activating and deactivating scheduling for Continuous Tests.
Arguments
data_info: Dict, required
The data info of the evaluation data for scheduled CT. data_info.data_params must include a timestamp_col for Continuous Testing.
data_integration_id: string or null, default = null
The integration ID of the evaluation data for scheduled CT.
pred_integration_id: string or null, default = null
The integration ID of the evaluation predictions for scheduled CT.
pred_info: Dict or null, default = null
The prediction info of the evaluation data for scheduled CT. pred_info.pred_params must include a timestamp_col for Continuous Testing.
rolling_window_duration: datetime.timedelta or null, default = null
The length of the rolling window to use as a reference dataset in scheduled runs.
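Of these arguments, only data_info is required; the others default to null. A minimal activation call therefore looks like the sketch below, which uses the Data Collector purely as a placeholder location; the stream ID and column name are placeholders, and full templates for each location type follow in the next section.
# Minimal sketch: only data_info is required; the remaining arguments
# keep their null defaults. The stream ID and timestamp column are
# placeholders.
firewall.activate_ct_scheduling(
    data_info={
        "connection_info": {
            "data_collector": {
                "data_stream_id": "<YOUR_DATA_STREAM_ID>",
            },
        },
        "data_params": {
            "timestamp_col": "date",  # required for Continuous Testing
        },
    },
)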
Templates for Data Location Types
Data Collector
The Data Collector requires you to specify connection_info.data_collector.data_stream_id in both data_info and pred_info.
# Arguments for activating a CT
# Schedule with the Data Collector
firewall.activate_ct_scheduling(
    data_info={
        "connection_info": {
            "data_collector": {
                "data_stream_id": "<YOUR_DATA_STREAM_ID>",
            },
        },
        "data_params": {
            "label_col": "is_fraud",
            "timestamp_col": "date",
        },
    },
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info={
        "connection_info": {
            "data_collector": {
                "data_stream_id": "<YOUR_PREDICTION_STREAM_ID>",
            },
        },
        "pred_params": {
            "pred_col": "is_fraud_preds",
            "timestamp_col": "date",
        },
    },
)
Databricks
Delta Lake on Databricks requires you to specify connection_info.databricks.table_name in both data_info and pred_info. See the section on integrations to configure Delta Lake.
from datetime import timedelta

firewall.activate_ct_scheduling(
    data_info={
        "connection_info": {
            "databricks": {
                "table_name": "hive_metastore.default.system_test_ref",
            },
        },
        "data_params": {
            "label_col": "is_fraud",
            "timestamp_col": "date",
        },
    },
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info={
        "connection_info": {
            "databricks": {
                "table_name": "hive_metastore.default.system_test_ref_pred",
            },
        },
        "pred_params": {
            "pred_col": "is_fraud_preds",
            "timestamp_col": "date",
        },
    },
    rolling_window_duration=timedelta(hours=1),
)
Custom Loader
The custom loader integration is designed to work with any data location. Configuring the custom loader requires you to specify connection_info.custom_location in data_info and pred_info. The custom_location field includes the path to the loading script and the name of the function that does the loading. You can specify additional arguments in loader_kwargs_json. For locations that require access credentials such as secrets or tokens, specify the values of those secrets as environment variables. The load function must accept the parameters start_time and end_time, which bound the time window of data to load; their types and other requirements are described below.
from datetime import timedelta

# Arguments for activating a CT
# Schedule with Custom Loader
firewall.activate_ct_scheduling(
    data_info={
        "connection_info": {
            "custom_location": {
                "path": "s3://bucket/path/to/loader.py",
                "load_func_name": "custom_data_loader_func",
                "loader_kwargs_json": "{\"some_param\": 5}",
            },
        },
        "data_params": {
            "label_col": "is_fraud",
            "timestamp_col": "date",
        },
    },
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info={
        "connection_info": {
            "custom_location": {
                "path": "s3://bucket/path/to/loader.py",
                "load_func_name": "custom_prediction_loader_func",
                "loader_kwargs_json": "{\"some_param\": 5}",
            },
        },
        "pred_params": {
            "pred_col": "is_fraud_preds",
            "timestamp_col": "date",
        },
    },
    rolling_window_duration=timedelta(hours=1),
)
For scheduled CT, the loader function must meet the following requirements:
- It must accept the parameters start_time and end_time, which are Python datetime.datetime objects. These arguments are not required outside of scheduled CT.
- Type hints must be specified for the loader function.
- The return type must be either pd.DataFrame or an Iterable[pd.DataFrame] (a chunked-loading sketch follows the example below).
- The loader function may use integration variables stored in the corresponding data_integration_id/pred_integration_id; these are available as environment variables.
With the example above, the following could be valid contents of "s3://bucket/path/to/loader.py". This example retrieves all data entries between the given start and end times from a Delta Lake table.
"""Custom loader file with Delta Lake."""
import os
from datetime import datetime
import pandas as pd
from databricks import sql
def custom_data_loader_func(
start_time: datetime, end_time: datetime, some_param: int
) -> pd.DataFrame:
"""Load the data."""
connection = sql.connect(
server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
http_path=os.environ["DATABRICKS_HTTP_PATH"],
access_token=os.environ["DATABRICKS_ACCESS_TOKEN"],
)
query = (
f"SELECT * FROM hivemetastore.default.my_table WHERE "
f"{start_time}>'timestamp' AND "
f"{end_time}<'timestamp'"
)
with connection.cursor() as cursor:
cursor.execute(query)
results = cursor.fetchmany_arrow(1000)
return results.to_pandas()
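As noted above, the loader may also return an Iterable[pd.DataFrame]. The sketch below is a chunked variant of the same loader under the same assumptions (same environment variables and table); the function name and the chunk size of 1000 rows are illustrative choices, not requirements.
"""Chunked custom loader sketch with Delta Lake."""
import os
from datetime import datetime
from typing import Iterable

import pandas as pd
from databricks import sql


def custom_data_loader_chunked(
    start_time: datetime, end_time: datetime, some_param: int
) -> Iterable[pd.DataFrame]:
    """Yield the data between start_time and end_time in chunks."""
    connection = sql.connect(
        server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
        http_path=os.environ["DATABRICKS_HTTP_PATH"],
        access_token=os.environ["DATABRICKS_ACCESS_TOKEN"],
    )
    query = (
        f"SELECT * FROM hive_metastore.default.my_table WHERE "
        f"timestamp > '{start_time}' AND "
        f"timestamp < '{end_time}'"
    )
    with connection.cursor() as cursor:
        cursor.execute(query)
        while True:
            # fetchmany_arrow returns successive batches; an empty
            # batch signals that all rows have been consumed.
            batch = cursor.fetchmany_arrow(1000)
            if batch.num_rows == 0:
                break
            yield batch.to_pandas()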
For data sources other than Databricks that require authentication, you may use our [custom integrations](../../administration/configuring_workspaces/integrations/configuring_integrations.md#custom-integrations) feature to store arbitrary credentials as environment variables.
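For example, if a custom integration stores a variable named API_TOKEN (a hypothetical name), the loader can read it at run time:
import os

# Hypothetical: "API_TOKEN" is whatever variable name you configured
# in the custom integration; it is available as an environment variable
# when the loader runs.
api_token = os.environ["API_TOKEN"]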
Templates for Configuring Data Params
The dictionary below outlines all the keys you can specify in data_params. If these are not provided, the defaults are taken from the reference dataset. For more info about the types, see our General Tabular Parameters section.
data_params = {
    "label_col": "",
    "pred_col": "",
    "timestamp_col": "",
    "class_names": [],
    "ranking_info": {
        "query_col": "",
        "nqueries": None,  # int or None (default). If None, all queries are used.
        "nrows_per_query": None,  # int or None (default). If None, all rows are used.
        "drop_query_id": True,
    },
    "nrows": None,  # int or None (default). If None, all rows are used.
    "nrows_per_time_bin": None,  # int or None (default). If None, all rows are used.
    "sample": True,
    "categorical_features": [],
    "protected_features": [],
    "features_not_in_model": [],
    "text_features": [],
    "image_features": [],
    "intersections": {
        "features": [],
    },
    "loading_kwargs": "",
    "feature_type_path": "",
    "pred_path": "",
    "image_load_path": "",
}
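As a concrete illustration, a data_params dictionary for the fraud-classification examples above might look like the sketch below. Only the keys you actually need have to be set, and the feature names other than is_fraud and date are hypothetical.
# A hypothetical data_params for the fraud-classification dataset
# used in the templates above. Unspecified keys fall back to the
# defaults taken from the reference dataset.
data_params = {
    "label_col": "is_fraud",
    "timestamp_col": "date",
    "class_names": ["not_fraud", "fraud"],
    "categorical_features": ["merchant_category", "country"],
    "features_not_in_model": ["transaction_id"],
    "nrows_per_time_bin": 10000,
    "sample": True,
}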
Templates for Configuring Reference Sets
When specifying a reference dataset, you can either keep the default reference dataset or switch to a rolling window.
To use the default, you do not need to specify rolling_window_duration:
# Fill in the dictionaries
# with the appropriate keys as above
data_info_dict = {}
prediction_info_dict = {}

# General Arguments for Activating a Schedule
# if using a default reference set
firewall.activate_ct_scheduling(
    data_info=data_info_dict,
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info=prediction_info_dict,
)
To choose a rolling window:
from datetime import timedelta

# Fill in the dictionaries
# with the appropriate keys as above
data_info_dict = {}
prediction_info_dict = {}
rolling_window_duration = timedelta(days=1)

# General Arguments for Activating a Schedule
# if specifying a reference set with a rolling window
firewall.activate_ct_scheduling(
    data_info=data_info_dict,
    data_integration_id="<YOUR_DATA_INTEGRATION_ID>",
    pred_integration_id="<YOUR_PREDICTION_INTEGRATION_ID>",
    pred_info=prediction_info_dict,
    rolling_window_duration=rolling_window_duration,
)