Prism
v0.3.0
v0.3.0
  • 👋Welcome to Prism!
  • Getting Started
    • Installation
    • Creating your first project
    • Why Prism?
  • Fundamentals
    • PrismProject API
      • PrismProject().run
      • PrismProject().graph
    • Tasks
    • Targets
      • Multiple targets
    • CurrentRun API
      • CurrentRun.ref()
      • CurrentRun.conn()
      • CurrentRun.ctx()
  • Connectors
    • Overview
    • BigQueryConnector
    • PostgresConnector
    • RedshiftConnector
    • SnowflakeConnector
    • TrinoConnector
    • PrestoConnector
  • CLI
    • Command Line Interface
    • graph
    • init
    • run
  • Advanced features
    • Concurrency
    • Logging
    • Callbacks
    • Retries
    • Skipping tasks
  • API Reference
    • prism.task.PrismTask
    • @task(...)
    • @target(...)
    • @target_iterator(...)
    • prism.target.PrismTarget
  • Use Cases
    • Analytics on top of dbt
    • Machine Learning
  • Wiki
    • DAGs
Powered by GitBook
On this page
  1. Fundamentals
  2. CurrentRun API

CurrentRun.conn()

CurrentRun.conn is used to access the connectors that are passed into the PrismProject's instantiation.

Here is the full method definition:

CurrentRun.conn(self, connector_id: str) -> Connector:
    """
    Get the connector object associated with ID `connector_id`. These are defined in
    the client's instantiation.

    args:
        connector_id: ID of task from which to retrieve output
    returns:
        connector object associated with `connector_id`
    raises:
        prism.exception.ConnectorDoesNotExistException if the connector ID is not
        found
    """

This is largely a utility method designed to make your life easier. You could just as easily define your connector within the task itself.

Example

Here's an example project entrypoint:

# example_project/main.py
import os
from pathlib import Path

from prism.client import PrismProject
from prism.connectors import SnowflakeConnector


snowflake_conn = SnowflakeConnector(
    id="snowflake-connector",
    account=os.getenv("account"),
    user=os.getenv("user"),
    password=os.getenv("password"),
    database=os.getenv("database"),
    schema=os.getenv("schema"),
    warehouse=os.getenv("warehouse"),
    role=os.getenv("role"),
)

project = PrismProject(
    tasks_dir=Path.cwd() / "tasks",
    connectors=[snowflake_conn]
)

if __name__ == "__main__":
    project.run()

Tasks within this project can call the SnowflakeConnector instance as follows:

# example_project/tasks/extract_from_snowflake.py

from prism.decorators import task
from prism.runtime import CurrentRun

@task()
def extract_from_snowflake():
    conn = CurrentRun.conn(connector_id="snowflake-connector")  # the same connector ID that was used for the Connector object
    df = conn.execute_sql(..., return_type="pandas")
    return df

Note that the connector_id argument value is same ID used when creating the SnowflakeConnector instance.

PreviousCurrentRun.ref()NextCurrentRun.ctx()

Last updated 1 year ago