Prism
v0.3.0

Tasks

In its most basic form, any data pipeline can be thought of as a series of discrete steps that run in some sort of sequence. For example, ETL pipelines generally have three steps: extract -> transform -> load.

Prism projects are no different. A Prism project is composed of a set of tasks, and these tasks contain the brunt of the project's core logic.

What are tasks?

In Prism, tasks can be either classes or functions. Here is what each looks like, first as a class and then as a function:

# tasks/hello_world.py

import prism.task
import prism.target

class HelloWorld(prism.task.PrismTask):
        
    def run(self):
        test_str = "Hello, world!"
        return test_str

# tasks/hello_world.py

from prism.decorators import task

@task()
def hello_world():
    test_str = "Hello, world!"
    return test_str

We'll go into the technical details of both next.

Class-based tasks

Class-based tasks must meet two requirements: the class must inherit from an abstract class called PrismTask, and it must have a method called run.

The run method must itself adhere to three requirements:

  1. It should not accept any arguments other than self

  2. It should encapsulate all the business logic for the task

  3. It should return a non-null output.

Important: the output of a task's run function is what's used by downstream tasks in your pipeline. The return value can be anything (a pandas or Spark DataFrame, a NumPy array, a string, a dictionary, whatever), but it cannot be null, i.e., None in Python. Prism will throw an error if it is.
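A common way to break the non-null rule in Python is to forget the return statement, because a function that falls off the end returns None. The sketch below uses plain functions and a hypothetical validate_output helper, which is an assumption for illustration and not part of Prism. Prism's actual error type and message may differ.

```python
def good_run():
    """Builds a result and returns it, so the output is non-null."""
    result = {"rows": 3}
    return result


def bad_run():
    """Builds a result but forgets to return it, so the output is None."""
    result = {"rows": 3}  # deliberately never returned


def validate_output(output):
    """Hypothetical stand-in for a non-null check; not Prism's API."""
    if output is None:
        raise ValueError("task output cannot be None")
    return output


validated = validate_output(good_run())

try:
    validate_output(bad_run())
    bad_run_rejected = False
except ValueError:
    bad_run_rejected = True
```

If a task has nothing meaningful to return, returning a small status value (for example a string or a dictionary) keeps the output non-null.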

Apart from these two conditions, feel free to structure and define your tasks however you'd like, e.g., by adding other class methods or class attributes:

# tasks/hello_world.py

from prism.task import PrismTask

class HelloWorld(PrismTask):

    def some_other_function(self, *args, **kwargs):
        # do something
        pass

    def run(self):
        test_str = "Hello, world!"
        _ = self.some_other_function()
        return test_str

As you can see, our HelloWorld task lives in the tasks directory. It inherits from the PrismTask class, and it contains a run function that returns a non-null string.

Critical: The run function must not declare any parameters other than self. To use the output of an upstream task, call CurrentRun.ref(...) with that task's ID instead of passing the output in as an argument.

And that's it! Create a class that inherits the PrismTask class and implement the run method. Prism will take care of the rest.

Good to know: Although user-defined tasks can be arbitrarily long or complex, it is helpful to think of them as discrete steps or objectives in your pipeline. For example, if you are creating an ETL pipeline, then you may want to split your code into three tasks: an extract task, a transform task, and a load task.
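To make the extract/transform/load split concrete, here is a minimal sketch using plain Python functions. The function names and sample data are illustrative assumptions. In a Prism project each step would be its own task, and a downstream task would read the upstream output via CurrentRun.ref(...) rather than through a direct call.

```python
def extract():
    """Extract step: return raw records (hard-coded sample data)."""
    return [{"name": "a", "value": "1"}, {"name": "b", "value": "2"}]


def transform(records):
    """Transform step: cast the string values to integers."""
    return [{"name": r["name"], "value": int(r["value"])} for r in records]


def load(records):
    """Load step: write to an in-memory dict standing in for a real sink."""
    return {r["name"]: r["value"] for r in records}


# Chain the three steps in sequence: extract -> transform -> load.
loaded = load(transform(extract()))
```

Each step returns a non-null value, so each one could serve as the output of a task.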

Function-based tasks

You can also define tasks using functions rather than entire classes. There's no real difference between a function-based task and a class-based task; we created the feature so that you can work with whichever style you're most comfortable with.

In order for a function to be a task, it must be decorated with the prism.decorators.task function. Similar to a class-based task, functions that are tasks do not accept any arguments and must return a non-null value.

Let's take a look at our original example:

# tasks/hello_world.py

from prism.decorators import task

@task()
def hello_world():
    test_str = "Hello, world!"
    return test_str 

Task IDs

Every task in a Prism project must be associated with a unique ID. This ID is then referenced by downstream tasks (via CurrentRun.ref(...)) to grab the task's output.
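As a mental model only, task IDs behave like keys in a lookup table: each ID maps to exactly one task output, and downstream code fetches that output by ID. The TaskRegistry class below is a hypothetical toy, not Prism's implementation, and its ref method merely imitates the role that CurrentRun.ref(...) plays.

```python
class TaskRegistry:
    """Toy model of an ID -> output lookup; hypothetical, not part of Prism."""

    def __init__(self):
        self._outputs = {}

    def register(self, task_id, output):
        # Reject duplicates so every ID identifies exactly one task.
        if task_id in self._outputs:
            raise ValueError(f"duplicate task ID: {task_id!r}")
        self._outputs[task_id] = output

    def ref(self, task_id):
        # Return the stored output of the task with this ID.
        return self._outputs[task_id]


registry = TaskRegistry()
registry.register("hello-world-task-cls", "Hello, world!")
greeting = registry.ref("hello-world-task-cls")

try:
    registry.register("hello-world-task-cls", "again")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```

The toy shows why uniqueness matters: if two tasks shared an ID, a lookup by that ID would be ambiguous.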

Users can specify their own task ID when creating a task:

When using a class-based task, you can specify a custom task ID using the task_id class attribute.

# tasks/hello_world.py

from prism.task import PrismTask

class HelloWorld(PrismTask):
    task_id = "hello-world-task-cls" 
        
    def run(self):
        test_str = "Hello, world!"
        return test_str

When using a function-based task, you can specify a custom task ID using the task_id keyword argument in the @task decorator:

# tasks/hello_world.py

from prism.decorators import task

@task(
    task_id="hello-world-task-fn"    
)
def hello_world():
    test_str = "Hello, world!"
    return test_str 

If you don't specify a custom task ID, then Prism automatically creates one for you. The format of this task ID will be <module_name>.<function or class name>. For example:

# tasks/hello_world.py

from prism.task import PrismTask

class HelloWorld(PrismTask):

    def run(self):
        test_str = "Hello, world!"
        return test_str

The auto-generated ID for this task will be hello_world.HelloWorld.

# tasks/hello_world.py

from prism.decorators import task

@task()
def hello_world():
    test_str = "Hello, world!"
    return test_str 

The auto-generated ID for this task will be hello_world.hello_world.
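The naming format above can be reproduced with a few lines of Python. The default_task_id helper below is hypothetical and only mirrors the documented <module_name>.<function or class name> pattern. It assumes the module name is the file name without its extension, which matches the two examples above but is not confirmed as Prism's internal logic.

```python
from pathlib import Path


def default_task_id(module_path, object_name):
    """Hypothetical helper mirroring the <module_name>.<name> format."""
    # Assumption: the module name is the file stem,
    # e.g. "tasks/hello_world.py" -> "hello_world".
    module_name = Path(module_path).stem
    return f"{module_name}.{object_name}"


class_id = default_task_id("tasks/hello_world.py", "HelloWorld")
function_id = default_task_id("tasks/hello_world.py", "hello_world")
```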

Important: for readability purposes, we recommend always setting task IDs in your classes or functions.


Last updated 1 year ago

For additional information, consult the API reference.

The technical specifications for the @task decorator can be found in the API reference.