Prism
v0.3.0
v0.3.0
  • 👋Welcome to Prism!
  • Getting Started
    • Installation
    • Creating your first project
    • Why Prism?
  • Fundamentals
    • PrismProject API
      • PrismProject().run
      • PrismProject().graph
    • Tasks
    • Targets
      • Multiple targets
    • CurrentRun API
      • CurrentRun.ref()
      • CurrentRun.conn()
      • CurrentRun.ctx()
  • Connectors
    • Overview
    • BigQueryConnector
    • PostgresConnector
    • RedshiftConnector
    • SnowflakeConnector
    • TrinoConnector
    • PrestoConnector
  • CLI
    • Command Line Interface
    • graph
    • init
    • run
  • Advanced features
    • Concurrency
    • Logging
    • Callbacks
    • Retries
    • Skipping tasks
  • API Reference
    • prism.task.PrismTask
    • @task(...)
    • @target(...)
    • @target_iterator(...)
    • prism.target.PrismTarget
  • Use Cases
    • Analytics on top of dbt
    • Machine Learning
  • Wiki
    • DAGs
Powered by GitBook
On this page
  1. Fundamentals
  2. CurrentRun API

CurrentRun.ctx()

PreviousCurrentRun.conn()NextOverview

Last updated 1 year ago

CurrentRun.ctx() is used access context variables. These can be variables passed into the PrismProject's , or it can be variables passed into the at runtime.

Here is the full method definition:

CurrentRun.ctx(self, key: str, default_value: Optional[Any] = None) -> Any:
    """
    Get the value associated with context variable `key`. Context variables can be
    set in two places: when instantiated the PrismProject (with the `ctx` keyword
    argument) and when creating the run job (with the `runtime_ctx` keyword argument
    in the PrismProject's `create_run_job` method).

    args:
        key: variable to retrieve
        default_value: default value to return if `key` is not found.
            Default is `None`
    returns:
        value associated with context variable `key`
    """

Example

Here's an example project entrypoint:

# example_project/main.py
import os
from pathlib import Path

from prism.client import PrismProject
from prism.connectors import SnowflakeConnector


snowflake_conn = SnowflakeConnector(
    id="snowflake-connector",
    account=os.getenv("account"),
    user=os.getenv("user"),
    password=os.getenv("password"),
    database=os.getenv("database"),
    schema=os.getenv("schema"),
    warehouse=os.getenv("warehouse"),
    role=os.getenv("role"),
)

project = PrismProject(
    tasks_dir=Path.cwd() / "tasks",
    connectors=[snowflake_conn]
    ctx={
        "OUTPUT": Path.cwd() / "output"
    }
)

if __name__ == "__main__":
    project.run()

Tasks within this project can call the OUTPUT context variable as follows:

# example_project/tasks/extract_from_snowflake.py

from prism.decorators import task, target
from prism.runtime import CurrentRun
import prism.target

@task(
     targets=[
         target(type=prism.target.PandasCsv, loc=CurrentRun.ctx("OUTPUT") / 'snowflake_df.csv')   
     ]
)
def extract_from_snowflake():
    conn = CurrentRun.conn(connector_id="snowflake-connector")
    df = conn.execute_sql(..., return_type="pandas")
    return df
ctx
runtime_ctx