
Building a simple CDC ingestion pattern

In this article, let us walk through a simple change data capture (CDC) ingestion pattern that works well when you want to bring source system changes into a data lake or warehouse without building something too fancy on day one. CDC is useful because full reloads become slow and expensive as tables grow, and in most projects we only need to move the changed records and apply them correctly at the destination.

For our use case, let us assume the source system writes change events into files in object storage. Each record has a business key, an operation type, and a last updated timestamp. We will land the files into a bronze layer, standardize them in silver, and then merge them into a curated gold table. This is not the only pattern, but it is one of the easier ones to reason about and support.

What do we need from the source?

Before building the pipeline, make sure the source sends enough metadata. At minimum I usually look for these columns:

  • customer_id or another stable business key
  • op to tell whether the row is an insert, update, or delete
  • updated_at from the source system
  • a batch id, file name, or extract timestamp for traceability

A sample CDC file could look like this:

{"customer_id":101,"name":"John","city":"Melbourne","op":"I","updated_at":"2025-02-24T10:00:00Z"}
{"customer_id":101,"name":"John","city":"Sydney","op":"U","updated_at":"2025-02-24T10:05:00Z"}
{"customer_id":102,"name":"Mary","city":"Perth","op":"D","updated_at":"2025-02-24T10:06:00Z"}

This is enough for a basic pipeline. If your source can also send transaction ids or sequence numbers, even better, because ordering becomes safer.

A simple layer-by-layer approach

I generally keep CDC pipelines boring. The more custom logic we add early, the more pain we create later. A small pattern like the one below is easier to maintain.

Layer  | Purpose             | What happens here
Bronze | Raw landing         | Store files as received, add ingestion metadata
Silver | Clean CDC stream    | Cast data types, standardize op codes, remove obvious bad rows
Gold   | Current state table | Apply merge logic so the table reflects the latest state

Bronze gives us replayability. Silver gives us a clean and predictable stream of changes. Gold is what downstream reports or applications usually read.

Step 1: Land the raw files

In bronze I prefer to keep the raw records almost untouched. We can enrich them with a few operational columns such as ingested_at, source_file, and load_date. For example, in Spark:

from pyspark.sql import functions as F

# Read the day's CDC files and tag each row with ingestion metadata
bronze_df = (spark.read.json("s3://demo-raw/customer_cdc/2025-02-24/*.json")
  .withColumn("ingested_at", F.current_timestamp())    # when we loaded the row
  .withColumn("source_file", F.input_file_name())      # which file it came from
  .withColumn("load_date", F.to_date("ingested_at")))  # partition-friendly date

bronze_df.write.format("delta").mode("append").saveAsTable("bronze.customer_cdc_raw")

For our use case, this table is mostly for audit and replay. If something breaks in silver or gold, we can start again from bronze instead of asking the source team to resend data.
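
Replaying is usually just a filtered re-read of bronze. A minimal sketch, assuming the table and columns from the snippet above:

from pyspark.sql import functions as F

# Rebuild the silver input for one landing day straight from bronze,
# without asking the source team to resend anything
replay_df = (spark.table("bronze.customer_cdc_raw")
    .filter(F.col("load_date") == "2025-02-24"))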

Step 2: Standardize the CDC records

The silver layer is where I do the minimum cleanup needed for reliable downstream processing. This includes:

  • converting timestamps into a consistent type
  • normalizing operation codes like INSERT, I, ins into one format
  • filtering records with null business keys
  • deduplicating multiple events for the same key inside one micro batch

A common pattern is to keep only the latest event per key within the batch before merging. That avoids unnecessary churn.

with ranked_changes as (
    select
        customer_id,
        name,
        city,
        upper(op) as op,
        cast(updated_at as timestamp) as updated_at,
        ingested_at,
        row_number() over (
            partition by customer_id
            order by updated_at desc, ingested_at desc
        ) as rn
    from bronze.customer_cdc_raw
    -- in a real job, filter this scan down to the current batch or load_date
    where customer_id is not null
),
normalized as (
    select
        customer_id,
        name,
        city,
        case
            when op in ('INSERT', 'INS', 'I') then 'I'
            when op in ('UPDATE', 'UPD', 'U') then 'U'
            when op in ('DELETE', 'DEL', 'D') then 'D'
            else 'UNKNOWN'
        end as op,
        updated_at
    from ranked_changes
    where rn = 1
)
select *
from normalized
where op <> 'UNKNOWN';

Some teams try to deduplicate across all history at this stage. I usually avoid that in a simple design because it can become expensive. Batch-level dedupe plus a correct merge at gold is enough for many projects.

Step 3: Merge into the current-state table

Now we apply the CDC logic to the gold table. If your platform supports MERGE, this becomes straightforward. Below is the shape of the SQL I normally use:

merge into gold.customer_current as tgt
using silver.customer_cdc_latest as src
on tgt.customer_id = src.customer_id
when matched and src.op = 'D' and src.updated_at >= tgt.updated_at then delete
when matched and src.op in ('U', 'I') and src.updated_at >= tgt.updated_at then
  update set
    tgt.name = src.name,
    tgt.city = src.city,
    tgt.updated_at = src.updated_at
when not matched and src.op in ('I', 'U') then
  insert (customer_id, name, city, updated_at)
  values (src.customer_id, src.name, src.city, src.updated_at);

The important part here is the timestamp check on the update and delete branches. Without that guard, a late-arriving older event could overwrite or delete a newer row and silently corrupt the current state. This is one of those small details that matters a lot in CDC pipelines.
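
If you drive the merge from PySpark instead of SQL, the Delta Lake Python API can express the same logic. A sketch, assuming the delta-spark package and the same table and column names as above:

from delta.tables import DeltaTable

gold = DeltaTable.forName(spark, "gold.customer_current")
src = spark.table("silver.customer_cdc_latest")

(gold.alias("tgt")
    .merge(src.alias("src"), "tgt.customer_id = src.customer_id")
    # same guard as the SQL version: never let an older event win
    .whenMatchedDelete(condition="src.op = 'D' and src.updated_at >= tgt.updated_at")
    .whenMatchedUpdate(
        condition="src.op in ('U', 'I') and src.updated_at >= tgt.updated_at",
        set={"name": "src.name", "city": "src.city", "updated_at": "src.updated_at"})
    .whenNotMatchedInsert(
        condition="src.op in ('I', 'U')",
        values={"customer_id": "src.customer_id", "name": "src.name",
                "city": "src.city", "updated_at": "src.updated_at"})
    .execute())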

What should the orchestration look like?

For a simple demo, a scheduled job is enough:

  1. Detect new CDC files
  2. Load them into bronze
  3. Build or refresh the silver change set
  4. Merge into gold
  5. Record audit counts

The audit counts are worth keeping even in a basic solution. I like to store batch id, source file count, inserted count, updated count, deleted count, rejected count, and job status. When someone asks why customer 102 disappeared, this table is where I first look.
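
A minimal sketch of writing one such audit row, assuming a hypothetical ops.cdc_audit table and counts collected earlier in the job:

from pyspark.sql import Row
import datetime

# Hypothetical audit table and batch id scheme; the counts would come from
# dataframe counts or merge metrics gathered during the run
audit_row = Row(
    batch_id="2025-02-24-001",
    source_file_count=3,
    inserted_count=1,
    updated_count=1,
    deleted_count=1,
    rejected_count=0,
    job_status="SUCCEEDED",
    finished_at=datetime.datetime.now(datetime.timezone.utc),
)
spark.createDataFrame([audit_row]).write.format("delta") \
    .mode("append").saveAsTable("ops.cdc_audit")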

Things to be careful about

CDC sounds easy until edge cases show up. A few things can break the pattern if we ignore them:

  • Out-of-order delivery: older events may arrive after newer ones. Use source timestamps or sequence numbers in merge conditions.
  • Duplicate files: retried extracts can send the same data twice. Keep source file names or batch ids so you can detect reprocessing.
  • Delete handling: some sources send hard deletes, others send soft delete flags. The merge logic should match that behavior.
  • Schema changes: adding a new column is manageable, but column renames from the source can create confusion if not coordinated.
  • Bad keys: if the business key changes in the source, CDC logic can create duplicate current-state records.

In practice, I have found that delete events and late-arriving updates cause most of the debugging time, so it is worth writing test cases for both from the beginning.
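
A test for the late-arriving update case can be small. A sketch in pytest style, assuming a spark session fixture and the batch dedup logic from Step 2:

from pyspark.sql import functions as F, Window

def test_late_arriving_update_does_not_win(spark):
    # The older event (10:00) is ingested after the newer one (10:05)
    events = [
        (101, "Sydney", "U", "2025-02-24T10:05:00", "2025-02-24T10:10:00"),
        (101, "Melbourne", "U", "2025-02-24T10:00:00", "2025-02-24T10:11:00"),
    ]
    df = spark.createDataFrame(
        events, ["customer_id", "city", "op", "updated_at", "ingested_at"])

    w = (Window.partitionBy("customer_id")
         .orderBy(F.col("updated_at").desc(), F.col("ingested_at").desc()))
    latest = df.withColumn("rn", F.row_number().over(w)).filter("rn = 1")

    # Source order, not arrival order, decides the current city
    assert latest.first()["city"] == "Sydney"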

What would change in production?

For a real production use case, I would keep the overall pattern the same but add more controls around it:

  • checkpointing or watermarking so we know exactly what was processed
  • stronger idempotency checks at file or batch level (see the sketch after this list)
  • data quality rules for null keys, invalid operations, and timestamp issues
  • alerting when deletes suddenly spike or when no CDC files arrive
  • partitioning and compaction strategy if the tables become large
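
For the idempotency check, one simple approach is to compare incoming file names against what bronze has already recorded. A sketch, assuming the source_file column from Step 1 and a hypothetical listing step that produced the incoming paths:

# Skip files bronze already recorded, so a retried extract only loads once
already_loaded = {
    r["source_file"]
    for r in spark.table("bronze.customer_cdc_raw")
                  .select("source_file").distinct().collect()
}
incoming = ["s3://demo-raw/customer_cdc/2025-02-25/part-0001.json"]  # from listing
new_files = [f for f in incoming if f not in already_loaded]
if new_files:
    spark.read.json(new_files).createOrReplaceTempView("incoming_cdc")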

If the source system provides database log-based CDC from tools like Debezium or native connectors, we can also preserve the event stream and build both current-state and history tables from the same feed. That is cleaner than only keeping the final merged table, but it is also a slightly bigger design.

Final thoughts

This pattern is simple, but that is exactly why I like it. Land the raw CDC events, clean them just enough, and merge them carefully into the target table. You get a pipeline that is easy to explain, easy to replay, and good enough for many data platforms before you move into more advanced streaming designs.
