SCD Type 2 with Delta MERGE
Two-action historization in a single MERGE: close the current version and insert the new one, using the NULL mergeKey trick for changed rows.
Use case
Historized customer dimension: keep every version of a customer along with its validity dates.
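To make the role of the validity dates concrete, here is a minimal plain-Python sketch of a point-in-time lookup over the two versions of customer C-10042 shown in the result of this recipe. The helper `version_as_of` and the in-memory `VERSIONS` list are hypothetical, and the sketch assumes validity is the half-open interval [start_date, end_date), which is consistent with the closed version ending on the day the new one starts.

```python
from datetime import date
from typing import Optional

# Hypothetical in-memory copy of the two versions of customer C-10042.
# end_date=None marks the open (current) version.
VERSIONS = [
    {"customer_id": "C-10042", "hash_diff": "a3f1c9..",
     "start_date": date(2024, 11, 2), "end_date": date(2026, 6, 9)},
    {"customer_id": "C-10042", "hash_diff": "7e02bd..",
     "start_date": date(2026, 6, 9), "end_date": None},
]


def version_as_of(versions, customer_id: str, as_of: date) -> Optional[dict]:
    """Return the version valid on `as_of`, assuming [start_date, end_date)."""
    for row in versions:
        if row["customer_id"] != customer_id:
            continue
        started = row["start_date"] <= as_of
        not_ended = row["end_date"] is None or as_of < row["end_date"]
        if started and not_ended:
            return row
    return None  # no version was valid on that date
```

Under that assumption, a lookup on the changeover date itself resolves to the new version, so no date is covered by two versions at once.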
Prerequisites
PySpark 3.x, delta-spark
Python
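from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Assumption: `updates` holds the incoming rows (customer_id, hash_diff, effective_date)
# and `current_open` holds the open versions already in the dimension, e.g.
# current_open = spark.table("dim.customers").filter("is_current = true")

# Updates whose attributes differ from the open version: only these need a new version.
changed = updates.alias("u").join(
    current_open.alias("c"),
    F.expr("u.customer_id = c.customer_id AND u.hash_diff <> c.hash_diff"),
    "left_semi",
)

# Each changed customer is staged twice: one row keyed on customer_id (matches and
# closes the open version) and one row with a NULL mergeKey (never matches, so it is
# inserted as the new version). New and unchanged customers are staged once.
staged = (
    updates.selectExpr("customer_id AS mergeKey", "*")
    .unionByName(changed.selectExpr("NULL AS mergeKey", "*"))
)

(
    DeltaTable.forName(spark, "dim.customers").alias("t")
    .merge(staged.alias("s"), "t.customer_id = s.mergeKey AND t.is_current = true")
    # Close the open version only when the attributes actually changed.
    .whenMatchedUpdate(
        condition="t.hash_diff <> s.hash_diff",
        set={"is_current": "false", "end_date": "s.effective_date"})
    # Unmatched rows: brand-new customers and the NULL-keyed copies of changed ones.
    .whenNotMatchedInsert(values={
        "customer_id": "s.customer_id", "hash_diff": "s.hash_diff",
        "start_date": "s.effective_date", "end_date": "NULL",
        "is_current": "true"})
    .execute()
)
Result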
>>> spark.table("dim.customers").filter("customer_id = 'C-10042'").show()
+-----------+---------+----------+----------+----------+
|customer_id|hash_diff|start_date|  end_date|is_current|
+-----------+---------+----------+----------+----------+
|    C-10042| a3f1c9..|2024-11-02|2026-06-09|     false|
|    C-10042| 7e02bd..|2026-06-09|      null|      true|
+-----------+---------+----------+----------+----------+
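The double action can be traced without a cluster. The following is a plain-Python simulation of the MERGE semantics, not Delta Lake itself: `scd2_merge` is a hypothetical helper, `None` stands in for the SQL NULL mergeKey, dates are kept as ISO strings, and customers C-20001 and C-30001 are invented to cover the unchanged and brand-new cases. It assumes that only updates whose hash_diff differs from the open version receive the NULL-keyed copy.

```python
def scd2_merge(target, updates):
    """Simulate the staged MERGE on lists of dicts; returns the new table rows."""
    # Open versions as they exist before the MERGE runs.
    open_rows = {r["customer_id"]: r for r in target if r["is_current"]}

    # Stage every update under its customer_id, plus a copy with mergeKey=None
    # (standing in for SQL NULL) for each update that changes an open version.
    staged = [dict(u, mergeKey=u["customer_id"]) for u in updates]
    staged += [
        dict(u, mergeKey=None)
        for u in updates
        if u["customer_id"] in open_rows
        and open_rows[u["customer_id"]]["hash_diff"] != u["hash_diff"]
    ]

    closed_on = {}  # customer_id -> end_date of the version being closed
    inserted = []
    for s in staged:
        # A NULL key compares equal to nothing, so that row is never matched.
        match = None if s["mergeKey"] is None else open_rows.get(s["mergeKey"])
        if match is None:
            inserted.append({
                "customer_id": s["customer_id"], "hash_diff": s["hash_diff"],
                "start_date": s["effective_date"], "end_date": None,
                "is_current": True,
            })
        elif match["hash_diff"] != s["hash_diff"]:
            closed_on[match["customer_id"]] = s["effective_date"]
        # Matched with an identical hash_diff: no action.

    result = []
    for r in target:
        if r["is_current"] and r["customer_id"] in closed_on:
            r = dict(r, is_current=False, end_date=closed_on[r["customer_id"]])
        result.append(r)
    return result + inserted


target = [
    {"customer_id": "C-10042", "hash_diff": "a3f1c9..",
     "start_date": "2024-11-02", "end_date": None, "is_current": True},
    {"customer_id": "C-20001", "hash_diff": "unchanged",  # hypothetical customer
     "start_date": "2025-01-15", "end_date": None, "is_current": True},
]
updates = [
    {"customer_id": "C-10042", "hash_diff": "7e02bd..", "effective_date": "2026-06-09"},
    {"customer_id": "C-20001", "hash_diff": "unchanged", "effective_date": "2026-06-09"},
    {"customer_id": "C-30001", "hash_diff": "brand-new", "effective_date": "2026-06-09"},
]
new_table = scd2_merge(target, updates)
```

In this trace the changed customer ends up with a closed row and a new open row, the unchanged customer keeps its single row, and the brand-new customer gets one open row. Without the hash_diff filter on the NULL-keyed copies, the unchanged customer would receive a duplicate open version.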