Enterprises generally prefer modern tools like Matillion, SnapLogic, Alteryx, RedShift, SSIS, Informatica Cloud, Skyvia, HEVO, Ab Initio, Talend, Pentaho, etc. for their ETL/ELT needs. The primary reasons include existing licenses, ease of migrating orchestration and transformation jobs, an enterprise requirement for a full-blown ETL/ELT solution, and a preference for tools with a detailed GUI, debugging mechanisms, and audit and error logs. In this article, we will look at how to build an ETL process using Snowflake and its native features, primarily Streams & Tasks. In other words, ETL without any external tool.
If you already have Matillion and are looking to migrate to Snowflake, see: Using Matillion for Data Loading into Snowflake – Metadata Driven Approach.
Snowflake is a cloud data platform and data warehouse that supports the most common standardized version of SQL: ANSI. This implies that most common SQL operations are usable within Snowflake. Snowflake supports DDL & DML operations that enable data warehousing implementation. Most of all, ETL operations can be easily written in Snowflake including transformation, aggregation, SCDs, batch processing, ranking/scoring, etc.
Here, we look into how we can implement SCD using Snowflake.
Slowly Changing Dimensions (SCD) are dimensions that change slowly over time, rather than on a regular, time-based schedule.
In a data warehouse, changes in dimension attributes need to be tracked in order to report historical data. In other words, implementing one of the SCD types should enable users to assign the proper dimension attribute value for a given date. Examples of such dimensions are customer, geography, and employee.
There are many approaches to dealing with SCDs. The universally used SCD types are:
| Type | Description |
| --- | --- |
| SCD1 | Overwrites the data in the target without keeping track of the previous record |
| SCD2 | Maintains the history of a record in the target table at the row level |
| SCD3 | Maintains the history of a record in the target table at the column level |
For maintaining historical data, the most commonly used method is SCD type 2. In SCD2, we capture the change in data at row level and maintain the historical records along with the current latest record in the same table.
We can represent the historical record & current record by any of the following parameters:
- Flag (Y/N)
- Versioning (0, 1, 2, …)
- Date (StartDate & EndDate)
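As a sketch, a dimension table carrying all three tracking parameters might look like the following (the table and column names here are illustrative assumptions, not a real schema):

```sql
-- Illustrative SCD2 dimension table (names are assumptions)
CREATE OR REPLACE TABLE CUSTOMER_DIM (
    CUSTOMER_ID   NUMBER,
    CUSTOMER_NAME VARCHAR,
    ADDRESS       VARCHAR,
    FLAG          VARCHAR(1),     -- 'Y' = current record, 'N' = historical
    VERSION       NUMBER,         -- 0, 1, 2, ... incremented per change
    STARTDATE     TIMESTAMP_NTZ,  -- when this version became effective
    ENDDATE       TIMESTAMP_NTZ   -- NULL for the current record
);
```

In practice, a single parameter (for example, the date pair alone) is often sufficient; this article uses all three.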
Building an ETL process in Snowflake is very simple using the Streams & Tasks functionality that Snowflake announced at Snowflake Summit.
A stream object records DML changes made to tables, including inserts, updates, deletes and metadata about each change, so that actions can be taken using the changed data. This capability can be used for change data capture (CDC). An individual table stream tracks the changes made to rows in a source table. A table stream allows you to query a table and consume a set of changes to a table, at row level, between two transactional points in time.
Note that a stream itself does not contain any table data. A stream only stores the offset for the source table and returns CDC records by leveraging the versioning history for the source table. When the first stream for a table is created, a pair of hidden columns are added to the source table and begin storing change tracking metadata. These columns consume a small amount of storage.
A stream maintains a point of time into the transactional versioned timeline of the source table, called an offset, which starts at the transactional point when the stream contents were last consumed using a DML statement.
The stream maintains only the delta of the changes; if multiple DML statements change a row, the stream contains only the latest action taken on that row.
Multiple queries can independently consume the same change data from a stream without changing the offset. To ensure multiple statements access the same change records in the stream, surround them with an explicit transaction statement (BEGIN … COMMIT). This locks the stream. DML updates to the source table in parallel transactions are tracked by the change tracking system but do not update the stream until the explicit transaction statement is committed and the existing change data is consumed.
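A minimal sketch of this pattern, assuming a stream `my_stream` and two hypothetical target tables:

```sql
BEGIN;

-- Both statements read the same change set; within an explicit
-- transaction the stream offset is fixed at transaction start
INSERT INTO audit_copy     SELECT * FROM my_stream;
INSERT INTO reporting_copy SELECT * FROM my_stream;

COMMIT;  -- the stream offset advances only here
```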
The following diagram shows how the contents of a standard table stream change as rows in the source table are updated. Whenever a DML statement consumes the stream contents, the stream position advances to track the next set of DML changes to the table.
A stream stores data in the same shape as the source table (i.e. the same column names and ordering), with the following additional columns:
| Column | Description |
| --- | --- |
| METADATA$ACTION | Indicates the DML operation (INSERT, DELETE) recorded |
| METADATA$ISUPDATE | Indicates whether the operation was part of an UPDATE statement. Updates to rows in the source table are represented as a pair of DELETE and INSERT records in the stream, with METADATA$ISUPDATE set to TRUE |
| METADATA$ROW_ID | Specifies the unique and immutable ID for the row, which can be used to track changes to specific rows over time |
For more detailed information on Streams, you can visit the Snowflake website for Streams.
User-defined tasks allow scheduled execution of SQL statements. Tasks run as per specified execution configuration, using any combination of a set interval and/or a flexible schedule using a subset of familiar cron utility syntax. Tasks can execute a single SQL statement, including a call to a stored procedure.
Tasks can also be used independently to generate periodic reports by inserting or merging rows into a report table or perform other periodic work. A task runs on a schedule, which can be defined when creating a task (using CREATE TASK) or later (using ALTER TASK).
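For example, a task that refreshes a hypothetical report table once an hour might be defined as follows (the warehouse and table names are assumptions):

```sql
CREATE OR REPLACE TASK refresh_hourly_report
  WAREHOUSE = ETL_WH                      -- assumed warehouse name
  SCHEDULE  = 'USING CRON 0 * * * * UTC'  -- top of every hour, cron subset syntax
AS
  INSERT INTO hourly_report
  SELECT CURRENT_TIMESTAMP(), COUNT(*) FROM orders;

-- Tasks are created in a suspended state and must be resumed to run
ALTER TASK refresh_hourly_report RESUME;
```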
Snowflake ensures only one instance of a task with a schedule (i.e. a standalone task or the root task in a tree of tasks) is executed at a given time. If a task is still running when the next scheduled execution time occurs, then that scheduled time is skipped.
Users can define a simple tree-like structure of tasks that starts with a root task and is linked together by task dependencies.
A predecessor task can be defined when creating a task (using CREATE TASK … AFTER) or later (using ALTER TASK … ADD AFTER). The root task in the tree should have a defined schedule, while each of the other tasks in the tree have a defined predecessor to link them together. When a run of a predecessor task finishes successfully, it triggers the run of any child tasks that identify this task as the predecessor in their definition. The following diagram shows a tree of tasks that requires 5 minutes on average to complete for each run. The diagram shows the window for 2 runs of the tree of tasks to complete. This window is calculated from the time the root task is scheduled to start until the last child task in the tree has completed running.
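A minimal sketch of such a tree, with placeholder names and bodies:

```sql
-- Root task: the only task in the tree with a schedule
CREATE OR REPLACE TASK root_task
  WAREHOUSE = ETL_WH        -- assumed warehouse name
  SCHEDULE  = '5 MINUTE'
AS
  SELECT 1;                 -- placeholder body

-- Child task: linked to the root via AFTER, no schedule of its own
CREATE OR REPLACE TASK child_task
  WAREHOUSE = ETL_WH
  AFTER root_task           -- runs only after root_task completes successfully
AS
  SELECT 2;                 -- placeholder body

-- Resume children before the root so no run starts with a suspended child
ALTER TASK child_task RESUME;
ALTER TASK root_task  RESUME;
```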
We can check the status of every task created in the given Snowflake account by using the TASK_HISTORY table function in the Information Schema.
Here we can see the scheduled time, completed time and next schedule of a given task along with its dependencies.
The average difference between the scheduled and completed times for a task is the expected average run time for the task, including any period in which the task was queued. A task is queued when other processes are currently using all of the servers in the warehouse. This function returns task activity within the last 7 days.
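For example, to see the most recent runs:

```sql
SELECT NAME,
       STATE,               -- e.g. SUCCEEDED, FAILED, SKIPPED
       SCHEDULED_TIME,
       COMPLETED_TIME,
       NEXT_SCHEDULED_TIME
  FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
 ORDER BY SCHEDULED_TIME DESC
 LIMIT 20;
```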
Tasks can be combined with table streams for continuous ELT workflows to process recently changed table rows.
In this article, we will look more into detail on one of the practical scenarios of implementing ETL with SCD, in Snowflake, by leveraging the full power of Snowflake Streams & Tasks.
For more detailed information on Tasks, you can visit the Snowflake website for Tasks.
Building an end-to-end orchestration & transformation pipeline using Snowflake Streams & Tasks.
In the design discussed here, we have a basic ELT model of loading the data from the source all the way to the reporting/analytics layer.
The source is a CSV file that provides the incremental data, which is loaded into a Snowflake table (the landing layer). The source file contains only delta records, with no historical data.
The landing table has the same column structure as the source and holds only the delta records, following a truncate-and-load approach.
The staging table is where SCD2 is implemented, maintaining the historical as well as incremental delta records.
The reporting or analytics table holds only the most recent delta data (active records), along with aggregated calculated measures and other facts and dimensions, and is utilized by the business for analytics.
We have the source file (Orders.csv) placed at an AWS location (or SFTP, or any other external file transfer location; for this implementation we have used AWS S3 as our file storage system), where we receive an incremental file with the latest changes.
Every time an updated file is placed in the S3 path, the data is copied into the landing table.
We will have a temporary table (ORDERS_SRC) to load this source file data and then we will overwrite the landing table (ORDERS_LANDING) using this temporary table.
Below query is used to process this source file into a Snowflake table:
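A sketch of that load, assuming an external stage `@orders_stage` pointing at the S3 path (the stage name and file format options are assumptions):

```sql
-- Load the latest incremental file into the temporary table
COPY INTO ORDERS_SRC
  FROM @orders_stage/Orders.csv
  FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
  ON_ERROR = 'ABORT_STATEMENT';
```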
The temporary table is always truncated to incorporate only the latest file data (delta).
Then the landing table is overwritten with this updated data in the temporary table. We will use the MERGE command to overwrite the changes to the Landing table.
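A sketch of that MERGE, showing only the O_ORDERKEY and O_ORDERSTATUS columns for brevity:

```sql
MERGE INTO ORDERS_LANDING tgt
USING ORDERS_SRC src
   ON tgt.O_ORDERKEY = src.O_ORDERKEY
-- Update rows whose status changed
WHEN MATCHED AND tgt.O_ORDERSTATUS <> src.O_ORDERSTATUS THEN
  UPDATE SET tgt.O_ORDERSTATUS = src.O_ORDERSTATUS
-- Insert brand-new rows
WHEN NOT MATCHED THEN
  INSERT (O_ORDERKEY, O_ORDERSTATUS)
  VALUES (src.O_ORDERKEY, src.O_ORDERSTATUS);
```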
With every new transaction file, the table is refreshed with the latest data.
But we need to track the changes in the Landing tables and maintain history, i.e. every time the data in the landing table changes, the changes need to be tracked (Insert/Update).
Tracking this CDC can be done using Streams.
Create a stream on the landing table to monitor the CDC.
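The stream definition itself is a one-liner (the stream name is our choice):

```sql
CREATE OR REPLACE STREAM ORDERS_LANDING_STREAM
  ON TABLE ORDERS_LANDING;
```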
Once the stream is created, the data from the temporary table is loaded into the landing table, so that the metadata is updated as expected.
Once the records are loaded into landing, the corresponding stream is also updated.
Based on the METADATA$ACTION & METADATA$ISUPDATE columns in the stream, we identify whether the record is a New or Updated record.
Records remain in the stream until they are consumed. If a further update is processed before the data is extracted from the stream, only the latest action on each row is retained, so the intermediate change is not captured.
Now we process the data in the Stream into the staging table (ORDERS_STAGING).
Note that the staging table is not populated directly from the landing table, as in a normal SCD2 process; rather, it is loaded using the metadata available in the stream on the landing table. This makes the SCD implementation much easier.
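One way to express this as a single MERGE, simplified to the tracked columns (including METADATA$ACTION in the join condition keeps the close-out and insert rows in separate branches):

```sql
MERGE INTO ORDERS_STAGING stg
USING ORDERS_LANDING_STREAM strm
   ON stg.O_ORDERKEY = strm.O_ORDERKEY
  AND stg.FLAG = 'Y'
  AND strm.METADATA$ACTION = 'DELETE'  -- only close-out rows match the active record
-- An updated row appears as DELETE + INSERT: close the old version...
WHEN MATCHED THEN
  UPDATE SET stg.ENDDATE = CURRENT_TIMESTAMP(),
             stg.FLAG    = 'N'
-- ...and insert a new active version for every INSERT row (new or updated)
WHEN NOT MATCHED AND strm.METADATA$ACTION = 'INSERT' THEN
  INSERT (O_ORDERKEY, O_ORDERSTATUS, STARTDATE, ENDDATE, FLAG)
  VALUES (strm.O_ORDERKEY, strm.O_ORDERSTATUS, CURRENT_TIMESTAMP(), NULL, 'Y');
```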
So after inserting the data from the Stream into the Staging table, using the metadata info, the data will be as below:
Notice that the stream is cleared once the data in it has been processed.
The delta for the records will be tracked on the O_ORDERSTATUS column.
The history of this data is tracked using: STARTDATE, ENDDATE, FLAG.
Now, the requirement is that with every new insert and update in landing, the data in staging should be updated. Below is an example of the incremental process:
After the data is processed into ORDERS_LANDING, let us see the data change in the Stream.
Notice that there are two entries for each record that was updated:
- Insert into Landing table ($ACTION = INSERT, $ISUPDATE = FALSE)
- Update on the previous landing record ($ACTION = DELETE, $ISUPDATE = TRUE)
This is where we implement SCD2, in order to maintain the history for records with O_ORDERKEY (1800033, 1800035).
The output in STAGING will thus be updated as below:
All the records in the Staging table having Flag=’Y’ are Active records. Only these active records will be part of the Analytics table.
As per the design, each time the staging table is updated, the Analytics table is refreshed & active records are available to the customers.
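A sketch of that analytics refresh, keeping only the active records (aggregate measures are omitted here, and the table name is an assumption):

```sql
CREATE OR REPLACE TABLE ORDERS_ANALYTICS AS
SELECT O_ORDERKEY,
       O_ORDERSTATUS,
       STARTDATE
  FROM ORDERS_STAGING
 WHERE FLAG = 'Y';   -- active records only
```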
Here we see how to automate this entire process from source to analytics without having to run the queries manually after every data change. We do this using Tasks.
Below are the tasks that we will have as part of the above ETL process:
- Truncate Temporary table before every load
- Load data from Source File to Temporary Table
- Overwrite landing data with latest Temp table data
- Landing to Staging, when there are records present in the Stream of Landing, i.e. New or Updated records have been processed
- Staging to Analytics, processing only Active records.
- Remove the file on S3
Each task will be dependent on the task prior to it, thus creating a pipeline.
Let us have a look at each task in detail.
In TASK1, we will truncate the temporary table in order to load the new/updated data from the source file present in S3.
As per the definition of TASK1, the task is scheduled to execute every 15 minutes. In short, an updated transaction file is available every 15 minutes, and the ORDERS_SRC table will be overwritten with the updated file data.
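TASK1 might be defined as follows (the warehouse name is an assumption):

```sql
CREATE OR REPLACE TASK TASK1
  WAREHOUSE = ETL_WH        -- assumed warehouse name
  SCHEDULE  = '15 MINUTE'
AS
  TRUNCATE TABLE ORDERS_SRC;
```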
In TASK2, we will load the source file data into the ORDERS_SRC temporary table.
This task executes after TASK1, via the AFTER clause. So if TASK1 goes into the SKIPPED state, TASK2 will never be executed for those runs, saving unnecessary execution and compute time.
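A sketch of TASK2, chained to TASK1 (the stage name and file format are assumptions):

```sql
CREATE OR REPLACE TASK TASK2
  WAREHOUSE = ETL_WH
  AFTER TASK1                 -- runs only after TASK1 succeeds
AS
  COPY INTO ORDERS_SRC
    FROM @orders_stage/Orders.csv
    FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);
```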
In TASK3, the data will be overwritten into the ORDERS_LANDING table from the temporary table. Here, only the data that has changed will be overwritten, thus implementing an SCD1 design.
In TASK4, the changed data will be captured in the staging layer, as per the data in the landing layer
So, in short, the task checks the landing stream for data every 15 minutes; if data is present, it executes the MERGE statement, otherwise the current run is skipped and the task waits for the next schedule.
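This conditional execution maps naturally onto the task's WHEN clause with SYSTEM$STREAM_HAS_DATA. A sketch of TASK4, with a simplified SCD2 MERGE as the body (warehouse name is an assumption):

```sql
CREATE OR REPLACE TASK TASK4
  WAREHOUSE = ETL_WH
  AFTER TASK3
  -- Skip the run entirely when the stream has no change records
  WHEN SYSTEM$STREAM_HAS_DATA('ORDERS_LANDING_STREAM')
AS
  MERGE INTO ORDERS_STAGING stg
  USING ORDERS_LANDING_STREAM strm
     ON stg.O_ORDERKEY = strm.O_ORDERKEY
    AND stg.FLAG = 'Y'
    AND strm.METADATA$ACTION = 'DELETE'
  WHEN MATCHED THEN
    UPDATE SET stg.ENDDATE = CURRENT_TIMESTAMP(), stg.FLAG = 'N'
  WHEN NOT MATCHED AND strm.METADATA$ACTION = 'INSERT' THEN
    INSERT (O_ORDERKEY, O_ORDERSTATUS, STARTDATE, ENDDATE, FLAG)
    VALUES (strm.O_ORDERKEY, strm.O_ORDERSTATUS, CURRENT_TIMESTAMP(), NULL, 'Y');
```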
If we observe, the last 3 runs were skipped, because there was no data change detected in the Landing table, i.e. the Stream was Empty.
Now, if TASK4 is successful, i.e. it finds data in the stream, then staging is updated and this data automatically flows to the analytics layer (only the active data). For this to happen, we have another dependent task.
In TASK6, once the Analytics layer load is completed, we will remove/archive the source file present in S3. In this scenario, we have removed the file.
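A sketch of TASK6, assuming the same hypothetical `@orders_stage` external stage and that the analytics-load task is named TASK5:

```sql
CREATE OR REPLACE TASK TASK6
  WAREHOUSE = ETL_WH
  AFTER TASK5                 -- assumed name of the analytics-load task
AS
  -- Delete the processed file from the stage
  REMOVE @orders_stage PATTERN = '.*Orders.csv';
```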
The list of tasks in the schema can be viewed using SHOW TASKS command.
And that completes the implementation of a data pipeline with SCD2 using Streams & Tasks in Snowflake.
Advantages of using Streams & Tasks
- Eliminates the need for external ETL/ELT tools for ingestion, extraction, transformation and scheduling.
- Simple and hassle-free coding.
- Ease in branching, looping and dependency setting of multiple queries/tasks.
We would love to hear about your experience using Streams & Tasks with Snowflake.