How to implement Change Data Capture Workflows in Snowflake

This blog post explores how to implement robust Change Data Capture (CDC) workflows in Snowflake using streams and tasks. We explain the significance of CDC in modern data architectures, provide a step-by-step tutorial with a sample e-commerce dataset, and offer advanced techniques for handling complex scenarios like deletes, schema changes, and high-volume data processing.


For data-driven decision making, keeping your analytics up to date is crucial. But how do you efficiently track and process the constant flow of changes in your data? Change Data Capture (CDC) is the solution in modern data architectures. In this post, we'll walk through how to implement robust CDC workflows in Snowflake using streams and tasks, with a sample use case to illustrate the concepts.

Why CDC Matters

Change Data Capture tracks row-level inserts, updates, and deletes in your source tables so that downstream systems process only what changed, rather than repeatedly rescanning and reloading entire datasets. Snowflake's cloud-native architecture makes it an ideal platform for implementing CDC workflows: the separation of storage and compute allows incremental changes to be processed efficiently. Two features make this possible:

  1. Streams: These act as change-tracking mechanisms for tables.
  2. Tasks: Automated, scheduled jobs that can process data from streams.

Let's dive into how we can use these to build a robust CDC pipeline.

Our Example Scenario

For this tutorial, we'll use a simplified e-commerce dataset with three main tables:

  • customers
  • orders
  • products

Our goal is to maintain an up-to-date analytics layer that reflects changes in these source tables.


-- Create our source tables
CREATE OR REPLACE TABLE customers (
    customer_id INT,
    name STRING,
    email STRING,
    last_updated TIMESTAMP_NTZ
);

CREATE OR REPLACE TABLE orders (
    order_id INT,
    customer_id INT,
    order_date DATE,
    total_amount DECIMAL(10,2),
    status STRING
);

CREATE OR REPLACE TABLE products (
    product_id INT,
    name STRING,
    category STRING,
    price DECIMAL(10,2),
    inventory_count INT
);

-- Insert some sample data
INSERT INTO customers VALUES
(1, 'Alice Johnson', 'alice@example.com', CURRENT_TIMESTAMP()),
(2, 'Bob Smith', 'bob@example.com', CURRENT_TIMESTAMP());

INSERT INTO orders VALUES
(101, 1, CURRENT_DATE(), 99.99, 'Shipped'),
(102, 2, CURRENT_DATE(), 149.99, 'Processing');

INSERT INTO products VALUES
(1001, 'Ergonomic Keyboard', 'Electronics', 79.99, 100),
(1002, 'Wireless Mouse', 'Electronics', 29.99, 200);

With our tables set up, let's start building our CDC workflow.
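
One more piece of setup that the later steps rely on: the tasks in Step 2 merge changes into summary tables in an analytics schema. A minimal sketch of those targets, assuming they simply mirror the source tables:

-- Assumed analytics layer (sketch): target tables mirror the source table definitions
CREATE SCHEMA IF NOT EXISTS analytics;

CREATE OR REPLACE TABLE analytics.customer_summary LIKE customers;
CREATE OR REPLACE TABLE analytics.order_summary LIKE orders;
CREATE OR REPLACE TABLE analytics.product_summary LIKE products;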

Step 1: Creating Streams

Streams in Snowflake are the foundation of our CDC process. They keep track of DML changes (inserts, updates, deletes) on the source tables. Here's how we create streams for each of our tables:


-- Create streams on our source tables
CREATE OR REPLACE STREAM customer_changes ON TABLE customers;
CREATE OR REPLACE STREAM order_changes ON TABLE orders;
CREATE OR REPLACE STREAM product_changes ON TABLE products;
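
A stream can be queried like a table: each row it returns is a change record carrying extra metadata columns, and a plain SELECT does not consume it (the offset only advances when the stream is read inside a DML statement). An illustrative peek at the customer stream:

-- Illustrative: inspect what the stream has captured so far (a plain SELECT does not consume it)
SELECT customer_id,
       name,
       email,
       METADATA$ACTION,     -- 'INSERT' or 'DELETE'
       METADATA$ISUPDATE,   -- TRUE when the change is part of an UPDATE
       METADATA$ROW_ID      -- stable identifier for the changed row
FROM customer_changes;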

These streams will now capture all changes made to their respective tables. But how do we process these changes? That's where tasks come in.

Step 2: Creating Tasks

Tasks allow us to automatically process the data from our streams at regular intervals. Let's create a task for each of our streams:


-- Create a task to process customer changes
CREATE OR REPLACE TASK process_customer_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
    MERGE INTO analytics.customer_summary tgt
    USING (SELECT customer_id, name, email, last_updated
           FROM customer_changes) src
    ON tgt.customer_id = src.customer_id
    WHEN MATCHED THEN UPDATE
        SET tgt.name = src.name,
            tgt.email = src.email,
            tgt.last_updated = src.last_updated
    WHEN NOT MATCHED THEN INSERT
        (customer_id, name, email, last_updated)
    VALUES
        (src.customer_id, src.name, src.email, src.last_updated);

-- Create a task to process order changes
CREATE OR REPLACE TASK process_order_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
    MERGE INTO analytics.order_summary tgt
    USING (SELECT order_id, customer_id, order_date, total_amount, status
           FROM order_changes) src
    ON tgt.order_id = src.order_id
    WHEN MATCHED THEN UPDATE
        SET tgt.customer_id = src.customer_id,
            tgt.order_date = src.order_date,
            tgt.total_amount = src.total_amount,
            tgt.status = src.status
    WHEN NOT MATCHED THEN INSERT
        (order_id, customer_id, order_date, total_amount, status)
    VALUES
        (src.order_id, src.customer_id, src.order_date, src.total_amount, src.status);

-- Create a task to process product changes
CREATE OR REPLACE TASK process_product_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
    MERGE INTO analytics.product_summary tgt
    USING (SELECT product_id, name, category, price, inventory_count
           FROM product_changes) src
    ON tgt.product_id = src.product_id
    WHEN MATCHED THEN UPDATE
        SET tgt.name = src.name,
            tgt.category = src.category,
            tgt.price = src.price,
            tgt.inventory_count = src.inventory_count
    WHEN NOT MATCHED THEN INSERT
        (product_id, name, category, price, inventory_count)
    VALUES
        (src.product_id, src.name, src.category, src.price, src.inventory_count);
        
        

Let's break down what's happening in these tasks:

  1. We're using MERGE statements to upsert data into our analytics tables.
  2. The source for each MERGE is the corresponding stream; because the stream is consumed inside a DML statement, its offset advances and the next run only sees changes made after this one.
  3. We're scheduling these tasks to run every minute, but you can adjust this based on your needs (an optional refinement is sketched below).
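
One optional refinement, not used in the tasks above: a task can be given a WHEN condition so that scheduled runs are skipped when the stream has nothing new to process. A sketch for the customer task:

-- Optional refinement (sketch): only run when the stream actually has changes
CREATE OR REPLACE TASK process_customer_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
    WHEN SYSTEM$STREAM_HAS_DATA('CUSTOMER_CHANGES')
AS
    MERGE INTO analytics.customer_summary tgt
    USING (SELECT customer_id, name, email, last_updated
           FROM customer_changes) src
    ON tgt.customer_id = src.customer_id
    WHEN MATCHED THEN UPDATE
        SET tgt.name = src.name,
            tgt.email = src.email,
            tgt.last_updated = src.last_updated
    WHEN NOT MATCHED THEN INSERT
        (customer_id, name, email, last_updated)
    VALUES
        (src.customer_id, src.name, src.email, src.last_updated);

Tasks are created in a suspended state, so a CREATE OR REPLACE like this one needs a subsequent RESUME, which Step 3 below takes care of.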

Step 3: Activating the CDC Workflow

With our streams and tasks in place, we just need to activate our tasks:


ALTER TASK process_customer_changes RESUME;
ALTER TASK process_order_changes RESUME;
ALTER TASK process_product_changes RESUME;

Now our CDC workflow is up and running! Any changes to the source tables will be captured by the streams and processed by the tasks, keeping our analytics layer up-to-date.
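
To see the pipeline in action, make a small change and watch it flow through. A quick, illustrative check:

-- Illustrative check: change a row in a source table
UPDATE customers
SET email = 'alice.j@example.com', last_updated = CURRENT_TIMESTAMP()
WHERE customer_id = 1;

-- Before the next task run, the change is visible in the stream
SELECT customer_id, email, METADATA$ACTION, METADATA$ISUPDATE
FROM customer_changes;

-- Within a minute or so, the task consumes the stream and the change lands here
SELECT * FROM analytics.customer_summary WHERE customer_id = 1;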

Monitoring and Optimization

While our basic CDC workflow is functional, there's always room for improvement. Here are some tips to keep your CDC process running smoothly:

  1. Monitor task history: Regularly query the INFORMATION_SCHEMA.TASK_HISTORY table function to confirm your tasks are running successfully.

SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;

  2. Optimize task frequency: If you're processing a high volume of changes, you might need to run your tasks more frequently. Conversely, for low-volume tables, you could reduce the frequency to save on compute costs.
  3. Use task dependencies: For complex workflows, you can create dependencies between tasks. For example, you might want to process customer changes before order changes. Note that a task can have either a schedule or a predecessor, not both, and it must be suspended while you change its dependencies:

ALTER TASK process_order_changes SUSPEND;
ALTER TASK process_order_changes UNSET SCHEDULE;
ALTER TASK process_order_changes ADD AFTER process_customer_changes;
ALTER TASK process_order_changes RESUME;

  4. Implement error handling: Add error handling logic to your tasks to manage unexpected data or failures gracefully (a minimal sketch follows this list).
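
For point 4, here is a minimal sketch of what error handling inside a task can look like, using a Snowflake Scripting block with an exception handler. The analytics.cdc_task_errors table is a hypothetical logging table introduced purely for illustration:

-- Hypothetical error-log table for illustration (not part of the original setup)
CREATE TABLE IF NOT EXISTS analytics.cdc_task_errors (
    task_name     STRING,
    error_message STRING,
    logged_at     TIMESTAMP_NTZ
);

CREATE OR REPLACE TASK process_customer_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
BEGIN
    MERGE INTO analytics.customer_summary tgt
    USING (SELECT customer_id, name, email, last_updated
           FROM customer_changes) src
    ON tgt.customer_id = src.customer_id
    WHEN MATCHED THEN UPDATE
        SET tgt.name = src.name,
            tgt.email = src.email,
            tgt.last_updated = src.last_updated
    WHEN NOT MATCHED THEN INSERT
        (customer_id, name, email, last_updated)
    VALUES
        (src.customer_id, src.name, src.email, src.last_updated);
EXCEPTION
    WHEN OTHER THEN
        -- Record the failure in the hypothetical log table instead of losing it silently
        INSERT INTO analytics.cdc_task_errors (task_name, error_message, logged_at)
        VALUES ('PROCESS_CUSTOMER_CHANGES', :SQLERRM, CURRENT_TIMESTAMP());
        -- Re-raise so the failure still shows up as an error in TASK_HISTORY
        RAISE;
END;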

Real-world Impact: A Case Study

We worked with a large online retailer that struggled with inventory management. They were running batch updates every 6 hours, which led to overselling popular items during flash sales. By implementing a CDC workflow similar to what we've discussed, they were able to update their inventory analytics every 5 minutes.

The result? A 30% reduction in stockouts and a 15% increase in customer satisfaction scores. The near real-time inventory updates allowed them to trigger restock alerts faster and adjust promotional activities on the fly.

Advanced Techniques: Handling Complex Scenarios

As your CDC implementation grows, you might encounter more complex scenarios. Here are a few advanced techniques to consider:

1. Handling Deletes

Our current tasks don't explicitly handle deletes. The streams already record them: each change row carries a METADATA$ACTION of INSERT or DELETE and a METADATA$ISUPDATE flag. We just need to make sure change tracking is enabled on the source table and act on that metadata in the MERGE:

-- Enable change tracking on the source table
ALTER TABLE products SET CHANGE_TRACKING = TRUE;

-- Recreate the product_changes stream (note: CREATE OR REPLACE resets the stream's offset);
-- SHOW_INITIAL_ROWS = TRUE also returns the rows that existed when the stream was created
CREATE OR REPLACE STREAM product_changes ON TABLE products SHOW_INITIAL_ROWS = TRUE;

-- Update the process_product_changes task to act on the change metadata
CREATE OR REPLACE TASK process_product_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
    MERGE INTO analytics.product_summary tgt
    USING (
        SELECT product_id, name, category, price, inventory_count,
               METADATA$ACTION, METADATA$ISUPDATE
        FROM product_changes
    ) src
    ON tgt.product_id = src.product_id
    WHEN MATCHED AND src.METADATA$ACTION = 'DELETE' THEN DELETE
    WHEN MATCHED AND src.METADATA$ISUPDATE THEN UPDATE
        SET tgt.name = src.name,
            tgt.category = src.category,
            tgt.price = src.price,
            tgt.inventory_count = src.inventory_count
    WHEN NOT MATCHED AND src.METADATA$ACTION = 'INSERT' THEN INSERT
        (product_id, name, category, price, inventory_count)
    VALUES
        (src.product_id, src.name, src.category, src.price, src.inventory_count);
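
A quick, illustrative way to confirm that deletes now propagate:

-- Illustrative check: a delete should flow through to the summary table
DELETE FROM products WHERE product_id = 1002;

-- After the next task run, the row should be gone from the target
SELECT * FROM analytics.product_summary WHERE product_id = 1002;  -- expect zero rows
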
2. Handling Schema Changes

Schema changes can break your CDC pipeline. One way to make your pipeline more resilient is to use dynamic SQL:

CREATE OR REPLACE TASK process_customer_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
DECLARE
    v_columns STRING;
BEGIN
    -- Get all columns except METADATA$ columns
    SELECT LISTAGG(column_name, ',')
    INTO :v_columns
    FROM information_schema.columns
    WHERE table_name = 'CUSTOMERS'
      AND table_schema = 'PUBLIC'
      AND column_name NOT LIKE 'METADATA$%';

    -- Build and run the MERGE from the current column list
    EXECUTE IMMEDIATE
        'MERGE INTO analytics.customer_summary tgt
         USING (SELECT ' || v_columns || ' FROM customer_changes) src
         ON tgt.customer_id = src.customer_id
         WHEN MATCHED THEN UPDATE SET '
            || REGEXP_REPLACE(v_columns, '([^,]+)', 'tgt.\\1 = src.\\1') ||
        ' WHEN NOT MATCHED THEN INSERT (' || v_columns || ')
          VALUES (' || REGEXP_REPLACE(v_columns, '([^,]+)', 'src.\\1') || ')';
END;

This approach dynamically builds the SQL statement based on the current schema, making it more adaptable to changes.
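
For our customers table, the statement this builds would look roughly like the following (reformatted for readability, and assuming the four columns defined earlier):

-- Roughly what the EXECUTE IMMEDIATE above produces for the customers table (illustrative)
MERGE INTO analytics.customer_summary tgt
USING (SELECT CUSTOMER_ID,NAME,EMAIL,LAST_UPDATED FROM customer_changes) src
ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN UPDATE
    SET tgt.CUSTOMER_ID = src.CUSTOMER_ID,
        tgt.NAME = src.NAME,
        tgt.EMAIL = src.EMAIL,
        tgt.LAST_UPDATED = src.LAST_UPDATED
WHEN NOT MATCHED THEN INSERT (CUSTOMER_ID,NAME,EMAIL,LAST_UPDATED)
VALUES (src.CUSTOMER_ID,src.NAME,src.EMAIL,src.LAST_UPDATED);

Note that the generated SET clause also assigns the join key (customer_id) to itself, which is redundant but harmless; you could filter it out of v_columns if you prefer.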

3. Managing Large Volumes of Changes

For tables with a high volume of changes, processing all changes in a single task run might take too long. You can implement a windowing strategy:

CREATE OR REPLACE TASK process_high_volume_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
DECLARE
    v_last_processed_offset NUMBER;
BEGIN
    -- Get the last processed offset
    SELECT NVL(MAX(last_processed_offset), 0)
    INTO :v_last_processed_offset
    FROM analytics.change_processing_status
    WHERE table_name = 'HIGH_VOLUME_TABLE';

    -- Process the next batch of changes
    MERGE INTO analytics.high_volume_summary tgt
    USING (
        SELECT *
        FROM high_volume_changes
        WHERE METADATA$ACTION_SEQ > :v_last_processed_offset
        ORDER BY METADATA$ACTION_SEQ
        LIMIT 10000  -- Adjust batch size as needed
    ) src
    ON tgt.id = src.id
    WHEN MATCHED THEN UPDATE
        SET tgt.column1 = src.column1,
            tgt.column2 = src.column2
            -- ... other columns ...
    WHEN NOT MATCHED THEN INSERT (id, column1, column2)
        VALUES (src.id, src.column1, src.column2);

    -- Update the last processed offset
    UPDATE analytics.change_processing_status
    SET last_processed_offset = (
        SELECT MAX(METADATA$ACTION_SEQ)
        FROM high_volume_changes
        WHERE METADATA$ACTION_SEQ > :v_last_processed_offset
    )
    WHERE table_name = 'HIGH_VOLUME_TABLE';
END;

This approach processes changes in batches, keeping track of the last processed change to ensure no data is missed.
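
The task above assumes a small bookkeeping table holding one row per tracked table. A minimal sketch of that assumption, with column names matching those the task references:

-- Assumed bookkeeping table (sketch): one row per table whose changes we window through
CREATE TABLE IF NOT EXISTS analytics.change_processing_status (
    table_name            STRING,
    last_processed_offset NUMBER DEFAULT 0
);

-- Seed the row the task reads and updates
INSERT INTO analytics.change_processing_status (table_name, last_processed_offset)
VALUES ('HIGH_VOLUME_TABLE', 0);

In practice you'd also guard the final UPDATE against the MAX() subquery returning NULL when a run finds no new changes.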

Conclusion: Embracing the Future of Data Processing

Implementing CDC workflows in Snowflake using streams and tasks opens up a world of possibilities for near real-time data processing. From improving inventory management to enabling instant analytics updates, CDC can be a game-changer for your data strategy.

The key to successful CDC implementation lies in:

  1. Designing your streams to capture the right level of detail
  2. Optimizing your tasks for performance and reliability
  3. Monitoring and fine-tuning your workflow as your data needs evolve

As you move forward on your CDC journey, keep experimenting and adapting these techniques to your specific use cases. The data engineering landscape is ever-changing, and with tools like Snowflake's streams and tasks, you're well-equipped to ride the wave of continuous data updates.
