CurrentRun.conn is used to access the connectors that are passed into the PrismProject's instantiation.
Here is the full method definition:
CurrentRun.conn(self, connector_id: str) -> Connector:
"""
Get the connector object associated with ID `connector_id`. These are defined in
the client's instantiation.
args:
connector_id: ID of task from which to retrieve output
returns:
connector object associated with `connector_id`
raises:
prism.exception.ConnectorDoesNotExistException if the connector ID is not
found
"""
This is largely a utility method designed to make your life easier. You could just as easily define your connector within the task itself.
Example
Here's an example project entrypoint:
# example_project/main.py
import os
from pathlib import Path
from prism.client import PrismProject
from prism.connectors import SnowflakeConnector
snowflake_conn = SnowflakeConnector(
id="snowflake-connector",
account=os.getenv("account"),
user=os.getenv("user"),
password=os.getenv("password"),
database=os.getenv("database"),
schema=os.getenv("schema"),
warehouse=os.getenv("warehouse"),
role=os.getenv("role"),
)
project = PrismProject(
tasks_dir=Path.cwd() / "tasks",
connectors=[snowflake_conn]
)
if __name__ == "__main__":
project.run()
Tasks within this project can call the SnowflakeConnector instance as follows:
# example_project/tasks/extract_from_snowflake.py
from prism.decorators import task
from prism.runtime import CurrentRun
@task()
def extract_from_snowflake():
conn = CurrentRun.conn(connector_id="snowflake-connector") # the same connector ID that was used for the Connector object
df = conn.execute_sql(..., return_type="pandas")
return df
Note that the connector_id argument value is same ID used when creating the SnowflakeConnector instance.