PrismProject().run

Overview

The run method executes a Prism project. Here is the method definition for PrismProject.run():

PrismProject.run(
    run_id: Optional[str] = None,
    task_ids: Optional[List[str]] = None,
    runtime_ctx: Optional[Dict[str, Any]] = None,
    all_tasks_upstream: bool = False,
    all_tasks_downstream: bool = False,
    on_success: Optional[List[Union[str, Callable[[], Any]]]] = None,
    on_failure: Optional[List[Union[str, Callable[[], Any]]]] = None,
    full_refresh: bool = False,
    log_level: Literal["info", "warning", "error", "debug", "critical"] = "info",
    rich_logging: bool = True,
    log_file: Optional[Union[str, Path, StringIO]] = None,
)
Argument: Description

run_id

unique ID to give this job. If not specified, then Prism will create an ID for you. This job ID will be in the format {project ID}-{uuid}.

task_ids

list of task IDs to run. If not specified, then all tasks are run. Tasks are retrieved from the tasks_dir path specified in the PrismProject's instantiation.

runtime_ctx

variables to add to PrismProject's ctx. Note that variables defined in the runtime_ctx will overwrite like-named variables defined in the PrismProject's ctx.

all_tasks_upstream

boolean controlling whether to run all tasks upstream of those specified in task_ids. Default is False.

all_tasks_downstream

boolean controlling whether to run all tasks downstream of those specified in task_ids. Default is False.

on_success

list of callbacks to run when the job succeeds. These are run in addition to the callbacks specified in the project's instantiation.

on_failure

list of callbacks to run when the job fails. These are run in addition to the callbacks specified in the project's instantiation.

full_refresh

run all the tasks, regardless of whether or not they are already done.

log_level

logging level, one of info, warning, error, debug, or critical. Default is info.

rich_logging

beautify logs in the console with the rich package. Default is True.

log_file

file in which to save the logs. If None, then Prism will default to a file within ~/.prism/logs/.
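As a rough illustration of the callback and logging arguments above, the sketch below assembles keyword arguments that match the run signature. The callback names (notify_success, notify_failure) and the events list are hypothetical and exist only for this example. The call to project.run is left as a comment so the snippet runs without Prism installed.

```python
from io import StringIO
from typing import Any, Dict, List

# Hypothetical record of which callbacks fired (illustration only).
events: List[str] = []


def notify_success() -> None:
    # Callbacks take no arguments, matching Callable[[], Any] in the signature.
    events.append("success")


def notify_failure() -> None:
    events.append("failure")


# log_file accepts a str, a Path, or a StringIO per the signature above.
log_buffer = StringIO()

run_kwargs: Dict[str, Any] = {
    "on_success": [notify_success],
    "on_failure": [notify_failure],
    "log_level": "debug",
    "rich_logging": False,
    "log_file": log_buffer,
}

# With Prism installed and a PrismProject instance named `project`:
#     project.run(**run_kwargs)
```

Keeping the arguments in a dictionary is only a convenience for this sketch; passing them directly as keyword arguments to run works the same way.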

Examples

Basic example
from prism.client import PrismProject

project = PrismProject()

if __name__ == "__main__":
    project.run()

In this basic example, we instantiate PrismProject with all the default arguments and call the run method with all the default arguments. Prism assumes that tasks live in a directory called tasks in the same folder as this module, and it executes all of them in a single thread when you run the module.

Running specific tasks
from prism.client import PrismProject

project = PrismProject(
    concurrency=2
)

if __name__ == "__main__":
    project.run(
        task_ids=["example.Task01", "example.Task02"],
    )

In this example, we instantiate PrismProject with mostly default arguments. Here, we set concurrency=2, which means that Prism will use two threads to execute the project tasks. Then we call the run method with two task IDs: example.Task01 and example.Task02. Here is what those tasks could look like:

# example.py

from prism.task import PrismTask

class Task01(PrismTask):
    # run is an instance method, so it takes self
    def run(self):
        return "Hello from task 01!"


class Task02(PrismTask):
    def run(self):
        return "Hello from task 02!"

Note that we don't explicitly define task IDs in these tasks. Prism automatically generates them using the module name (example) and the class names (Task01 and Task02).
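To make the interaction between task_ids, all_tasks_upstream, and all_tasks_downstream more concrete, here is a minimal sketch of how such a selection could be computed over a dependency graph. This is not Prism's implementation. The select_tasks helper, the DEPENDS_ON map, and the pipeline.* task IDs are all hypothetical and used only for illustration.

```python
from typing import Dict, Iterable, List, Set

# Hypothetical dependency map: task ID -> IDs of the tasks it depends on.
DEPENDS_ON: Dict[str, List[str]] = {
    "pipeline.Extract": [],
    "pipeline.Transform": ["pipeline.Extract"],
    "pipeline.Load": ["pipeline.Transform"],
}


def _reachable(start: Iterable[str], edges: Dict[str, List[str]]) -> Set[str]:
    """Return every node reachable from `start` by following `edges`."""
    seen: Set[str] = set()
    stack = list(start)
    while stack:
        node = stack.pop()
        for nxt in edges.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


def select_tasks(
    task_ids: List[str],
    depends_on: Dict[str, List[str]],
    all_tasks_upstream: bool = False,
    all_tasks_downstream: bool = False,
) -> Set[str]:
    """Sketch of task selection; both flags default to False."""
    selected = set(task_ids)
    if all_tasks_upstream:
        # Follow "depends on" edges to collect ancestors.
        selected |= _reachable(task_ids, depends_on)
    if all_tasks_downstream:
        # Invert the edges to collect dependents.
        dependents: Dict[str, List[str]] = {task: [] for task in depends_on}
        for task, parents in depends_on.items():
            for parent in parents:
                dependents.setdefault(parent, []).append(task)
        selected |= _reachable(task_ids, dependents)
    return selected


only_transform = select_tasks(["pipeline.Transform"], DEPENDS_ON)
with_upstream = select_tasks(
    ["pipeline.Transform"], DEPENDS_ON, all_tasks_upstream=True
)
with_downstream = select_tasks(
    ["pipeline.Transform"], DEPENDS_ON, all_tasks_downstream=True
)
```

In this sketch, only_transform contains just pipeline.Transform, with_upstream adds pipeline.Extract, and with_downstream adds pipeline.Load.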

Overriding the project's ctx
from pathlib import Path
from prism.client import PrismProject

project = PrismProject(
    ctx={
        "OUTPUT": Path.cwd() / "output"
    }
)

if __name__ == "__main__":
    project.run(
        runtime_ctx={
            "OUTPUT": Path("<some_other_path>")
        }
    )

Tasks can read these context variables through CurrentRun.ctx. For example:

from prism.decorators import task, target
from prism.runtime import CurrentRun  # assumed import path for CurrentRun
import prism.target


@task(
    targets=[
        target(
            type=prism.target.Txt,
            loc=CurrentRun.ctx("OUTPUT") / "example_task_output.txt",
        )
    ],
)
def some_example_task():
    # do some stuff here
    ...

Since we define OUTPUT in our runtime_ctx, this overrides the OUTPUT value defined in our PrismProject's context. So, our target gets saved to <some_other_path>/example_task_output.txt.
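The override behavior can be pictured as a dictionary merge in which runtime_ctx is applied last. The snippet below is a minimal sketch with plain dictionaries, not Prism's internal code. The ENV key and the some_other_path value are hypothetical and only show that keys absent from runtime_ctx are left untouched.

```python
from pathlib import Path

# Context passed when instantiating the project (ENV is a hypothetical extra key).
project_ctx = {
    "OUTPUT": Path.cwd() / "output",
    "ENV": "dev",
}

# Context passed to run(); like-named keys take precedence.
runtime_ctx = {
    "OUTPUT": Path("some_other_path"),
}

# Later entries win, so runtime_ctx overwrites OUTPUT but leaves ENV alone.
effective_ctx = {**project_ctx, **runtime_ctx}

target_path = effective_ctx["OUTPUT"] / "example_task_output.txt"
```

Here target_path resolves under some_other_path rather than under the project's original output directory.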
