Book Your Meeting

Implementing incremental data processing using Databricks Delta Lake's change data feed

Discover how to implement efficient incremental data processing with Databricks Delta Lake's Change Data Feed. This comprehensive guide walks through enabling CDF, reading change data, and building robust processing pipelines that only handle modified data. Learn advanced patterns for schema evolution, large data volumes, and exactly-once processing, plus real-world applications including real-time analytics dashboards and data quality monitoring. Perfect for data engineers looking to optimize resource usage and processing time.

GraphQL has a role beyond API Query Language- being the backbone of application Integration
background Coditation

Implementing incremental data processing using Databricks Delta Lake's change data feed

# Implementing Incremental Data Processing Using Databricks Delta Lake's Change Data Feed

Incremental data processing represents a fundamental shift in how organizations handle large-scale data operations. Rather than repeatedly processing entire datasets, incremental approaches focus only on what's changed since the last processing cycle. This methodology significantly reduces computational resources, processing time, and ultimately costs.

Databricks Delta Lake introduced Change Data Feed (CDF) as a powerful feature to enable efficient incremental data processing. This capability tracks row-level changes between versions of a Delta table, making it possible to capture inserts, updates, and deletes for downstream processing.

This article explores how to implement robust incremental data processing pipelines using Delta Lake's Change Data Feed, complete with practical code examples and architectural considerations.

## Understanding Delta Lake's Change Data Feed

Change Data Feed provides a systematic way to track and access changes made to a Delta table. When enabled, CDF maintains a log of all changes, including:

- Inserted rows
- Updated rows (both before and after states)
- Deleted rows

Each change record includes metadata about the operation type, allowing downstream processes to understand exactly what changed and how.

### How CDF Works

When CDF is enabled on a Delta table, Delta Lake automatically tracks changes at the file level. As modifications occur, Delta Lake records:

1. The operation type (INSERT, UPDATE, DELETE)
2. The timestamp of the change
3. The complete row data (before and after states for updates)
4. The version of the Delta table when the change occurred

These changes are stored alongside the Delta table's transaction log, making them durable and consistent with the table's state.

## Enabling Change Data Feed

Before implementing incremental processing, you need to enable CDF on your Delta tables. There are two approaches:

### 1. Enable CDF on a New Table

