How to implement Change Data Capture Workflows in Snowflake

This blog post explores how to implement robust Change Data Capture (CDC) workflows in Snowflake using streams and tasks. We explain the significance of CDC in modern data architectures, provide a step-by-step tutorial with a sample e-commerce dataset, and offer advanced techniques for handling complex scenarios like deletes, schema changes, and high-volume data processing.

GraphQL has a role beyond API Query Language- being the backbone of application Integration
background Coditation

How to implement Change Data Capture Workflows in Snowflake

Data-driven decision-making hinges on real-time insights. But how can you ensure your analytics always reflect the latest data changes? Change Data Capture (CDC) is the answer. This post delves into implementing robust CDC workflows in Snowflake using streams and tasks. A practical use case will illuminate the process.

Why CDC Matters

Snowflake's cloud-native architecture positions it as an ideal platform for efficient Change Data Capture (CDC) workflows. Its decoupled storage and compute layers enable the processing of incremental data changes without the need for full dataset scans. Two key features facilitate this:

  1. Streams: These act as change-tracking mechanisms for tables.
  2. Tasks: Automated, scheduled jobs that can process data from streams.

Now, let's explore how to leverage these tools to create a powerful CDC pipeline.

Our Example Scenario

For this tutorial, we'll use a simplified e-commerce dataset with three main tables:

  • customers
  • orders
  • products

We aim to keep our analytics layer current, ensuring it mirrors the most recent data from its source tables.


-- Create our source tables
CREATE OR REPLACE TABLE customers (
    customer_id INT,
    name STRING,
    email STRING,
    last_updated TIMESTAMP_NTZ
);

CREATE OR REPLACE TABLE orders (
    order_id INT,
    customer_id INT,
    order_date DATE,
    total_amount DECIMAL(10,2),
    status STRING
);

CREATE OR REPLACE TABLE products (
    product_id INT,
    name STRING,
    category STRING,
    price DECIMAL(10,2),
    inventory_count INT
);

-- Insert some sample data
INSERT INTO customers VALUES
(1, 'Alice Johnson', 'alice@example.com', CURRENT_TIMESTAMP()),
(2, 'Bob Smith', 'bob@example.com', CURRENT_TIMESTAMP());

INSERT INTO orders VALUES
(101, 1, CURRENT_DATE(), 99.99, 'Shipped'),
(102, 2, CURRENT_DATE(), 149.99, 'Processing');

INSERT INTO products VALUES
(1001, 'Ergonomic Keyboard', 'Electronics', 79.99, 100),
(1002, 'Wireless Mouse', 'Electronics', 29.99, 200);

With our tables set up, let's start building our CDC workflow.

Step 1: Creating Streams

Snowflake Streams are the cornerstone of our Change Data Capture (CDC) pipeline. By monitoring Data Manipulation Language (DML) operations - insertions, updates, and deletions - on source tables, Streams enable us to efficiently track and process data changes. Let's delve into the process of creating Streams for each of our tables.


-- Create streams on our source tables
CREATE OR REPLACE STREAM customer_changes ON TABLE customers;
CREATE OR REPLACE STREAM order_changes ON TABLE orders;
CREATE OR REPLACE STREAM product_changes ON TABLE products;

By capturing all modifications to designated tables, these data streams provide a real-time record of change. To leverage this information effectively, we introduce the concept of tasks: automated processes designed to handle and analyze these changes.

Step 2: Creating Tasks

Tasks allow us to automatically process the data from our streams at regular intervals. Let's create a task for each of our streams:


-- Create a task to process customer changes
CREATE OR REPLACE TASK process_customer_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
    MERGE INTO analytics.customer_summary tgt
    USING (SELECT customer_id, name, email, last_updated
           FROM customer_changes) src
    ON tgt.customer_id = src.customer_id
    WHEN MATCHED THEN UPDATE
        SET tgt.name = src.name,
            tgt.email = src.email,
            tgt.last_updated = src.last_updated
    WHEN NOT MATCHED THEN INSERT
        (customer_id, name, email, last_updated)
    VALUES
        (src.customer_id, src.name, src.email, src.last_updated);

-- Create a task to process order changes
CREATE OR REPLACE TASK process_order_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
    MERGE INTO analytics.order_summary tgt
    USING (SELECT order_id, customer_id, order_date, total_amount, status
           FROM order_changes) src
    ON tgt.order_id = src.order_id
    WHEN MATCHED THEN UPDATE
        SET tgt.customer_id = src.customer_id,
            tgt.order_date = src.order_date,
            tgt.total_amount = src.total_amount,
            tgt.status = src.status
    WHEN NOT MATCHED THEN INSERT
        (order_id, customer_id, order_date, total_amount, status)
    VALUES
        (src.order_id, src.customer_id, src.order_date, src.total_amount, src.status);

