Using Snowflake’s Change Stream for CDC in Reverse ETL
The first time I heard about reverse ETL was a few years ago, when I was working for a company trying to improve the customer experience in their app. Our team dealt with an average of 40,000 orders every day and over 100,000 unique retailers every week, so we wanted to introduce a section on our product listing page that recommended products to customers based on the following:
- Top-selling products in the previous week
- Frequently purchased products
Instead of calculating these values on the fly every time customers loaded the product listing page, we made things easier by doing all the heavy analytical work in our data warehouse and setting up an Airflow job to send the results to a Postgres table every Monday morning, before the start of business for the week.
In my head, it was a straightforward task: the data warehouse was the source, and the production Postgres instance that served customers was the destination. Simply ETL. Reverse ETL, like the Modern Data Stack, is a concept mostly invented by software companies for marketing purposes. Even though the term often mystifies business stakeholders, it's a simple idea.
ETL is the process of extracting data from a source, transforming it, and loading it into a destination.
Reverse ETL, as the name implies, means moving data in the other direction: from your analytical platform into another system where you can leverage it, usually for transactional workloads. It may sound complex, but it can be a straightforward and useful process depending on how you implement it!
A Different CDC
If you’re based in the U.S., hearing the term CDC probably makes you think of the Centers for Disease Control and Prevention. But I want to talk to you about a different CDC — change data capture.
CDC is a process where you track changes to your data (e.g., inserts, updates, deletes) in real time, usually as logs that you can use to replay those changes in another location and keep it in sync with your source. CDC keeps a downstream system in sync with a source while maintaining a record of every action taken on the source. But there's an important distinction between incrementally syncing data between two systems and CDC.
Incremental syncs only care about the difference in an object between two arbitrary points in time. In contrast, CDC cares about all the intermediate actions between those two points. Because you capture every change on the source system, you can use CDC for data replication and to implement things like SCD Type 2 tables that keep track of historical changes to your data.
A Need for CDC and Reverse ETL
If you wonder why you’d ever need a system like the one I’m describing, let me provide a use case and context for you!
I recently worked on a product whose unique selling point (USP) was providing customers with neatly curated and properly structured data. On paper, the idea was simple: the data we were curating was scattered all over the internet and could be found with a little searching. Instead of letting customers go through all that trouble, we pulled it from all the available sources, pieced it together neatly, and served it in an application!
There were layers of complexity to deal with, including but not limited to:
- Implementing web scraping at scale to pull the data from its different sources
- Bringing the data from all these sources into a unified schema
- Tracking changes across all scraped sources
- Keeping the curated data fresh
- Deduplication
- Entity resolution to match records across different sources
- And, of course, reverse ETL!
Data had to flow from different sources into a centralized data warehouse (in our case, Snowflake) to get cleaned up, unified, and resolved, and then it had to flow into a production database.
Altering the state of a production database while avoiding downtimes meant finding a way to replicate the changes in Snowflake into the production database system without negatively impacting the system’s performance.
We experimented with several options. The simplest was truncating and reloading the destination production tables, but the target system was a live application that couldn't tolerate significant downtime or performance degradation, and the destination tables had dependent relationships in the production database that a truncate would sever. We ultimately decided the best way forward was to push only the changes that occurred in Snowflake, as CDC-styled messages with metadata telling Postgres what to do with each message (insert, update, or delete).
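To make that concrete, here is a rough sketch of what acting on those messages could look like on the Postgres side, using made-up table and column names: rows flagged as inserts or updates become upserts, and rows flagged as deletes become plain deletes.

```sql
-- Hypothetical example: applying one change message to a Postgres table.
-- An INSERT or UPDATE message becomes an upsert:
INSERT INTO products (product_id, name, price)
VALUES (42, 'Acme Widget', 9.99)
ON CONFLICT (product_id)
DO UPDATE SET name = EXCLUDED.name, price = EXCLUDED.price;

-- A DELETE message becomes a delete:
DELETE FROM products
WHERE product_id = 42;
```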
Snowflake CDC: Stream Objects vs Changes Clause
Snowflake provides two features that can be used to implement CDC:
- Streams
- Change Tracking
Streams
Snowflake stream objects track changes made to a Snowflake table and can be consumed via DML statements like INSERT and MERGE. The major limitation of streams is that they can only be consumed once: after the changes are consumed, the stream object is cleared and will only contain records again when new changes are made to the table it is attached to.
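As a rough illustration (with hypothetical table and column names), a standard stream and a single consuming statement might look like this; note that the INSERT both reads the changes and advances the stream's offset.

```sql
-- Create a standard stream that records inserts, updates, and deletes on the table
CREATE OR REPLACE STREAM products_stream ON TABLE products;

-- Reading the stream inside a DML statement consumes it, so these change records
-- will not be returned again on the next read
INSERT INTO product_changes
SELECT product_id, name, price, METADATA$ACTION, METADATA$ISUPDATE
FROM products_stream;
```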
Change Tracking
Conversely, the CHANGES clause enables you to query the change tracking metadata for a table or view within a specified time interval without creating a stream with an explicit transactional offset.
Multiple queries can retrieve the change tracking metadata between transactional start and end points. Using the CHANGES clause to track metadata about changes on a table gave our client greater flexibility because we could control and track changes at different points in time. It also meant we could implement a reverse ETL process that was fault tolerant and could be replayed if there was an issue with a particular load. This flexibility and fault tolerance made it the more desirable way to implement our custom reverse ETL system.
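A minimal sketch of such a query, assuming change tracking is already enabled on the table and using hypothetical names and timestamps, looks like this; the same interval can be queried as many times as needed.

```sql
-- Return all DML changes between two points in time, along with the metadata
-- columns that describe each change
SELECT product_id, name, price,
       METADATA$ACTION, METADATA$ISUPDATE
FROM products
    CHANGES (INFORMATION => DEFAULT)
    AT  (TIMESTAMP => '2024-05-06 07:00:00'::TIMESTAMP_LTZ)
    END (TIMESTAMP => '2024-05-07 07:00:00'::TIMESTAMP_LTZ);
```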
To implement change tracking on a table in Snowflake, you simply alter the table to turn change tracking on (see the sketch after this list). Depending on the kind of data you're interested in, there are a few considerations to keep in mind:
- Change tracking is directly impacted by the table's data retention period. If the retention period is set to one day, you cannot retrieve changes that occurred in the table more than one day ago.
- The retention period defaults to one day, and the maximum possible value depends on your Snowflake edition and the type of table.
- On Snowflake Enterprise Edition, the retention period for permanent tables can be extended to up to 90 days, but this will increase your monthly storage cost on Snowflake.
- There are different types of stream objects that can be used to determine the type of changes to be fetched by a query, so it’s important to know your objectives and what type of stream is best suited to meet your needs.
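As a minimal sketch (hypothetical table name), enabling change tracking and extending the retention period looks something like this; remember that a longer retention period increases storage cost.

```sql
-- Turn change tracking on so the CHANGES clause can be used against the table
ALTER TABLE products SET CHANGE_TRACKING = TRUE;

-- Extend the data retention period if the default of one day is too short
-- for your sync cadence (maximums depend on edition and table type)
ALTER TABLE products SET DATA_RETENTION_TIME_IN_DAYS = 14;
```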
A Few Hurdles to Cross
Since we were interested in all DML changes (inserts, updates, and deletes), we had to use the standard change type on our table (the append-only alternative captures inserts but not updates or deletes). The cadence at which the reverse ETL process executed was also an important consideration, because the ability to track historical changes relied on the table's retention period, which defaults to one day.
We needed a way to track state (i.e., metadata about the timestamp used to calculate the offset) so that we could provide a timestamp value for the offset's starting point every time we ran our job.
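One simple way to persist that state, sketched here with hypothetical names, is a small bookkeeping table whose timestamp feeds the AT() starting point of the CHANGES query and is only advanced after a load succeeds.

```sql
-- Bookkeeping table: one row per replicated table, recording where the
-- last successful sync ended
CREATE TABLE IF NOT EXISTS reverse_etl_state (
    table_name   STRING,
    last_sync_ts TIMESTAMP_LTZ
);

-- After the changes have been applied downstream, move the offset forward
UPDATE reverse_etl_state
SET last_sync_ts = CURRENT_TIMESTAMP()
WHERE table_name = 'PRODUCTS';
```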
A stream becomes stale when its offset falls outside the data retention period of its source table. Transient tables have a maximum Time Travel period of one day, so we had to change the table type to permanent to extend the retention and Time Travel window.
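Because a transient table can't simply be altered into a permanent one, the change of table type amounts to recreating the table, roughly like this (hypothetical names; your exact migration steps may differ).

```sql
-- Recreate the transient table as a permanent one and swap the names
CREATE TABLE products_permanent AS SELECT * FROM products;
ALTER TABLE products RENAME TO products_transient_old;
ALTER TABLE products_permanent RENAME TO products;

-- Re-enable change tracking on the new permanent table
ALTER TABLE products SET CHANGE_TRACKING = TRUE;
```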
We also needed a mechanism to trigger our process and an orchestrator to coordinate it end to end. Because we used dbt to handle different transformation workloads, we had to alter dbt's default behavior for materializing data into our change-tracked table so that the table is never dropped and the change tracking metadata is never lost.
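One way to achieve that in dbt, sketched here as an assumption rather than our exact configuration, is to materialize the model incrementally so dbt merges into the existing table instead of dropping and recreating it.

```sql
-- models/curated/products.sql (hypothetical dbt model)
{{ config(
    materialized='incremental',
    unique_key='product_id'
) }}

select *
from {{ ref('stg_products') }}

{% if is_incremental() %}
-- only reprocess rows that changed since the last run; the existing table
-- (and its change tracking metadata) is preserved
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```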
Finally, we needed a way to send the delta changes off to Postgres. We eventually settled on exporting the data as Parquet files to an S3 bucket, which would trigger a Lambda function designed to consume the change files and update the appropriate tables in the Postgres instance.
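A rough sketch of that export, with hypothetical stage, bucket, and integration names, is a COPY INTO an external stage that unloads the delta for the current window as Parquet.

```sql
-- External stage pointing at the S3 bucket the Lambda function watches
CREATE STAGE IF NOT EXISTS reverse_etl_stage
    URL = 's3://example-reverse-etl-bucket/changes/'
    STORAGE_INTEGRATION = s3_reverse_etl_integration;

-- Unload the latest delta as Parquet; the new object landing in S3 triggers
-- the Lambda that applies the changes to Postgres. In practice the AT()
-- timestamp would come from the reverse_etl_state table rather than a literal.
COPY INTO @reverse_etl_stage/products/
FROM (
    SELECT product_id, name, price, METADATA$ACTION, METADATA$ISUPDATE
    FROM products
        CHANGES (INFORMATION => DEFAULT)
        AT (TIMESTAMP => '2024-05-06 07:00:00'::TIMESTAMP_LTZ)
)
FILE_FORMAT = (TYPE = PARQUET)
HEADER = TRUE;
```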
For this project, we relied on Prefect for our ETL workflows: Prefect Cloud managed and monitored our jobs, while an EC2 instance served as the dedicated infrastructure for running them, so Prefect was a natural choice for orchestrating our reverse ETL flow as well. Since we used dbt for transformation workloads and had a production job that ran up to four times a day, we configured a dbt Cloud webhook and an AWS Lambda function to listen for job completion events, and used those events to trigger our reverse ETL job on successful runs.
If this sounds overly complex, there are simpler ways to do it. You can actually implement this workflow without any external systems by doing something as simple as creating a Snowflake task running on a cron schedule.
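For example, here is a minimal sketch of a task (reusing the hypothetical stage and table names from above) that runs the export every Monday morning.

```sql
-- Create a scheduled task that exports the latest changes every Monday at 06:00 UTC
CREATE OR REPLACE TASK reverse_etl_export
    WAREHOUSE = transforming_wh
    SCHEDULE  = 'USING CRON 0 6 * * 1 UTC'
AS
    COPY INTO @reverse_etl_stage/products/
    FROM (
        SELECT product_id, name, price, METADATA$ACTION, METADATA$ISUPDATE
        FROM products
            CHANGES (INFORMATION => DEFAULT)
            AT (TIMESTAMP => '2024-05-06 07:00:00'::TIMESTAMP_LTZ)
    )
    FILE_FORMAT = (TYPE = PARQUET)
    HEADER = TRUE;

-- Tasks are created suspended; resume the task to start the schedule
ALTER TASK reverse_etl_export RESUME;
```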
Looking Ahead
In this article, I introduced you to two features of Snowflake that enable you to implement CDC, as well as a use case where I had to design a reverse ETL flow and a few of the challenges we faced trying to set up a CDC-styled reverse ETL system. In the next blog post in this series, I’ll walk you through the finer details of how we implemented the system using Python, Prefect, and AWS Lambda.
Need help leveraging change streams in Snowflake to set up a CDC-styled reverse ETL system? Reach out. As a Snowflake Elite Services partner, we have the expertise to help you make the most of Snowflake so you can harness the power of your data.