```python
# Create a new Delta table with CDF enabled
spark.sql("""
CREATE TABLE customer_data (
  customer_id INT,
  name STRING,
  email STRING,
  signup_date DATE,
  last_purchase_date DATE
)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

2. Enable CDF on an Existing Table

# Enable CDF on an existing Delta table
spark.sql("ALTER TABLE customer_data SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

Once enabled, Delta Lake begins tracking changes from that point forward. Note that historical changes before enabling CDF won't be available.

Reading Change Data

After enabling CDF, you can access the change data using the readChangeData method or through SQL:

Using DataFrame API

# Read changes between versions 5 and 10
changes_df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 5) \
  .option("endingVersion", 10) \
  .table("customer_data")

# Alternatively, read changes from a timestamp
changes_df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2023-01-01T00:00:00.000Z") \
  .option("endingTimestamp", "2023-01-02T00:00:00.000Z") \
  .table("customer_data")

Using SQL

SELECT * FROM table_changes('customer_data', 5, 10)

-- Or using timestamps
SELECT * FROM table_changes('customer_data', 
  '2023-01-01T00:00:00.000Z', 
  '2023-01-02T00:00:00.000Z')

The resulting DataFrame includes special columns:

  • _change_type: Indicates the operation (INSERT, UPDATE_PREIMAGE, UPDATE_POSTIMAGE, DELETE)
  • _commit_version: The version when the change occurred
  • _commit_timestamp: The timestamp when the change occurred

Building an Incremental Processing Pipeline

Now let's implement a complete incremental processing pipeline using CDF. We'll build a system that:

  1. Tracks changes in a source table
  2. Processes those changes incrementally
  3. Updates a target table with the processed results
  4. Maintains state for the next incremental run

Step 1: Set Up Source and Target Tables

# Create source table with CDF enabled
spark.sql("""
CREATE TABLE sales_source (
  order_id STRING,
  customer_id STRING,
  product_id STRING,
  quantity INT,
  price DOUBLE,
  order_date TIMESTAMP,
  status STRING
)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Create target table for aggregated data
spark.sql("""
CREATE TABLE customer_sales_summary (
  customer_id STRING,
  total_orders INT,
  total_spend DOUBLE,
  last_order_date TIMESTAMP,
  last_updated TIMESTAMP
)
USING DELTA
""")

# Create a state table to track processing progress
spark.sql("""
CREATE TABLE processing_state (
  table_name STRING,
  last_processed_version LONG,
  last_processed_timestamp TIMESTAMP,
  last_run_timestamp TIMESTAMP
)
USING DELTA
""")

Step 2: Initialize State Tracking

# Initialize state tracking for our pipeline
from pyspark.sql.functions import current_timestamp, lit

# Check if we already have state information
state_df = spark.table("processing_state").filter("table_name = 'sales_source'")

if state_df.count() == 0:
    # Initialize with the current version of the source table
    current_version = spark.sql("DESCRIBE HISTORY sales_source LIMIT 1").select("version").collect()[0][0]

    # Create initial state record
    initial_state = spark.createDataFrame([
        ("sales_source", current_version, current_timestamp(), current_timestamp())
    ], ["table_name", "last_processed_version", "last_processed_timestamp", "last_run_timestamp"])

    initial_state.write.format("delta").mode("append").saveAsTable("processing_state")

    print(f"Initialized state tracking at version {current_version}")
else:
    print("State tracking already initialized")

Step 3: Implement the Incremental Processing Function

def process_incremental_changes():
    """
    Process changes from the sales_source table and update the customer_sales_summary table
    """
    from pyspark.sql.functions import col, when, max, sum, count, current_timestamp
    from delta.tables import DeltaTable

    # Get the last processed version
    state = spark.table("processing_state").filter("table_name = 'sales_source'").collect()[0]
    last_version = state.last_processed_version

    # Get the current version of the source table
    current_version = spark.sql("DESCRIBE HISTORY sales_source LIMIT 1").select("version").collect()[0][0]

    if current_version <= last_version:
        print(f"No new changes to process. Current version: {current_version}, Last processed: {last_version}")
        return

    print(f"Processing changes from version {last_version + 1} to {current_version}")

    # Read the change data
    changes_df = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", last_version + 1) \
        .option("endingVersion", current_version) \
        .table("sales_source")

    # Filter out UPDATE_PREIMAGE records as we only want the final state
    filtered_changes = changes_df.filter(
        "(_change_type = 'INSERT' OR _change_type = 'UPDATE_POSTIMAGE' OR _change_type = 'DELETE')"
    )

    # Add a weight column: 1 for inserts/updates, -1 for deletes
    weighted_changes = filtered_changes.withColumn(
        "weight", 
        when(col("_change_type") == "DELETE", -1).otherwise(1)
    )

    # Calculate the impact of these changes on customer metrics
    customer_changes = weighted_changes.groupBy("customer_id").agg(
        sum("weight").alias("order_count_change"),
        sum(col("price") * col("quantity") * col("weight")).alias("spend_change"),
        max(when(col("weight") > 0, col("order_date"))).alias("max_order_date")
    )

    # Get the target table as a DeltaTable
    customer_summary_delta = DeltaTable.forName(spark, "customer_sales_summary")

    # Perform a merge operation to update the summary
    customer_summary_delta.alias("target").merge(
        customer_changes.alias("source"),
        "target.customer_id = source.customer_id"
    ).whenMatched().updateExpr(
        {
            "total_orders": "target.total_orders + source.order_count_change",
            "total_spend": "target.total_spend + source.spend_change",
            "last_order_date": "CASE WHEN source.max_order_date IS NOT NULL AND " +
                              "(target.last_order_date IS NULL OR source.max_order_date > target.last_order_date) " +
                              "THEN source.max_order_date ELSE target.last_order_date END",
            "last_updated": "current_timestamp()"
        }
    ).whenNotMatched().insertExpr(
        {
            "customer_id": "source.customer_id",
            "total_orders": "source.order_count_change",
            "total_spend": "source.spend_change",
            "last_order_date": "source.max_order_date",
            "last_updated": "current_timestamp()"
        }
    ).execute()

    # Update the processing state
    spark.createDataFrame([
        ("sales_source", current_version, current_timestamp(), current_timestamp())
    ], ["table_name", "last_processed_version", "last_processed_timestamp", "last_run_timestamp"]) \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", "table_name = 'sales_source'") \
    .saveAsTable("processing_state")

    print(f"Successfully processed changes up to version {current_version}")

    # Return stats about the processed changes
    return {
        "records_processed": changes_df.count(),
        "from_version": last_version + 1,
        "to_version": current_version,
        "customers_affected": customer_changes.count()
    }

Step 4: Schedule the Incremental Processing

In a production environment, you would schedule this function to run periodically. Here's how you might set it up in a Databricks job:

# This would be in a separate notebook that's scheduled to run
from datetime import datetime

# Log the start of the job
start_time = datetime.now()
print(f"Starting incremental processing job at {start_time}")

# Run the incremental processing
try:
    stats = process_incremental_changes()
    if stats:
        print(f"Processed {stats['records_processed']} records affecting {stats['customers_affected']} customers")
    else:
        print("No changes to process")
except Exception as e:
    print(f"Error processing changes: {str(e)}")
    raise e

# Log the end of the job
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print(f"Completed incremental processing job at {end_time} (duration: {duration} seconds)")

Advanced Patterns and Optimizations

Let's explore some advanced patterns and optimizations for working with Change Data Feed.

Handling Schema Evolution

Delta Lake supports schema evolution, but this can complicate change data processing. Here's how to handle it:

def process_with_schema_evolution():
    from pyspark.sql.functions import col, struct

    # Get the current schema of the target table
    target_schema = spark.table("customer_sales_summary").schema
    target_columns = [field.name for field in target_schema.fields]

    # Read change data
    changes_df = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", last_processed_version + 1) \
        .table("sales_source")

    # Select only columns that exist in both schemas to avoid errors
    source_columns = [col for col in changes_df.columns if col not in ["_change_type", "_commit_version", "_commit_timestamp"]]
    common_columns = [col for col in source_columns if col in target_columns]

    # Process only with common columns
    filtered_changes = changes_df.select(
        "_change_type", 
        "_commit_version",
        *[col(c) for c in common_columns]
    )

    # Continue processing as before...

Optimizing for Large Change Volumes

When dealing with large volumes of changes, you can optimize processing:

def process_large_change_volumes():
    # Process changes in smaller batches by version ranges
    current_version = spark.sql("DESCRIBE HISTORY sales_source LIMIT 1").select("version").collect()[0][0]
    last_processed_version = get_last_processed_version()

    # Define batch size (number of versions to process at once)
    batch_size = 5

    for start_version in range(last_processed_version + 1, current_version + 1, batch_size):
        end_version = min(start_version + batch_size - 1, current_version)

        print(f"Processing batch from version {start_version} to {end_version}")

        # Read and process this batch of changes
        changes_df = spark.read.format("delta") \
            .option("readChangeFeed", "true") \
            .option("startingVersion", start_version) \
            .option("endingVersion", end_version) \
            .table("sales_source")

        # Process this batch...

        # Update the processing state after each batch
        update_processing_state(end_version)

Implementing Exactly-Once Processing

To ensure exactly-once semantics, you can use transaction IDs and idempotent operations:

from pyspark.sql.functions import sha2, concat_ws

def implement_exactly_once_processing():
    # Generate a deterministic transaction ID for each change record
    changes_df = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", last_processed_version + 1) \
        .table("sales_source")

    # Create a deterministic ID for each change
    changes_with_id = changes_df.withColumn(
        "change_id", 
        sha2(concat_ws("||", 
                      col("_change_type"),
                      col("_commit_version"),
                      col("order_id")), 
             256)
    )

    # Check which changes we've already processed
    processed_changes = spark.table("processed_change_ids")

    # Filter out already processed changes
    new_changes = changes_with_id.join(
        processed_changes,
        changes_with_id.change_id == processed_changes.change_id,
        "left_anti"  # Only keep changes that don't exist in processed_changes
    )

    # Process the new changes...

    # Record the processed change IDs
    new_changes.select("change_id").write.mode("append").saveAsTable("processed_change_ids")

Real-World Use Cases

Let's examine some practical applications of Delta Lake's Change Data Feed:

1. Real-Time Analytics Dashboard

def update_analytics_dashboard():
    """Update real-time analytics based on recent changes"""

    # Get changes in the last 15 minutes
    from datetime import datetime, timedelta

    fifteen_min_ago = (datetime.now() - timedelta(minutes=15)).strftime("%Y-%m-%dT%H:%M:%S.000Z")

    recent_changes = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingTimestamp", fifteen_min_ago) \
        .table("sales_source")

    # Calculate real-time metrics
    from pyspark.sql.functions import count, sum, avg, window

    metrics = recent_changes.filter("_change_type IN ('INSERT', 'UPDATE_POSTIMAGE')") \
        .withWatermark("order_date", "5 minutes") \
        .groupBy(window("order_date", "5 minutes")) \
        .agg(
            count("order_id").alias("order_count"),
            sum("price").alias("revenue"),
            avg("quantity").alias("avg_items_per_order")
        )

    # Write to a dashboard table
    metrics.write.format("delta").mode("append").saveAsTable("real_time_dashboard")

2. Data Quality Monitoring

def monitor_data_quality():
    """Monitor data quality issues using change data feed"""

    from pyspark.sql.functions import col, when, count

    # Get recent changes
    changes = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", last_processed_version + 1) \
        .table("sales_source")

    # Check for potential data quality issues
    quality_issues = changes.select(
        "_change_type",
        "_commit_version",
        "order_id",
        "customer_id",
        "price",
        "quantity"
    ).withColumn(
        "issue_type",
        when(col("price") < 0, "negative_price")
        .when(col("quantity") <= 0, "invalid_quantity")
        .when(col("customer_id").isNull(), "missing_customer_id")
        .otherwise("no_issue")
    ).filter("issue_type != 'no_issue'")

    # Log issues for investigation
    if quality_issues.count() > 0:
        quality_issues.write.mode("append").saveAsTable("data_quality_issues")

        # Alert if there are too many issues
        issue_count = quality_issues.count()
        if issue_count > 100:
            send_alert(f"High number of data quality issues detected: {issue_count}")

3. Slowly Changing Dimension (SCD) Type 2 Implementation

def implement_scd_type2():
    """Implement SCD Type 2 for customer dimension using change data feed"""

    from pyspark.sql.functions import current_timestamp, lit
    from delta.tables import DeltaTable

    # Get customer changes
    customer_changes = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", last_processed_version + 1) \
        .table("customer_source")

    # Filter to relevant changes (only updates and inserts)
    relevant_changes = customer_changes.filter(
        "_change_type IN ('INSERT', 'UPDATE_POSTIMAGE')"
    ).select(
        "customer_id",
        "name",
        "email",
        "address",
        "phone",
        "_commit_timestamp"
    )

    # Get the customer dimension table
    customer_dim = DeltaTable.forName(spark, "customer_dimension")

    # Perform SCD Type 2 merge
    customer_dim.alias("target").merge(
        relevant_changes.alias("source"),
        "target.customer_id = source.customer_id AND target.is_current = true"
    ).whenMatched(
        # Only update if there's an actual change in the tracked columns
        """
        target.name != source.name OR 
        target.email != source.email OR 
        target.address != source.address OR 
        target.phone != source.phone
        """
    ).updateExpr(
        {
            # Close the current record
            "is_current": "false",
            "end_date": "source._commit_timestamp",
            "last_updated": "current_timestamp()"
        }
    ).whenNotMatched().insertExpr(
        {
            "customer_id": "source.customer_id",
            "name": "source.name",
            "email": "source.email",
            "address": "source.address",
            "phone": "source.phone",
            "start_date": "source._commit_timestamp",
            "end_date": "null",
            "is_current": "true",
            "last_updated": "current_timestamp()"
        }
    ).execute()

    # Insert new versions of updated records
    updated_customers = spark.sql("""
        SELECT 
            source.customer_id,
            source.name,
            source.email,
            source.address,
            source.phone,
            source._commit_timestamp as start_date,
            NULL as end_date,
            true as is_current,
            current_timestamp() as last_updated
        FROM customer_dimension target
        JOIN (
            SELECT * FROM delta.`/path/to/customer_changes`
            WHERE _change_type = 'UPDATE_POSTIMAGE'
        ) source
        ON target.customer_id = source.customer_id
        WHERE target.is_current = false
        AND target.end_date = source._commit_timestamp
    """)

    updated_customers.write.format("delta").mode("append").saveAsTable("customer_dimension")

Performance Considerations and Best Practices

When implementing incremental processing with Change Data Feed, consider these best practices:

  1. Retention Period: Configure an appropriate retention period for change data:
ALTER TABLE sales_source SET TBLPROPERTIES (delta.changeDataFeed.retention = '7 days')
  1. Partition Pruning: Ensure your Delta tables are properly partitioned to optimize change data reads:
# Create a partitioned table with CDF enabled
spark.sql("""
CREATE TABLE sales_data (
  order_id STRING,
  customer_id STRING,
  product_id STRING,
  quantity INT,
  price DOUBLE,
  order_date TIMESTAMP,
  status STRING
)
USING DELTA
PARTITIONED BY (date_trunc('day', order_date))
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
  1. Batch Size Control: Process changes in manageable batches to avoid memory issues:
def process_in_batches(table_name, batch_size=100000):
    """Process changes in controlled batch sizes"""

    # Get total number of changes
    changes_count = spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", last_processed_version + 1) \
        .table(table_name) \
        .count()

    # Calculate number of batches
    num_batches = (changes_count + batch_size - 1) // batch_size

    for i in range(num_batches):
        # Process each batch
        batch_df = spark.read.format("delta") \
            .option("readChangeFeed", "true") \
            .option("startingVersion", last_processed_version + 1) \
            .table(table_name) \
            .limit(batch_size)

        # Process this batch...

        # Update the last processed version if needed
  1. Monitoring and Alerting: Implement monitoring for your incremental pipelines:
def monitor_cdf_pipeline():
    """Monitor the CDF pipeline for issues"""

    # Check for processing lag
    state = spark.table("processing_state").filter("table_name = 'sales_source'").collect()[0]
    last_processed = state.last_processed_version

    current_version = spark.sql("DESCRIBE HISTORY sales_source LIMIT 1").select("version").collect()[0][0]

    version_lag = current_version - last_processed

    # Alert if lag is too high
    if version_lag > 10:
        send_alert(f"CDF processing lag is high: {version_lag} versions behind")

    # Check for processing errors
    errors = spark.table("pipeline_errors").filter("timestamp > current_timestamp() - INTERVAL 1 DAY")

    if errors.count() > 0:
        send_alert(f"CDF pipeline has {errors.count()} errors in the last 24 hours")

Conclusion

Delta Lake's Change Data Feed provides a robust foundation for implementing incremental data processing pipelines. By focusing only on what's changed, organizations can significantly reduce processing time and resource consumption while maintaining data accuracy.

The patterns and code examples in this article demonstrate how to:

  1. Enable and configure Change Data Feed
  2. Read and process change data incrementally
  3. Update target tables efficiently
  4. Implement advanced patterns like exactly-once processing and schema evolution handling
  5. Apply CDF to real-world use cases like real-time analytics and SCD Type 2 implementations

By adopting these practices, data engineers can build resilient, efficient, and scalable data pipelines that process only what's necessary, when it's necessary. ```

