👋 Welcome to Prism!

These docs are current for version v0.3.0.

Prism is the easiest way to create data pipelines in Python. With it, users can break down their data flows into modular tasks, manage dependencies, and execute complex computations in sequence.

CHANGELOG

There are significant differences between this version and the previous version (v0.2.8). These include:

✨ Enhancements

PrismProject entrypoint

  • In previous versions, users managed their project with a prism_project.py file and optional profile.yaml and triggers.yaml files. The entrypoint to a Prism project was the CLI: users had to run prism run to actually execute their project.

  • Starting in v0.3.0, the PrismProject class is the recommended entrypoint to a Prism project. This class has the following benefits:

    • Programmatic access to Prism projects, i.e., Prism projects can be instantiated and run via standard Python.

    • The PrismProject class provides more fine-grained control over running Prism projects.

    • Projects as code, not as YAML. Instead of dealing with multiple files, Prism projects can be written as code that lives in a single file.

  • The PrismProject class has two methods: run and graph. As the names suggest, run is used to execute Prism projects, and graph is used to launch the Prism Visualizer UI.

  • Here's the PrismProject class in action:

    from pathlib import Path

    from prism.client import PrismProject

    project = PrismProject(
        version="1.0",
        tasks_dir=Path.cwd() / "tasks",
        concurrency=2,
        ctx={
            "OUTPUT": Path.cwd() / "output",
        },
    )

    if __name__ == "__main__":
        project.run()

CurrentRun context object

  • Rather than the separate tasks and refs objects, a single CurrentRun object stores information about the current project run.

  • Specific methods include:

    CurrentRun.ctx(key: str, default_value: Any) -> Any    # grab a variable from the PrismProject's base context or runtime context
    CurrentRun.conn(connector_id: str) -> Connector    # grab a connector passed to the PrismProject at instantiation
    CurrentRun.ref(task_id: str) -> Any    # grab the output of a task

  • Here's how to use it in a task:

    # example_task.py
    
    from prism.decorators import task
    from prism.runtime import CurrentRun
    
    @task(id="example-task-id")
    def example_task():
        other_output = CurrentRun.ref(task_id="other_task_id")
        ...

  • This is a slightly cleaner user experience (one unified context object instead of two), and it lets users take advantage of autocomplete in their IDE.
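
To make the lookup semantics concrete, here is a minimal, self-contained sketch of a unified context object. It is a toy stand-in, not Prism's implementation: the ToyCurrentRun class, its constructor, and the record helper are hypothetical, and the rule that runtime values override base values is an assumption. Only the method names ctx and ref mirror the signatures listed above.

```python
from typing import Any, Dict, Optional


class ToyCurrentRun:
    """Hypothetical stand-in for a run-scoped context object (not Prism's code)."""

    def __init__(
        self,
        base_ctx: Dict[str, Any],
        runtime_ctx: Optional[Dict[str, Any]] = None,
    ) -> None:
        # Assumption: runtime values override the project's base context.
        self._ctx = {**base_ctx, **(runtime_ctx or {})}
        self._outputs: Dict[str, Any] = {}

    def ctx(self, key: str, default_value: Any = None) -> Any:
        # Return a context variable, or the default when the key is absent.
        return self._ctx.get(key, default_value)

    def record(self, task_id: str, output: Any) -> None:
        # Hypothetical helper: store the output of a finished task.
        self._outputs[task_id] = output

    def ref(self, task_id: str) -> Any:
        # Return the output of a task that has already run.
        if task_id not in self._outputs:
            raise KeyError(f"task {task_id!r} has not produced an output yet")
        return self._outputs[task_id]


run = ToyCurrentRun(base_ctx={"OUTPUT": "output"}, runtime_ctx={"LIMIT": 10})
run.record("other_task_id", [1, 2, 3])
print(run.ctx("OUTPUT"), run.ctx("MISSING", "fallback"), run.ref("other_task_id"))
```

Keeping context variables and task outputs behind one object is what allows an IDE to offer completions for both from a single import.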

Connectors

  • Instead of defining adapters in profile.yaml, connectors are defined in Python as instances of the Connector class.

  • There are six Connector subclasses:

    • BigQueryConnector

    • PostgresConnector

    • PrestoConnector

    • RedshiftConnector

    • SnowflakeConnector

    • TrinoConnector

  • PrismProject accepts a list of Connector objects via the connectors keyword argument, e.g.,

    snowflake = SnowflakeConnector(
        id="snowflake-connector",
        ...
    )
    
    project = PrismProject(
        ...,
        connectors=[snowflake],
        ...,
    )

  • These connector objects can be accessed via CurrentRun, e.g.,

    conn = CurrentRun.conn(connector_id="snowflake-connector")
    conn.execute_sql(...)
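
The id-based lookup can be illustrated without a data warehouse. The sketch below is a toy under stated assumptions: ToySQLiteConnector and get_connector are hypothetical names that Prism does not ship, and SQLite from the standard library stands in for Snowflake. Only the id keyword and the execute_sql method name come from the examples above; the return type of execute_sql here is an assumption.

```python
import sqlite3
from typing import Any, Dict, List, Tuple


class ToySQLiteConnector:
    """Hypothetical connector backed by an in-memory SQLite database."""

    def __init__(self, id: str) -> None:
        self.id = id
        self._conn = sqlite3.connect(":memory:")

    def execute_sql(self, sql: str) -> List[Tuple[Any, ...]]:
        # Assumption: run one statement and return all rows as tuples.
        cursor = self._conn.execute(sql)
        rows = cursor.fetchall()
        self._conn.commit()
        return rows


def get_connector(
    connectors: List[ToySQLiteConnector], connector_id: str
) -> ToySQLiteConnector:
    # Index the connectors by id, then look up the requested one.
    by_id: Dict[str, ToySQLiteConnector] = {c.id: c for c in connectors}
    if connector_id not in by_id:
        raise KeyError(f"no connector with id {connector_id!r}")
    return by_id[connector_id]


toy = ToySQLiteConnector(id="toy-connector")
conn = get_connector([toy], connector_id="toy-connector")
conn.execute_sql("CREATE TABLE numbers (n INTEGER)")
conn.execute_sql("INSERT INTO numbers VALUES (1), (2), (3)")
total = conn.execute_sql("SELECT SUM(n) FROM numbers")[0][0]
print(total)
```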

Callbacks

  • Instead of defining custom code to run after a project has succeeded via the triggers.yaml file, users can specify custom code as Python functions.

  • PrismProject accepts a list of functions via the on_success and on_failure keyword arguments.

  • Callback functions should not accept any arguments, e.g.,

    def print_success():
        print("Success!")

    project = PrismProject(
        ...,
        on_success=[print_success],
        ...,
    )
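
How zero-argument callbacks fit into a run can be sketched in a few lines. This is an illustration only, not Prism's runner: run_with_callbacks is a hypothetical function, and both the in-order invocation and the boolean return value are assumptions made for the example.

```python
from typing import Callable, List

Callback = Callable[[], None]


def run_with_callbacks(
    job: Callable[[], None],
    on_success: List[Callback],
    on_failure: List[Callback],
) -> bool:
    """Run job, then call every callback in the matching list, in order."""
    try:
        job()
    except Exception:
        for callback in on_failure:
            callback()
        return False
    for callback in on_success:
        callback()
    return True


events: List[str] = []


def note_success() -> None:
    events.append("success")


def note_failure() -> None:
    events.append("failure")


def failing_job() -> None:
    raise RuntimeError("boom")


ok = run_with_callbacks(lambda: None, [note_success], [note_failure])
failed = run_with_callbacks(failing_job, [note_success], [note_failure])
print(ok, failed, events)
```

Because the callbacks take no arguments, anything they need (a webhook URL, a logger) has to be captured from the enclosing module or bound ahead of time, for example with functools.partial.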

🛠️ Other improvements

  • Prism uses rich to create beautiful logs for the user. This includes logs for events, tasks, and exceptions.

  • Updated triggers naming convention to callbacks. See above for more details.

  • Updated adapters naming convention to connectors. See above for more details.

Why use Prism?

Prism was built to streamline the development and deployment of complex data pipelines. Here are some of its main features:

  • Real-time dependency declaration: With Prism, users can declare dependencies using a simple function call. There is no need to explicitly keep track of the pipeline order: at runtime, Prism automatically parses the function calls and builds the dependency graph.

  • Intuitive logging: Prism automatically logs events for parsing the configuration files, compiling the tasks and creating the project, and executing the tasks. No configuration is required.

  • Flexible CLI: Users can instantiate, run, and visualize projects using a simple but powerful command-line interface.

  • “Batteries included”: Prism comes with all the essentials needed to get up and running quickly. Users can create and run their first project in less than 2 minutes.

  • Integrations: Prism integrates with several tools that are popular in the data community, including Snowflake, Google BigQuery, Redshift, Trino, and Presto. We're adding more integrations every day, so let us know what you'd like to see!
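
The dependency-declaration idea in the first bullet can be sketched with the standard library. The code below is not Prism's parser. It is a minimal illustration that assumes each task's source calls CurrentRun.ref(task_id="...") with a literal string, as in the changelog example. The find_refs helper and the sample task sources are hypothetical.

```python
import ast
from graphlib import TopologicalSorter
from typing import Dict, Set

# Hypothetical task sources, keyed by task id.
TASK_SOURCES: Dict[str, str] = {
    "extract": "def run():\n    return [1, 2, 3]\n",
    "transform": (
        "def run():\n"
        "    rows = CurrentRun.ref(task_id='extract')\n"
        "    return [r * 2 for r in rows]\n"
    ),
    "load": (
        "def run():\n"
        "    return sum(CurrentRun.ref(task_id='transform'))\n"
    ),
}


def find_refs(source: str) -> Set[str]:
    """Collect literal task ids passed to any `<name>.ref(task_id=...)` call."""
    refs: Set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "ref"
        ):
            for keyword in node.keywords:
                if keyword.arg == "task_id" and isinstance(keyword.value, ast.Constant):
                    refs.add(keyword.value.value)
    return refs


# Map each task to the tasks it depends on, then sort so dependencies run first.
graph = {task_id: find_refs(src) for task_id, src in TASK_SOURCES.items()}
order = list(TopologicalSorter(graph).static_order())
print(order)
```

The sources are only parsed, never executed, so the order is known before any task runs. graphlib requires Python 3.9 or later.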

What is a Prism project?

The PrismProject class is the entrypoint into all Prism projects. This class allows for fine-grained control of project runs. In order to run a Prism project, two things are needed:

  1. A Python module instantiating the PrismProject class.

  2. A directory containing tasks to run. This should be supplied to the PrismProject instance via the tasks_dir keyword argument.

Note: both of the components listed above are supplied in the default project that is created via prism init!

Here's a simple example of what this could look like:

new_project/
├── output/
├── tasks/
│   ├── extract.py
│   ├── transform.py
│   └── load.py
└── main.py

# new_project/main.py

from pathlib import Path
from prism.client import PrismProject

# Project
project = PrismProject(tasks_dir=Path.cwd() / "tasks")

# Run
if __name__ == "__main__":
    project.run()

Guides: Jump right in

Follow our handy guides to get started on the basics as quickly as possible.

If you have any feedback about the product or the docs, please let us know!
