CurrentRun.conn()

CurrentRun.conn returns one of the connectors that were passed to the PrismProject when it was instantiated.

Here is the full method definition:

CurrentRun.conn(self, connector_id: str) -> Connector:
    """
    Get the connector object associated with ID `connector_id`. These are defined in
    the client's instantiation.

    args:
        connector_id: ID of the connector to retrieve
    returns:
        connector object associated with `connector_id`
    raises:
        prism.exception.ConnectorDoesNotExistException if the connector ID is not
        found
    """

This is largely a convenience method: it lets tasks reuse connectors that are defined once in the project entrypoint. You could just as easily define a connector within the task itself.

Example

Here's an example project entrypoint:

# example_project/main.py
import os
from pathlib import Path

from prism.client import PrismProject
from prism.connectors import SnowflakeConnector


snowflake_conn = SnowflakeConnector(
    id="snowflake-connector",
    account=os.getenv("account"),
    user=os.getenv("user"),
    password=os.getenv("password"),
    database=os.getenv("database"),
    schema=os.getenv("schema"),
    warehouse=os.getenv("warehouse"),
    role=os.getenv("role"),
)

project = PrismProject(
    tasks_dir=Path.cwd() / "tasks",
    connectors=[snowflake_conn]
)

if __name__ == "__main__":
    project.run()

Tasks within this project can retrieve the SnowflakeConnector instance as follows:

# example_project/tasks/extract_from_snowflake.py

from prism.decorators import task
from prism.runtime import CurrentRun

@task()
def extract_from_snowflake():
    # Use the same connector ID that was given to the SnowflakeConnector object
    conn = CurrentRun.conn(connector_id="snowflake-connector")
    df = conn.execute_sql(..., return_type="pandas")
    return df

Note that the connector_id argument value is the same ID used when creating the SnowflakeConnector instance.
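
To illustrate why the two IDs must match exactly, here is a minimal, self-contained sketch of ID-based lookup. It does not use Prism and is not Prism's implementation: `FakeConnector`, `FakeRun`, and the local `ConnectorDoesNotExistException` are hypothetical stand-ins that only mimic the behavior described in the docstring above (return the connector registered under an ID, raise if the ID is unknown).

```python
from dataclasses import dataclass


class ConnectorDoesNotExistException(Exception):
    """Hypothetical stand-in for prism.exception.ConnectorDoesNotExistException."""


@dataclass
class FakeConnector:
    """Hypothetical connector that carries nothing but its ID."""
    id: str


class FakeRun:
    """Hypothetical stand-in for CurrentRun: stores connectors keyed by ID."""

    def __init__(self, connectors):
        # Index the connectors by their ID so they can be looked up later.
        self._connectors = {c.id: c for c in connectors}

    def conn(self, connector_id: str) -> FakeConnector:
        try:
            return self._connectors[connector_id]
        except KeyError:
            # An unknown ID is an error, as the docstring's `raises` section states.
            raise ConnectorDoesNotExistException(
                f"connector `{connector_id}` not found"
            ) from None


run = FakeRun([FakeConnector(id="snowflake-connector")])

# Exact match: the registered connector is returned.
found = run.conn(connector_id="snowflake-connector")

# Near miss (underscore instead of hyphen): the lookup fails.
try:
    run.conn(connector_id="snowflake_connector")
    lookup_failed = False
except ConnectorDoesNotExistException:
    lookup_failed = True
```

In a real project, a mismatch like the one in the second lookup is presumably what triggers `prism.exception.ConnectorDoesNotExistException`, so keeping the ID in a shared constant is one way to avoid typos.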
