PrismProject API

The PrismProject class is the entrypoint into all Prism projects.

Overview

Starting in v0.3.0, users can use the PrismProject class to instantiate, manage, and run their Prism projects. It lives in the prism.client module. Here's the class definition for a PrismProject:

from prism.client import PrismProject

project = PrismProject(
    id: str = "",
    name: str = "",
    version: str = "",
    connectors: Optional[List[Union[str, Connector]]] = None,
    concurrency: int = 1,
    tasks_dir: Union[str, Path] = Path.cwd() / "tasks",
    package_lookups: Optional[List[Union[Path, str]]] = None,
    on_success: Optional[List[Union[str, Callable[[], Any]]]] = None,
    on_failure: Optional[List[Union[str, Callable[[], Any]]]] = None,
    ctx: Optional[Dict[str, Any]] = None,
)

Let's go over each of these arguments:

Most of these arguments are fairly self-explanatory. However, it's worth spending a bit of extra time on package_lookups.

Accessing other modules via package_lookups

package_lookups allows you to modify your sys.path at runtime and access modules and functions in other directories. Note that sys.path is only temporarily modified during the course of your project run. When your project run finishes, extraneous items are deleted from your sys.path.
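The temporary nature of this change can be pictured with a small context manager. This is only a sketch of the general technique, not Prism's actual implementation, and the helper name `temporary_sys_path` is hypothetical.

```python
import sys
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def temporary_sys_path(*lookups):
    """Add each lookup directory to sys.path, then remove it again on exit.

    Hypothetical helper for illustration only; Prism's internals may differ.
    """
    added = []
    for lookup in lookups:
        entry = str(Path(lookup))
        if entry not in sys.path:
            sys.path.insert(0, entry)
            added.append(entry)
    try:
        yield added
    finally:
        # Remove only the entries this context manager added.
        for entry in added:
            if entry in sys.path:
                sys.path.remove(entry)
```

Inside a `with temporary_sys_path(some_dir):` block, modules in `some_dir` are importable; once the block exits, `sys.path` is back to its earlier state.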

This usually isn't necessary if you're running an isolated project. However, this changes if you have an entire repository of Prism projects and are collaborating with other folks. Common functionality (e.g., helper or utility functions) used across projects may live outside any one project, so you'll need to ensure your project has access to those modules via package_lookups.

For example, suppose your project directory looks like this:

all_prism_projects
├── project1
│   ├── main.py
│   ├── utils1.py
│   └── tasks
│       ├── task1.py
│       └── ...
├── project2
│   ├── main.py
│   ├── utils2.py
│   └── tasks
│       ├── task1.py
│       └── ...
└── common
    ├── utils.py
    └── helpers.py

By default, each project's tasks directory and that directory's parent are added to package_lookups. That means that project1 can access utils1.py and project2 can access utils2.py. However, the repository root (i.e., all_prism_projects/) is not added to the project's sys.path by default, so the modules in common are not accessible to either project.
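To make the default concrete, the following sketch computes the directories described above with pathlib. The paths are the hypothetical ones from the example layout, and the list is an illustration of the described behavior rather than a value read from Prism.

```python
from pathlib import PurePosixPath

# Hypothetical location of project1's tasks directory.
tasks_dir = PurePosixPath("all_prism_projects/project1/tasks")

# Per the description above, the tasks directory and its parent
# are available by default.
default_lookups = [tasks_dir, tasks_dir.parent]

# The shared code lives one level higher, which the defaults do not cover.
repo_root = tasks_dir.parent.parent
common_dir = repo_root / "common"

print([str(p) for p in default_lookups])
print(repo_root in default_lookups)  # False: common/ is not reachable yet
```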

Let's see how we can remedy this with package_lookups:

# all_prism_projects/project1/main.py

from pathlib import Path
from prism.client import PrismProject

# Directory
PROJECT1_DIR = Path.cwd()

# Project
project = PrismProject(
    name="project1",
    tasks_dir=PROJECT1_DIR / "tasks",
    package_lookups=[
        PROJECT1_DIR.parent
    ]
)

if __name__ == "__main__":
    project.run()

Now, a task in project1 can import modules from common/ at runtime:

# all_prism_projects/project1/tasks/example_task.py

import common.utils  # this is OK since all_prism_projects/ is in package_lookups
import prism.task

class ExampleTask(prism.task.PrismTask):
    ...

Important: we recommend specifying paths as relative paths rather than absolute paths (i.e., hard-coded string paths). This enables reproducibility across different machines.
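One common way to keep paths relative is to derive them from the location of the module itself instead of typing out a machine-specific string. The sketch below assumes main.py sits at the project root; `project_paths` is a hypothetical helper, not part of Prism.

```python
from pathlib import Path


def project_paths(main_file):
    """Derive project directories from the location of main.py.

    Hypothetical helper for illustration only.
    """
    project_dir = Path(main_file).resolve().parent
    return {
        "tasks_dir": project_dir / "tasks",
        "repo_root": project_dir.parent,
    }


# In a real main.py you would pass __file__; a literal path is used
# here so the snippet runs on its own.
paths = project_paths("all_prism_projects/project1/main.py")
print(paths["tasks_dir"].name, paths["repo_root"].name)
```

Because nothing above the repository root is hard-coded, the same code resolves correctly wherever the repository is checked out.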

Examples

Basic Example
from prism.client import PrismProject

project = PrismProject()

if __name__ == "__main__":
    project.run()

In this basic example, we instantiate a PrismProject with all the default arguments. Prism will assume that tasks live in a directory called tasks in the current working directory, and it will execute these tasks in a single thread when you run this module.

Project with on_success and on_failure callbacks
from prism.client import PrismProject

def print_success():
    print("Success!")
    

project = PrismProject(
    on_success=[print_success]
)

if __name__ == "__main__":
    project.run()

In this first example, we define our callback function print_success in the same module as our PrismProject. If the project run succeeds, then Success! will be printed in the console output.
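The general pattern behind on_success and on_failure can be sketched without Prism at all. The runner below is a hypothetical stand-in for a project run, written under the assumption that callbacks take no arguments (as the `Callable[[], Any]` type hint suggests); it is not how Prism itself is implemented.

```python
def run_with_callbacks(job, on_success=None, on_failure=None):
    """Run job(), then call the matching list of zero-argument callbacks.

    Hypothetical stand-in for a project run; not Prism's implementation.
    """
    try:
        job()
    except Exception:
        for callback in on_failure or []:
            callback()
        return False
    for callback in on_success or []:
        callback()
    return True


events = []


def succeed():
    pass


def fail():
    raise RuntimeError("task failed")


run_with_callbacks(succeed, on_success=[lambda: events.append("success")])
run_with_callbacks(fail, on_failure=[lambda: events.append("failure")])
print(events)  # ['success', 'failure']
```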

What if the callback doesn't live inside our project directory? Then, we can use package_lookups and the callback's import path instead. For example:

from pathlib import Path
from prism.client import PrismProject


project = PrismProject(
    package_lookups=[
        Path.cwd().parent / "common",  # some path outside our working directory
    ],
    on_success=[
        "callbacks.<some callback function>"  # callbacks.py exists within `common/`
    ]
)

if __name__ == "__main__":
    project.run()
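A string such as "callbacks.<some callback function>" only makes sense once the containing directory is importable. The sketch below shows one way a "module.attribute" string can be resolved with importlib; `resolve_import_path` and the `demo_callbacks` module are hypothetical, and Prism's own resolution logic may differ.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def resolve_import_path(dotted_path):
    """Turn "module.attribute" into the attribute object (hypothetical helper)."""
    module_name, _, attr_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Build a throwaway `common/` directory containing a callbacks module.
common_dir = Path(tempfile.mkdtemp()) / "common"
common_dir.mkdir()
(common_dir / "demo_callbacks.py").write_text(
    "def notify():\n    return 'notified'\n"
)

# The module only becomes importable once its directory is on sys.path.
sys.path.insert(0, str(common_dir))
importlib.invalidate_caches()
try:
    callback = resolve_import_path("demo_callbacks.notify")
    result = callback()
finally:
    sys.path.remove(str(common_dir))

print(result)  # notified
```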

Project with connectors
from prism.client import PrismProject
from prism.connectors import SnowflakeConnector

snowflake = SnowflakeConnector(
    id="snowflake-connector",
    ...
)

project = PrismProject(
    connectors=[snowflake]
)

if __name__ == "__main__":
    project.run()

In this first example, we define our SnowflakeConnector in the same module as our PrismProject instance. Tasks can access this connector via CurrentRun.conn(id="snowflake-connector").

What if we want to share connectors across projects? We would ideally place these connectors in a separate module that can be imported by all relevant projects. Then, we can use package_lookups and the connectors' import path in each PrismProject instance:

from pathlib import Path
from prism.client import PrismProject
from prism.connectors import SnowflakeConnector

project = PrismProject(
    package_lookups=[
        Path.cwd().parent / "connectors",  # some path outside our working directory
    ],
    connectors=[
        "snowflake_conn.SmallSnowflakeWH",  # `snowflake_conn.py` exists within `connectors/`
        "postgres_conn.HerokuProd",  # `postgres_conn.py` exists within `connectors/`
    ]
)

if __name__ == "__main__":
    project.run()

Project with ctx
from pathlib import Path
from prism.client import PrismProject

project = PrismProject(
    ctx={
        "OUTPUT": Path.cwd() / "output"
    }
)

if __name__ == "__main__":
    project.run()

We can access these context variables via CurrentRun.ctx, i.e.,

from prism.decorators import task, target
from prism.runtime import CurrentRun  # import path assumed; it is not shown in the original snippet
import prism.target


@task(
    targets=[
        target(
            type=prism.target.Txt,
            loc=CurrentRun.ctx("OUTPUT") / "some_example_task_output.txt",
        )
    ],
)
def some_example_task():
    # do some stuff here
    ...

Methods

In addition, the class has two methods: run and graph.

# Run
project.run(
    run_id: Optional[str] = None,
    task_ids: Optional[List[str]] = None,
    runtime_ctx: Optional[Dict[str, Any]] = None,
    all_tasks_upstream: bool = False,
    all_tasks_downstream: bool = False,
    on_success: Optional[List[Union[str, Callable[[], Any]]]] = None,
    on_failure: Optional[List[Union[str, Callable[[], Any]]]] = None,
    full_refresh: bool = False,
    log_level: Literal["info", "warning", "error", "debug", "critical"] = "info",
    rich_logging: bool = True,
    log_file: Optional[Union[str, Path, StringIO]] = None,
)

# Visualize
project.graph(
    port: int = 8000,
    open_window: bool = True,
    hot_reload: bool = True,
    log_level: Literal["info", "warning", "error", "debug", "critical"] = "info",
)

We'll go over each of these arguments next.
