Spark

SCD Type 2 with Delta MERGE

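Two-action history tracking in a single MERGE: close out the current version and insert the new one. Changed rows use the NULL mergeKey trick. Each changed row is staged twice: once keyed by customer_id, so it matches and closes the open version, and once with a NULL mergeKey, so it can never match and is inserted as the new current version.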
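To see which rows the NULL mergeKey trick puts into the staged source, here is a plain-Python model of the staging step. It is a sketch that needs no Spark. The helper name `stage` and the sample customers are hypothetical. Rows are dicts that use the snippet's column names, and `None` stands in for SQL NULL.

```python
def stage(updates, current_open):
    """Plain-Python model of the staged MERGE source (hypothetical helper)."""
    # hash_diff of each customer's open (is_current = true) version
    open_hash = {r["customer_id"]: r["hash_diff"] for r in current_open}
    # One row per update keyed by customer_id: it closes the open version if it changed
    keyed = [{"mergeKey": u["customer_id"], **u} for u in updates]
    # A second copy for changed customers only; None plays the role of SQL NULL,
    # which never satisfies the merge condition, so MERGE inserts this copy
    changed = [
        {"mergeKey": None, **u}
        for u in updates
        if u["customer_id"] in open_hash
        and open_hash[u["customer_id"]] != u["hash_diff"]
    ]
    return keyed + changed


# Hypothetical sample data
current_open = [
    {"customer_id": "C-1", "hash_diff": "aaa"},
    {"customer_id": "C-2", "hash_diff": "bbb"},
]
updates = [
    {"customer_id": "C-1", "hash_diff": "zzz", "effective_date": "2026-06-09"},  # changed
    {"customer_id": "C-2", "hash_diff": "bbb", "effective_date": "2026-06-09"},  # unchanged
    {"customer_id": "C-3", "hash_diff": "ccc", "effective_date": "2026-06-09"},  # new customer
]

staged = stage(updates, current_open)
print([(r["mergeKey"], r["customer_id"]) for r in staged])
# [('C-1', 'C-1'), ('C-2', 'C-2'), ('C-3', 'C-3'), (None, 'C-1')]
```

Only the changed customer C-1 appears twice. C-2's keyed row matches its open version but fails the hash_diff condition, so nothing happens. C-3 has no open version, so its keyed row falls through to the insert clause and becomes its first version.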

Prerequisites

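PySpark 3.x and delta-spark, a SparkSession named spark configured for Delta Lake, the Delta table dim.customers, and an updates DataFrame with customer_id, hash_diff and effective_date columns (one row per customer).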

Python
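from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Assumes `spark` is a Delta-enabled SparkSession and `updates` holds one row
# per customer_id with customer_id, hash_diff and effective_date.
target = DeltaTable.forName(spark, "dim.customers")
current_open = target.toDF().filter("is_current = true")

# Customers whose attributes actually changed (hash differs from the open version).
# Filtering on hash_diff here avoids inserting a duplicate open row for unchanged customers.
changed = updates.alias("u").join(
    current_open.alias("c"),
    (F.col("u.customer_id") == F.col("c.customer_id"))
    & (F.col("u.hash_diff") != F.col("c.hash_diff")),
    "left_semi",
)

# 1 "update" row (closes the open version) + 1 "insert" row (new version) per change.
# The NULL mergeKey (typed like customer_id, a string here) never matches, so it is inserted.
staged = (
    updates.selectExpr("customer_id AS mergeKey", "*")
    .unionByName(changed.selectExpr("CAST(NULL AS STRING) AS mergeKey", "*"))
)

(
    target.alias("t")
    .merge(staged.alias("s"), "t.customer_id = s.mergeKey AND t.is_current = true")
    # Close the open version only when the tracked attributes changed
    .whenMatchedUpdate(
        condition="t.hash_diff <> s.hash_diff",
        set={"is_current": "false", "end_date": "s.effective_date"})
    # New customers (no open version) and the NULL-keyed copies of changed rows
    .whenNotMatchedInsert(values={
        "customer_id": "s.customer_id", "hash_diff": "s.hash_diff",
        "start_date": "s.effective_date", "end_date": "NULL",
        "is_current": "true"})
    .execute()
)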
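The two MERGE clauses can be modeled the same way. The sketch below uses a hypothetical helper, scd2_merge, to apply the matched and not-matched clauses to an in-memory list of rows. Like MERGE, it evaluates the ON condition against the table as it stood before the merge. It assumes at most one update per customer, because a Delta MERGE can fail when several source rows try to update the same target row. Fed the two staged rows for C-10042 (hash values copied as displayed in the result table), it yields the closed version and the new open version shown there.

```python
def scd2_merge(target, staged):
    """Plain-Python model of the snippet's MERGE (hypothetical helper)."""
    result = [dict(r) for r in target]  # copies that the update clause modifies
    inserts = []
    for s in staged:
        # ON t.customer_id = s.mergeKey AND t.is_current = true,
        # checked against the pre-merge rows; None (NULL) never matches
        idx = next(
            (i for i, t in enumerate(target)
             if s["mergeKey"] is not None
             and t["customer_id"] == s["mergeKey"]
             and t["is_current"]),
            None,
        )
        if idx is not None:
            # whenMatchedUpdate: close the open version only if hash_diff changed
            if target[idx]["hash_diff"] != s["hash_diff"]:
                result[idx]["is_current"] = False
                result[idx]["end_date"] = s["effective_date"]
        else:
            # whenNotMatchedInsert: new customers and NULL-keyed copies
            inserts.append({
                "customer_id": s["customer_id"],
                "hash_diff": s["hash_diff"],
                "start_date": s["effective_date"],
                "end_date": None,
                "is_current": True,
            })
    return result + inserts


target = [{"customer_id": "C-10042", "hash_diff": "a3f1c9..",
           "start_date": "2024-11-02", "end_date": None, "is_current": True}]
change = {"customer_id": "C-10042", "hash_diff": "7e02bd..", "effective_date": "2026-06-09"}
staged = [{"mergeKey": "C-10042", **change}, {"mergeKey": None, **change}]

for row in scd2_merge(target, staged):
    print(row)
```

Rerunning with an unchanged hash stages only the keyed row. That row matches but fails the hash_diff condition, so the table is left as it was. This is what makes the load safe to repeat.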

Result

>>> spark.table("dim.customers").filter("customer_id = 'C-10042'").show()
+-----------+---------+----------+----------+----------+
|customer_id|hash_diff|start_date|  end_date|is_current|
+-----------+---------+----------+----------+----------+
|    C-10042| a3f1c9..|2024-11-02|2026-06-09|     false|
|    C-10042| 7e02bd..|2026-06-09|      null|      true|
+-----------+---------+----------+----------+----------+
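The two rows above satisfy the usual SCD2 invariants. Each customer has exactly one open version, and every closed version's end_date equals the next version's start_date, so there are no gaps or overlaps. Below is a plain-Python sketch of that check, run over rows copied from the output. The check_scd2 name is hypothetical. Against the real table, the same checks would be written as Spark aggregations. The sort relies on ISO-formatted date strings comparing in chronological order.

```python
from collections import defaultdict


def check_scd2(rows):
    """Return a list of SCD2 invariant violations, grouped by customer_id."""
    problems = []
    by_key = defaultdict(list)
    for r in rows:
        by_key[r["customer_id"]].append(r)
    for key, versions in by_key.items():
        # Exactly one open version per customer
        open_rows = [v for v in versions if v["is_current"]]
        if len(open_rows) != 1:
            problems.append(f"{key}: {len(open_rows)} open versions")
        # Consecutive versions must chain end_date -> start_date
        versions.sort(key=lambda v: v["start_date"])
        for prev, nxt in zip(versions, versions[1:]):
            if prev["end_date"] != nxt["start_date"]:
                problems.append(
                    f"{key}: gap or overlap at {prev['end_date']} / {nxt['start_date']}")
    return problems


rows = [
    {"customer_id": "C-10042", "hash_diff": "a3f1c9..", "start_date": "2024-11-02",
     "end_date": "2026-06-09", "is_current": False},
    {"customer_id": "C-10042", "hash_diff": "7e02bd..", "start_date": "2026-06-09",
     "end_date": None, "is_current": True},
]
print(check_scd2(rows))  # []
```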
PySpark · Delta Lake · SCD2 · History tracking
