Prism
v0.2.8
v0.2.8
  • 👋Welcome to Prism!
  • Getting Started
    • Installation
    • Creating your first project
    • Why Prism?
  • Fundamentals
    • Tasks
      • tasks
        • tasks.ref()
      • hooks
        • hooks.sql
        • hooks.spark
        • hooks.dbt_ref
        • hooks.get_connection
        • hooks.get_cursor
    • Targets
      • Multiple targets
    • Config files
      • prism_project.py
        • RUN_ID / SLUG
        • SYS_PATH_CONF
        • THREADS
        • PROFILE_YML_PATH / PROFILE
        • PRISM_LOGGER
        • TRIGGERS_YML_PATH / TRIGGERS
      • Profile YML
      • Triggers YML
    • Jinja
      • __file__ and Path
      • prism_project
      • wkdir
      • parent_dir
      • concat
      • env
  • Adapters
    • Overview
    • sql
      • BigQuery
      • Postgres
      • Redshift
      • Snowflake
      • Trino
      • Presto
    • PySpark
    • dbt
  • Agents
    • Overview
    • Docker
    • EC2
  • CLI
    • Command Line Interface
    • agent
      • apply
      • run
      • build
      • delete
    • compile
    • connect
    • create
      • agent
      • task
      • trigger
    • graph
    • init
    • run
    • spark-submit
  • Advanced features
    • Concurrency
    • Logging
    • Triggers
    • Retries
    • Python Client
    • Skipping tasks
  • API Reference
    • prism.task.PrismTask
    • @task(...)
    • @target(...)
    • @target_iterator(...)
    • TaskManager
      • tasks.ref(...)
    • PrismHooks
      • load_hooks
      • hooks.sql(...)
      • hooks.dbt_ref(...)
      • hooks.get_connection(...)
      • hooks.get_cursor(...)
    • prism.target.PrismTarget
  • Use Cases
    • Analytics on top of dbt
    • Machine Learning
  • Wiki
    • DAGs
Powered by GitBook
On this page
  1. API Reference

prism.target.PrismTarget

Subclasses of the prism.target.Target class

class PrismTarget:

    def __init__(self, obj, loc, hooks):
        self.obj = obj
        self.loc = loc
        self.hooks = hooks

    def save(self):
        raise prism.exceptions.RuntimeException(message="`save` method not implemented")

    @classmethod
    def open(cls, loc, hooks):
        raise prism.exceptions.RuntimeException(message="`open` method not implemented")


class PySparkParquet(PrismTarget):

    def save(self, **kwargs):
        self.obj.write.parquet(self.loc, **kwargs)

    @classmethod
    def open(cls, loc, hooks):
        # Imports
        from pyspark.sql import SparkSession

        # Identify SparkSession alias
        spark = None
        for attr in dir(hooks):
            _tmp = getattr(hooks, attr)
            if isinstance(_tmp, SparkSession):
                spark = _tmp
                break

        # If we couldn't find the spark session, raise error
        if spark is None:
            raise prism.exceptions.RuntimeException("could not find SparkSession in PrismHooks!")  # noqa: E501

        # Object
        obj = spark.read.parquet(loc)
        return cls(obj, loc, hooks)


class PandasCsv(PrismTarget):

    def save(self, **kwargs):
        self.obj.to_csv(self.loc, **kwargs)

    @classmethod
    def open(cls, loc, hooks):
        import pandas as pd
        obj = pd.read_csv(loc)
        return cls(obj, loc, hooks)


class NumpyTxt(PrismTarget):

    def save(self, **kwargs):
        import numpy as np
        np.savetxt(self.loc, self.obj, **kwargs)

    @classmethod
    def open(cls, loc, hooks):
        import numpy as np
        obj = np.loadtxt(loc)
        return cls(obj, loc, hooks)


class Txt(PrismTarget):

    def save(self, **kwargs):
        with open(self.loc, "w") as f:
            f.write(self.obj, **kwargs)
        f.close()

    @classmethod
    def open(cls, loc, hooks):
        with open(loc, 'r') as f:
            obj = f.read()
        return cls(obj, loc, hooks)


class MatplotlibPNG(PrismTarget):

    def save(self, **kwargs):
        self.obj.savefig(self.loc, **kwargs)

    @classmethod
    def open(cls, loc, hooks):
        from PIL import Image
        obj = Image.open(loc)
        return cls(obj, loc, hooks)


class JSON(PrismTarget):

    def save(self, **kwargs):
        import json
        json_object = json.dumps(self.obj, **kwargs)
        with open(self.loc, "w") as f:
            f.write(json_object)

    @classmethod
    def open(cls, loc, hooks):
        import json
        with open(loc, 'r') as f:
            obj = json.loads(f.read())
        return cls(obj, loc, hooks)
Previoushooks.get_cursor(...)NextAnalytics on top of dbt