
Abstract class used to define tasks in a Prism project.



class prism.task.PrismTask(bool_run: bool = True, func: Optional[Callable[..., Any]] = None)

  • Parameters

    • bool_run: a boolean indicating whether to run the task. Default is True.

    • func: optional callable that overwrites the run function. Default is None.

Warning: you will never need to worry about instantiating a PrismTask class and setting bool_run / func when working with your tasks. This is all handled on the backend.


PrismTask.done() -> bool

Check if this task is already done. If this task is already done, then the task will be skipped. If not, it will be executed.

Note that if the --full-refresh option is specified, then all tasks are run from scratch (even the ones that are done).

  • Outputs:

    • True if the task is already done. False otherwise. -> Any

Core logic for your task. This task must return a non-null output.

  • Outputs:

    • Any non-null output.

Attributes and underlying data


The ID to associate with the task. The default is <module_name>.<class name>.

# tasks/

import prism.task

class ExampleTask(prism.task.PrismTask):
    task_id = "dummy_example"  # if this were not specified, then the task ID would be example_task.ExampleTask
    def run(self):
        test_str = "Hello, world!"
        return test_str

Number of times to retry a task upon failure. Default is 0.

# tasks/

import prism.task

class ExampleTask(prism.task.PrismTask):
    retries = 1
    def run(self):
        test_str = "Hello, world!"
        return test_str

Number of seconds to wait in between task retries. Default is 0. Must be specified alongside retries.

# tasks/

import prism.task

class ExampleTask(prism.task.PrismTask):
    retries = 1
    retry_delay_seconds = 60
    def run(self):
        test_str = "Hello, world!"
        return test_str

Last updated