-- Create a task to process product changes
CREATE OR REPLACE TASK process_product_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '1 minute'
AS
    MERGE INTO analytics.product_summary tgt
    USING (SELECT product_id, name, category, price, inventory_count
           FROM product_changes) src
    ON tgt.product_id = src.product_id
    WHEN MATCHED THEN UPDATE
        SET tgt.name = src.name,
            tgt.category = src.category,
            tgt.price = src.price,
            tgt.inventory_count = src.inventory_count
    WHEN NOT MATCHED THEN INSERT
        (product_id, name, category, price, inventory_count)
    VALUES
        (src.product_id, src.name, src.category, src.price, src.inventory_count);
        
        

Let's break down what's happening in these tasks:

  1. We're using MERGE statements to upsert data into our analytics tables.
  2. The source for each MERGE is the corresponding stream.
  3. We're scheduling these tasks to run every minute, but you can adjust this based on your needs.

Step 3: Activating the CDC Workflow

With our streams and tasks in place, we just need to activate our tasks:


ALTER TASK process_customer_changes RESUME;
ALTER TASK process_order_changes RESUME;
ALTER TASK process_product_changes RESUME;

Our CDC pipeline is now live! Changes to source tables are automatically detected, streamed, and processed, ensuring our analytics layer remains current.

Monitoring and Optimization

Optimizing Your CDC Workflow: Key Tips

  1. Monitor task history: Regularly check the TASK_HISTORY view to ensure your tasks are running successfully.

SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;

  1. Optimize task frequency: If you're processing a high volume of changes, you might need to run your tasks more frequently. Conversely, for low-volume tables, you could reduce the frequency to save on compute costs.
  2. Use task dependencies: For complex workflows, you can create dependencies between tasks. For example, you might want to process customer changes before order changes:

ALTER TASK process_order_changes
ADD AFTER process_customer_changes;

  1. Implement error handling: Add error handling logic to your tasks to manage unexpected data or failures gracefully.

Real-world Impact: A Case Study

A major online retailer faced challenges with inventory management, particularly during high-traffic flash sales. Their batch update process, running every 6 hours, often resulted in overselling popular items. By adopting a real-time Change Data Capture (CDC) workflow, they significantly improved inventory accuracy, enabling them to update their analytics every 5 minutes.

By leveraging near real-time inventory updates, the company achieved a remarkable 30% reduction in stockouts and a 15% boost in customer satisfaction. This timely data enabled prompt restock alerts and agile promotional adjustments.

Advanced Techniques: Handling Complex Scenarios

Scaling your CDC can present intricate challenges. Explore these advanced strategies to optimize your data pipeline:

1. Handling Deletes

Our current setup doesn't explicitly handle deletes. To capture deletes, you can use Snowflake's CHANGE_TRACKING feature:

-- Enable change tracking on the source table ALTER TABLE products SET CHANGE_TRACKING = TRUE; -- Modify the product_changes stream to capture deletes CREATE OR REPLACE STREAM product_changes ON TABLE products SHOW_INITIAL_ROWS = TRUE; -- Update the process_product_changes task CREATE OR REPLACE TASK process_product_changes WAREHOUSE = 'COMPUTE_WH' SCHEDULE = '1 minute' AS MERGE INTO analytics.product_summary tgt USING ( SELECT product_id, name, category, price, inventory_count, METADATA$ACTION, METADATA$ISUPDATE FROM product_changes ) src ON tgt.product_id = src.product_id WHEN MATCHED AND src.METADATA$ACTION = 'DELETE' THEN DELETE WHEN MATCHED AND src.METADATA$ISUPDATE THEN UPDATE SET tgt.name = src.name, tgt.category = src.category, tgt.price = src.price, tgt.inventory_count = src.inventory_count WHEN NOT MATCHED AND src.METADATA$ACTION = 'INSERT' THEN INSERT (product_id, name, category, price, inventory_count) VALUES (src.product_id, src.name, src.category, src.price, src.inventory_count);
2. Handling Schema Changes

Schema changes can break your CDC pipeline. One way to make your pipeline more resilient is to use dynamic SQL:

CREATE OR REPLACE TASK process_customer_changes WAREHOUSE = 'COMPUTE_WH' SCHEDULE = '1 minute' AS DECLARE v_columns STRING; BEGIN -- Get all columns except METADATA$ columns SELECT LISTAGG(column_name, ', ') INTO v_columns FROM information_schema.columns WHERE table_name = 'CUSTOMERS' AND table_schema = 'PUBLIC' AND column_name NOT LIKE 'METADATA$%'; EXECUTE IMMEDIATE 'MERGE INTO analytics.customer_summary tgt USING (SELECT ' || v_columns || ' FROM customer_changes) src ON tgt.customer_id = src.customer_id WHEN MATCHED THEN UPDATE SET ' || REGEXP_REPLACE(v_columns, '([^,]+)', 'tgt.\\1 = src.\\1') || ' WHEN NOT MATCHED THEN INSERT (' || v_columns || ') VALUES (' || REGEXP_REPLACE(v_columns, '([^,]+)', 'src.\\1') || ')'; END;

This approach dynamically builds the SQL statement based on the current schema, making it more adaptable to changes.

3. Managing Large Volumes of Changes

For tables with a high volume of changes, processing all changes in a single task run might take too long. You can implement a windowing strategy:

CREATE OR REPLACE TASK process_high_volume_changes WAREHOUSE = 'COMPUTE_WH' SCHEDULE = '1 minute' AS DECLARE v_last_processed_offset NUMBER; BEGIN -- Get the last processed offset SELECT NVL(MAX(last_processed_offset), 0) INTO v_last_processed_offset FROM analytics.change_processing_status WHERE table_name = 'HIGH_VOLUME_TABLE'; -- Process the next batch of changes MERGE INTO analytics.high_volume_summary tgt USING ( SELECT * FROM high_volume_changes WHERE METADATA$ACTION_SEQ > v_last_processed_offset ORDER BY METADATA$ACTION_SEQ LIMIT 10000 -- Adjust batch size as needed ) src ON tgt.id = src.id WHEN MATCHED THEN UPDATE SET tgt.column1 = src.column1, tgt.column2 = src.column2 -- ... other columns ... WHEN NOT MATCHED THEN INSERT (id, column1, column2) VALUES (src.id, src.column1, src.column2); -- Update the last processed offset UPDATE analytics.change_processing_status SET last_processed_offset = ( SELECT MAX(METADATA$ACTION_SEQ) FROM high_volume_changes WHERE METADATA$ACTION_SEQ > v_last_processed_offset ) WHERE table_name = 'HIGH_VOLUME_TABLE'; END;

This approach processes changes in batches, keeping track of the last processed change to ensure no data is missed.

Conclusion: Embracing the Future of Data Processing

Leveraging Snowflake's Streams and Tasks to supercharge your data pipeline with Change Data Capture (CDC). This powerful combination enables near-instantaneous data processing, unlocking a universe of possibilities for real-time analytics and operational insights.

The key to successful CDC implementation lies in:

  1. Designing your streams to capture the right level of detail
  2. Optimizing your tasks for performance and reliability
  3. Monitoring and fine-tuning your workflow as your data needs evolve

As you navigate your continuous data change (CDC) journey, remember to embrace experimentation and tailor these techniques to your specific use cases. The dynamic data engineering landscape, coupled with powerful tools like Snowflake's streams and tasks, empowers you to seamlessly adapt to the ever-evolving world of real-time data.

Want to receive update about our upcoming podcast?

Thanks for joining our newsletter.
Oops! Something went wrong.

Latest Articles

Implementing Custom Instrumentation for Application Performance Monitoring (APM) Using OpenTelemetry

Application Performance Monitoring (APM) has become crucial for businesses to ensure optimal software performance and user experience. As applications grow more complex and distributed, the need for comprehensive monitoring solutions has never been greater. OpenTelemetry has emerged as a powerful, vendor-neutral framework for instrumenting, generating, collecting, and exporting telemetry data. This article explores how to implement custom instrumentation using OpenTelemetry for effective APM.

Mobile Engineering
time
5
 min read

Implementing Custom Evaluation Metrics in LangChain for Measuring AI Agent Performance

As AI and language models continue to advance at breakneck speed, the need to accurately gauge AI agent performance has never been more critical. LangChain, a go-to framework for building language model applications, comes equipped with its own set of evaluation tools. However, these off-the-shelf solutions often fall short when dealing with the intricacies of specialized AI applications. This article dives into the world of custom evaluation metrics in LangChain, showing you how to craft bespoke measures that truly capture the essence of your AI agent's performance.

AI/ML
time
5
 min read

Enhancing Quality Control with AI: Smarter Defect Detection in Manufacturing

In today's competitive manufacturing landscape, quality control is paramount. Traditional methods often struggle to maintain optimal standards. However, the integration of Artificial Intelligence (AI) is revolutionizing this domain. This article delves into the transformative impact of AI on quality control in manufacturing, highlighting specific use cases and their underlying architectures.

AI/ML
time
5
 min read