Want to receive update about our upcoming podcast?

Thanks for joining our newsletter.
Oops! Something went wrong.

Latest Articles

Implementing custom windowing and triggering mechanisms in Apache Flink for advanced event aggregation

Dive into advanced Apache Flink stream processing with this comprehensive guide to custom windowing and triggering mechanisms. Learn how to implement volume-based windows, pattern-based triggers, and dynamic session windows that adapt to user behavior. The article provides practical Java code examples, performance optimization tips, and real-world implementation strategies for complex event processing scenarios beyond Flink's built-in capabilities.

time
15
 min read

Implementing feature flags for controlled rollouts and experimentation in production

Discover how feature flags can revolutionize your software deployment strategy in this comprehensive guide. Learn to implement everything from basic toggles to sophisticated experimentation platforms with practical code examples in Java, JavaScript, and Node.js. The post covers essential implementation patterns, best practices for flag management, and real-world architectures that have helped companies like Spotify reduce deployment risks by 80%. Whether you're looking to enable controlled rollouts, A/B testing, or zero-downtime migrations, this guide provides the technical foundation you need to build robust feature flagging systems.

time
12
 min read

Implementing custom embeddings in LlamaIndex for domain-specific information retrieval

Discover how to dramatically improve search relevance in specialized domains by implementing custom embeddings in LlamaIndex. This comprehensive guide walks through four practical approaches—from fine-tuning existing models to creating knowledge-enhanced embeddings—with real-world code examples. Learn how domain-specific embeddings can boost precision by 30-45% compared to general-purpose models, as demonstrated in a legal tech case study where search precision jumped from 67% to 89%.

time
15
 min read