curia.infra.task module

Base class for creating tasks that can be run through the Curia API, with a backend of either a Docker container or a Databricks job.

class curia.infra.task.TaskInputDefinition(name: str, type: Literal['literal', 'dataset', 'model', 'modelJob'], optional: bool = False, description: str | None = None)

Bases: object

Base class for all airml-analytics input definitions, designed to interface with the AirML API

name: str
type: Literal['literal', 'dataset', 'model', 'modelJob']
optional: bool = False
description: str | None = None
describe() → List[str]

Describe the input definition.

Returns:

The description, formatted as a list of strings, with each string representing a line

class curia.infra.task.TaskOutputDefinition(name: str, type: Literal['literal', 'dataset', 'model', 'modelJob'], description: str | None = None)

Bases: object

Base class for all airml-analytics output definitions, designed to interface with the AirML API

name: str
type: Literal['literal', 'dataset', 'model', 'modelJob']
description: str | None = None
describe() → List[str]

Describe the output definition.

Returns:

The description, formatted as a list of strings, with each string representing a line
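As an illustration of how the two definition classes above might be used to declare a task's interface, here is a minimal sketch. The dataclasses are local stand-ins that only mirror the documented constructor signatures, so the snippet runs without the Curia SDK installed; the input and output names (`training_data`, `threshold`, `scores`) are hypothetical.

```python
from dataclasses import dataclass
from typing import Literal, Optional

# Stand-ins mirroring the documented signatures. In real code these would be
# imported from curia.infra.task rather than redefined here.
ValueType = Literal["literal", "dataset", "model", "modelJob"]


@dataclass
class TaskInputDefinition:
    name: str
    type: ValueType
    optional: bool = False
    description: Optional[str] = None


@dataclass
class TaskOutputDefinition:
    name: str
    type: ValueType
    description: Optional[str] = None


# Hypothetical interface for an illustrative scoring task.
inputs = [
    TaskInputDefinition(name="training_data", type="dataset", description="Rows to score"),
    TaskInputDefinition(name="threshold", type="literal", optional=True),
]
outputs = [TaskOutputDefinition(name="scores", type="dataset")]

# Inputs are required unless explicitly marked optional.
required_names = [definition.name for definition in inputs if not definition.optional]
```

Note that only inputs carry an `optional` flag; the documented output definition has no such field.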

class curia.infra.task.TaskDefinition(name: str, task_slug: str, inputs: List[TaskInputDefinition], outputs: List[TaskOutputDefinition], description: str | None = None, _function: Callable | None = None, _module: str | None = None)

Bases: ABC

Base class for creating and managing tasks that can be run through the Curia API, with a backend of either a Docker container or a Databricks job.

name: str
task_slug: str
inputs: List[TaskInputDefinition]
outputs: List[TaskOutputDefinition]
description: str | None = None
task_type: ClassVar[TaskType] = 'Abstract'
classmethod get_curia_sdk(api_token: str, api_endpoint: str) → Session

Get a Curia SDK session using the provided API token and endpoint.

Parameters:
  • api_token – The API token to use

  • api_endpoint – The API endpoint to use

Returns:

The Curia SDK session

verify_task_inputs_valid(task_inputs: Dict[str, Any]) → None

Verify that the task inputs are valid by checking that all required inputs are present and that no unexpected inputs are present, based on the inputs defined for this task.

Parameters:
  • task_inputs – The supplied task inputs to verify
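The check described above amounts to two set differences. The following is a minimal sketch of that logic, not the library's implementation: the helper name `check_task_inputs`, the `(name, optional)` pair representation, and the choice of `ValueError` are all assumptions made for illustration.

```python
from typing import Any, Dict, List, Tuple


def check_task_inputs(declared: List[Tuple[str, bool]], task_inputs: Dict[str, Any]) -> None:
    """Raise ValueError if required inputs are missing or unknown inputs are supplied.

    `declared` holds (name, optional) pairs, standing in for TaskInputDefinition objects.
    """
    declared_names = {name for name, _ in declared}
    required_names = {name for name, optional in declared if not optional}
    supplied_names = set(task_inputs)

    missing = sorted(required_names - supplied_names)
    unexpected = sorted(supplied_names - declared_names)
    if missing or unexpected:
        # The exception type is an assumption; the documentation does not name one.
        raise ValueError(f"missing inputs: {missing}; unexpected inputs: {unexpected}")


# Hypothetical declaration: one required input and one optional input.
declared = [("training_data", False), ("threshold", True)]
check_task_inputs(declared, {"training_data": "dataset-id"})  # passes silently
```

Optional inputs may be omitted without error, but any key that is not declared at all is rejected.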

resolve_arguments(task_execution_id: str, api_token: str, api_endpoint: str) → Dict[str, Any]

Utilize the Curia SDK to retrieve the arguments for the analytics task flow definition.

Parameters:
  • task_execution_id – The task execution ID to retrieve the inputs for

  • api_token – The Platform API token to use

  • api_endpoint – The Platform API endpoint to use

classmethod resolve_input(session: Session, task_input: TaskInputDefinition, task_execution_value: Any) → Any

Resolve a single input, making additional API calls as necessary depending on the task.

Parameters:
  • session – The Curia SDK session

  • task_input – The task input definition

  • task_execution_value – The value of the task input from the Task Execution

Returns:

The resolved value

upload_results(task_execution_id: str, api_token: str, api_endpoint: str, results: Dict[str, Any]) → None

Upload the results of the task’s execution to the Curia API and S3.

Parameters:
  • task_execution_id – The task execution ID

  • api_token – The API token to use with the Curia API

  • api_endpoint – The API endpoint to use with the Curia API

  • results – The results to upload

Returns:

None

describe() → List[str]

Describe the task flow definition.

Returns:

The description, formatted as a list of strings, with each string representing a line

property module: str

The module that the task flow definition is defined in.

property function_name: str

The name of the function that the task flow definition is defined in.

class Registry

Bases: object

register(task_definition: TaskDefinition) → None
list() → List[TaskDefinition]
abstract run(task_execution_id: str, api_token: str, api_endpoint: str, **kwargs) → None

Run the analytics task flow definition.

Parameters:
  • task_execution_id – The task execution ID

  • api_token – The API key to use to retrieve the task inputs from the Curia API

  • api_endpoint – The API endpoint to use to retrieve the task inputs from the Curia API

  • kwargs – Additional keyword arguments, specific to ContainerTaskDefinition or DatabricksTaskDefinition

Returns:

The result
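Because `run` is abstract, each backend supplies its own implementation. The sketch below shows one plausible shape for a subclass, assuming that a run resolves arguments, calls the wrapped function, and uploads the results, in that order; the documentation does not confirm this sequence. `TaskDefinitionSketch` and `InProcessTaskDefinition` are hypothetical stand-ins, and the API-facing methods are stubbed so the snippet runs offline.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict


class TaskDefinitionSketch(ABC):
    """Stand-in for TaskDefinition holding only what this sketch needs."""

    def __init__(self, name: str, function: Callable[..., Dict[str, Any]]) -> None:
        self.name = name
        self._function = function
        self.uploaded: Dict[str, Any] = {}

    def resolve_arguments(self, task_execution_id: str, api_token: str,
                          api_endpoint: str) -> Dict[str, Any]:
        # Stub: the documented method queries the Curia API instead.
        return {"x": 2, "y": 3}

    def upload_results(self, task_execution_id: str, api_token: str,
                       api_endpoint: str, results: Dict[str, Any]) -> None:
        # Stub: the documented method uploads to the Curia API and S3 instead.
        self.uploaded = results

    @abstractmethod
    def run(self, task_execution_id: str, api_token: str,
            api_endpoint: str, **kwargs: Any) -> None:
        ...


class InProcessTaskDefinition(TaskDefinitionSketch):
    """Hypothetical backend that calls the wrapped function in the current process."""

    def run(self, task_execution_id: str, api_token: str,
            api_endpoint: str, **kwargs: Any) -> None:
        arguments = self.resolve_arguments(task_execution_id, api_token, api_endpoint)
        results = self._function(**arguments)
        self.upload_results(task_execution_id, api_token, api_endpoint, results)


task = InProcessTaskDefinition("add", lambda x, y: {"total": x + y})
task.run("execution-id", "token", "https://example.invalid")
```

Instantiating the abstract base directly raises `TypeError`, which is what forces every backend to define `run`.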

abstract build_task_type_specific_input_arguments(context: dict)