PrismProject API

The PrismProject class is the entrypoint into all Prism projects.

Overview

Starting in v0.3.0, you can use the PrismProject class to instantiate, manage, and run your Prism projects. It lives in the prism.client module. Here is the constructor signature:

from prism.client import PrismProject

project = PrismProject(
    id: str = "",
    name: str = "",
    version: str = "",
    connectors: Optional[List[Union[str, Connector]]] = None,
    concurrency: int = 1,
    tasks_dir: Union[str, Path] = Path.cwd() / "tasks",
    package_lookups: Optional[List[Union[Path, str]]] = None,
    on_success: Optional[List[Union[str, Callable[[], Any]]]] = None,
    on_failure: Optional[List[Union[str, Callable[[], Any]]]] = None,
    ctx: Optional[Dict[str, Any]] = None,
)

Let's go over each of these arguments:

Argument | Description

id

unique ID to give this project. If not specified, then Prism will create an ID for you in the format {project dir}-{version}.

name

human-readable name to give this project. If not specified, then Prism will create a name for you: the name of the project directory (i.e., the directory in which the PrismProject lives).

version

project version. Defaults to 1.0.

connectors

list of connectors to use in your project. Tasks can access these at runtime via CurrentRun.conn(...). Using connectors saves you from having to define your connection class in each module. Connectors should be specified either as a Connector object or as a string representing the import path to the Connector object.

concurrency

number of threads to use when running tasks. Default is 1 (i.e., single-threaded).

tasks_dir

directory containing tasks. Defaults to the tasks folder in the current working directory (Path.cwd() / "tasks").

package_lookups

additional directories / modules to look within when importing modules and functions in your code. The tasks_dir and its parent are automatically added to this list.

on_success

list of callbacks to run when the job succeeds. Callbacks should be functions that do not accept any arguments. Callbacks should be specified either as a function or as a string representing the import path to the function.

on_failure

list of callbacks to run when the job fails. Callbacks should be functions that do not accept any arguments. Callbacks should be specified either as a function or as a string representing the import path to the function.

ctx

dictionary of context variables that tasks can access at runtime via CurrentRun.ctx(...).
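The defaults for id, name, and version described above can be pictured with a small sketch. The helpers below are hypothetical (they are not part of prism.client); they assume that "{project dir}" means the project directory's own name and that an empty version string falls back to "1.0".

```python
from pathlib import Path
from typing import Union


def default_version(version: str = "") -> str:
    # Assumption: an empty version string falls back to the documented default "1.0".
    return version or "1.0"


def default_name(project_dir: Union[str, Path]) -> str:
    # The name is the project directory's own name, e.g. "project1".
    return Path(project_dir).name


def default_id(project_dir: Union[str, Path], version: str = "") -> str:
    # Mirrors the documented "{project dir}-{version}" format.
    return f"{default_name(project_dir)}-{default_version(version)}"


if __name__ == "__main__":
    project_dir = Path("all_prism_projects") / "project1"
    print(default_name(project_dir))  # project1
    print(default_id(project_dir))  # project1-1.0
```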

Most of these arguments are fairly self-explanatory. However, it's worth spending a bit of extra time on package_lookups.

Accessing other modules via package_lookups

package_lookups lets you modify sys.path at runtime so that your project can access modules and functions in other directories. Note that sys.path is only modified for the duration of your project run: when the run finishes, the paths that were added are removed from sys.path.
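Prism's own implementation isn't shown on this page, but the pattern of temporarily extending sys.path can be sketched with the standard library alone. temporary_sys_path is a hypothetical helper, and prepending the lookups (rather than appending them) is an assumption of this sketch.

```python
import sys
from contextlib import contextmanager
from pathlib import Path
from typing import Iterable, Iterator, List, Union


@contextmanager
def temporary_sys_path(paths: Iterable[Union[str, Path]]) -> Iterator[None]:
    """Prepend each path to sys.path for the duration of the with-block."""
    added: List[str] = []
    for p in paths:
        entry = str(Path(p).resolve())
        # Skip entries that are already present so we never remove them later.
        if entry not in sys.path and entry not in added:
            added.append(entry)
    sys.path[0:0] = added  # prepend, preserving the given order
    try:
        yield
    finally:
        # Remove only the entries this context manager added.
        for entry in added:
            if entry in sys.path:
                sys.path.remove(entry)


if __name__ == "__main__":
    before = list(sys.path)
    with temporary_sys_path([Path.cwd().parent]):
        print(str(Path.cwd().parent.resolve()) in sys.path)  # True
    print(sys.path == before)  # True
```

The try/finally ensures the cleanup happens even if the project run raises an exception.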

This usually isn't necessary if you're running an isolated project. However, this changes if you have an entire repository of Prism projects and are collaborating with other folks. Common functionality (e.g., helper or utility functions) used across projects may live outside any one project, so you'll need to ensure your project has access to those modules via package_lookups.

For example, suppose your project directory looks like this:

all_prism_projects
├── project1
│   ├── main.py
│   ├── utils1.py
│   └── tasks
│       ├── task1.py
│       └── ...
├── project2
│   ├── main.py
│   ├── utils2.py
│   └── tasks
│       ├── task1.py
│       └── ...
└── common
    ├── utils.py
    └── helpers.py

By default, each project's tasks directory and its parent are added to package_lookups. That means that project1 can access utils1.py and project2 can access utils2.py. However, the directory one level above each project (i.e., all_prism_projects/) is not added to sys.path by default, so the modules in common/ are not accessible to either project.
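To make the default behavior concrete, the hypothetical helper below builds an effective lookup list from tasks_dir plus any user-supplied package_lookups. It is a sketch rather than Prism's internals, and it assumes user lookups are placed after the two defaults.

```python
from pathlib import Path
from typing import List, Optional, Sequence, Union


def effective_lookups(
    tasks_dir: Path,
    package_lookups: Optional[Sequence[Union[str, Path]]] = None,
) -> List[Path]:
    # tasks_dir and its parent are always included, per the description above.
    lookups = [tasks_dir, tasks_dir.parent]
    # Assumption: user-supplied lookups follow the defaults.
    lookups += [Path(p) for p in (package_lookups or [])]
    return lookups


if __name__ == "__main__":
    tasks_dir = Path("all_prism_projects") / "project1" / "tasks"
    root = Path("all_prism_projects")
    print(root in effective_lookups(tasks_dir))  # False: common/ is not importable
    print(root in effective_lookups(tasks_dir, [tasks_dir.parent.parent]))  # True
```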

Let's see how we can remedy this with package_lookups:

# all_prism_projects/project1/main.py

from pathlib import Path
from prism.client import PrismProject

# Project directory (assumes this script is run from inside project1/)
PROJECT1_DIR = Path.cwd()

# Project
project = PrismProject(
    name="project1",
    tasks_dir=PROJECT1_DIR / "tasks",
    package_lookups=[
        PROJECT1_DIR.parent
    ]
)

if __name__ == "__main__":
    project.run()

Now, a task in project1 can import modules from common/ at runtime:

# all_prism_projects/project1/tasks/example_task.py

import common.utils  # this is OK since all_prism_projects/ is in package_lookups
import prism.task

class ExampleTask(prism.task.PrismTask):
    ...

Important: we recommend specifying paths relative to a known location (e.g., Path.cwd().parent / "common") rather than as absolute, hard-coded string paths. This keeps projects reproducible across different machines.

Examples

Basic Example
from prism.client import PrismProject

project = PrismProject()

if __name__ == "__main__":
    project.run()

In this basic example, we instantiate a PrismProject with all the default arguments. Prism will assume that tasks live in a directory called tasks inside the current working directory (i.e., the same folder as this module, if you run it from there), and it will execute these tasks in a single thread when you run this module.

Project with on_success and on_failure callbacks
from prism.client import PrismProject

def print_success():
    print("Success!")
    

project = PrismProject(
    on_success=[print_success]
)

if __name__ == "__main__":
    project.run()

In this first example, we define our callback function print_success in the same module as our PrismProject. If the project run succeeds, then Success! will be printed to the console.

What if the callback doesn't live inside our project directory? Then we can use package_lookups together with the callback's import path instead. For example:

from pathlib import Path
from prism.client import PrismProject


project = PrismProject(
    package_lookups=[
        Path.cwd().parent / "common",  # some path outside our working directory
    ],
    on_success=[
        "callbacks.<some callback function>"  # callbacks.py exists within `common/`
    ],
)

if __name__ == "__main__":
    project.run()
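A string such as "callbacks.<some callback function>" only works once its module is importable, which is what package_lookups arranges. The sketch below shows one common way to turn such a dotted import path into a callable with importlib and then invoke callbacks with no arguments. resolve_import_path and run_callbacks are hypothetical helpers, not Prism's API.

```python
import importlib
from typing import Any, Callable, List, Union


def resolve_import_path(path: str) -> Any:
    """Return the object named by a dotted path such as "callbacks.notify"."""
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.attribute', got {path!r}")
    module = importlib.import_module(module_name)  # must be findable on sys.path
    return getattr(module, attr)


def run_callbacks(callbacks: List[Union[str, Callable[[], Any]]]) -> List[Any]:
    """Call each callback with no arguments, resolving strings first."""
    results = []
    for callback in callbacks:
        fn = resolve_import_path(callback) if isinstance(callback, str) else callback
        results.append(fn())
    return results


if __name__ == "__main__":
    # A plain function and an import-path string, as in on_success=[...].
    print(run_callbacks([lambda: "Success!", "os.getcwd"]))
```

The same resolution step applies to connectors given as strings, such as "snowflake_conn.SmallSnowflakeWH".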
Project with connectors
from prism.client import PrismProject
from prism.connectors import SnowflakeConnector

snowflake = SnowflakeConnector(
    id="snowflake-connector",
    ...
)

project = PrismProject(
    connectors=[snowflake]
)

if __name__ == "__main__":
    project.run()

In this first example, we define our SnowflakeConnector in the same module as our PrismProject instance. Tasks can access this connector via CurrentRun.conn(id="snowflake-connector").
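Conceptually, looking up a connector by id amounts to a mapping from each connector's id to the connector object. The sketch below is hypothetical: FakeConnector stands in for a real Connector such as SnowflakeConnector, ConnectorRegistry is not a Prism class, and rejecting duplicate ids is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass
class FakeConnector:
    # Hypothetical stand-in for a Prism Connector; only the id matters here.
    id: str


class ConnectorRegistry:
    def __init__(self, connectors: Iterable[FakeConnector]) -> None:
        self._by_id: Dict[str, FakeConnector] = {}
        for conn in connectors:
            # Assumption: ids must be unique so lookups are unambiguous.
            if conn.id in self._by_id:
                raise ValueError(f"duplicate connector id: {conn.id!r}")
            self._by_id[conn.id] = conn

    def conn(self, id: str) -> FakeConnector:
        # Parameter named `id` to mirror the CurrentRun.conn(id=...) call style.
        try:
            return self._by_id[id]
        except KeyError:
            raise KeyError(f"no connector with id {id!r}") from None


if __name__ == "__main__":
    registry = ConnectorRegistry([FakeConnector("snowflake-connector")])
    print(registry.conn(id="snowflake-connector"))
```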

What if we want to share connectors across projects? We would ideally place these connectors in a separate module that can be imported by all relevant projects. Then, we can use package_lookups and the connectors' import path in each PrismProject instance:

from pathlib import Path
from prism.client import PrismProject

project = PrismProject(
    package_lookups=[
        Path.cwd().parent / "connectors",  # some path outside our working directory
    ],
    connectors=[
        "snowflake_conn.SmallSnowflakeWH",  # `snowflake_conn.py` exists within `connectors/`
        "postgres_conn.HerokuProd",  # `postgres_conn.py` exists within `connectors/`
    ],
)

if __name__ == "__main__":
    project.run()
Project with ctx
from pathlib import Path
from prism.client import PrismProject

project = PrismProject(
    ctx={
        "OUTPUT": Path.cwd() / "output"
    }
)

if __name__ == "__main__":
    project.run()

Tasks can access these context variables via CurrentRun.ctx(...). For example:

from prism.decorators import task, target
from prism.runtime import CurrentRun
import prism.target


@task(
    targets=[
        target(
            type=prism.target.Txt,
            loc=CurrentRun.ctx("OUTPUT") / "some_example_task_output.txt",
        )
    ],
)
def some_example_task():
    # do some stuff here
    ...
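The run() method also accepts a runtime_ctx argument. How Prism combines it with the project-level ctx is not described on this page; as a hedged sketch, assuming runtime values take precedence over project values, the merge could look like this (effective_ctx is a hypothetical helper):

```python
from collections import ChainMap
from pathlib import Path
from typing import Any, Dict, Mapping, Optional


def effective_ctx(
    project_ctx: Optional[Mapping[str, Any]] = None,
    runtime_ctx: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
    # Assumption: keys passed to project.run(runtime_ctx=...) override the
    # ctx given to PrismProject(...); ChainMap looks up runtime_ctx first.
    return dict(ChainMap(dict(runtime_ctx or {}), dict(project_ctx or {})))


if __name__ == "__main__":
    project_ctx = {"OUTPUT": Path.cwd() / "output", "ENV": "dev"}
    ctx = effective_ctx(project_ctx, runtime_ctx={"ENV": "prod"})
    print(ctx["ENV"])  # prod
    print(ctx["OUTPUT"] / "some_example_task_output.txt")
```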

Methods

In addition, the class has two methods: run and graph.

# Run
project.run(
    run_id: Optional[str] = None,
    task_ids: Optional[List[str]] = None,
    runtime_ctx: Optional[Dict[str, Any]] = None,
    all_tasks_upstream: bool = False,
    all_tasks_downstream: bool = False,
    on_success: Optional[List[Union[str, Callable[[], Any]]]] = None,
    on_failure: Optional[List[Union[str, Callable[[], Any]]]] = None,
    full_refresh: bool = False,
    log_level: Literal["info", "warning", "error", "debug", "critical"] = "info",
    rich_logging: bool = True,
    log_file: Optional[Union[str, Path, StringIO]] = None,
)

# Visualize
project.graph(
    port: int = 8000,
    open_window: bool = True,
    hot_reload: bool = True,
    log_level: Literal["info", "warning", "error", "debug", "critical"] = "info",
)
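The task_ids, all_tasks_upstream, and all_tasks_downstream arguments to run() suggest a selection step over the task graph. Assuming these flags expand the selection to every ancestor or descendant of the requested tasks, that step could be sketched as a breadth-first walk. select_tasks and reachable are hypothetical, and the upstream_of dictionary is an illustrative stand-in for Prism's real dependency graph.

```python
from collections import deque
from typing import Dict, List, Set


def reachable(edges: Dict[str, List[str]], start: List[str]) -> Set[str]:
    """Every task reachable from `start` (inclusive) by following `edges`."""
    seen: Set[str] = set()
    queue = deque(start)
    while queue:
        task_id = queue.popleft()
        if task_id in seen:
            continue
        seen.add(task_id)
        queue.extend(edges.get(task_id, []))
    return seen


def select_tasks(
    upstream_of: Dict[str, List[str]],
    task_ids: List[str],
    all_tasks_upstream: bool = False,
    all_tasks_downstream: bool = False,
) -> Set[str]:
    # upstream_of maps each task to the tasks it depends on; invert it
    # so we can also walk from a task to the tasks that depend on it.
    downstream_of: Dict[str, List[str]] = {}
    for task_id, deps in upstream_of.items():
        for dep in deps:
            downstream_of.setdefault(dep, []).append(task_id)

    selected = set(task_ids)
    if all_tasks_upstream:
        selected |= reachable(upstream_of, task_ids)
    if all_tasks_downstream:
        selected |= reachable(downstream_of, task_ids)
    return selected


if __name__ == "__main__":
    pipeline = {"extract": [], "transform": ["extract"], "load": ["transform"], "report": ["load"]}
    print(sorted(select_tasks(pipeline, ["load"], all_tasks_upstream=True)))
    # ['extract', 'load', 'transform']
```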

We'll go over each of these arguments next.
