Daily snapshots → CDC stream (INSERT / UPDATE / DELETE)
A full_outer join between the day D and day D-1 snapshots, compared through a sha2 hash of the tracked columns, classifies each row as INSERT, UPDATE or DELETE: a genuine CDC stream, with no connector required.
Requirements
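The classification rule can be sketched in plain Python, without Spark, to make the three cases explicit. This is an illustrative sketch only: the `row_hash` and `classify` helpers, the `<NULL>` sentinel and the toy rows are assumptions introduced here, and each snapshot is modelled as a dict with one row per key.

```python
import hashlib

NULL = "<NULL>"  # assumed sentinel so a missing value still occupies its position


def row_hash(row, tracked):
    """SHA-256 over the tracked columns, joined with '||'."""
    parts = [NULL if row[c] is None else str(row[c]) for c in tracked]
    return hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest()


def classify(today, yesterday, tracked):
    """today / yesterday map key -> row dict. Returns key -> operation."""
    ops = {}
    for key in today.keys() | yesterday.keys():  # full outer join on the key
        new, old = today.get(key), yesterday.get(key)
        if old is None:
            ops[key] = "INSERT"  # key present only in D
        elif new is None:
            ops[key] = "DELETE"  # key present only in D-1
        elif row_hash(new, tracked) != row_hash(old, tracked):
            ops[key] = "UPDATE"  # key in both, tracked values differ
        # equal hashes: unchanged row, nothing is emitted
    return ops


# Hypothetical toy snapshots keyed by customer_id
yesterday = {
    1: {"name": "Ana", "city": "Lyon"},
    2: {"name": "Bo", "city": "Paris"},
    3: {"name": "Cy", "city": None},
}
today = {
    1: {"name": "Ana", "city": "Lyon"},
    2: {"name": "Bo", "city": "Nice"},
    4: {"name": "Di", "city": "Rome"},
}

result = classify(today, yesterday, ["name", "city"])
print(sorted(result.items()))  # [(2, 'UPDATE'), (3, 'DELETE'), (4, 'INSERT')]
```

Customer 1 is unchanged and therefore absent from the result, which is the property that keeps the downstream volume small.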
PySpark 3.x
Python
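from pyspark.sql import functions as F

# snap_today / snap_yesterday: the day D and D-1 snapshot DataFrames (same schema)
cles = ["customer_id"]                                    # join key
suivi = [c for c in snap_today.columns if c not in cles]  # tracked columns

def avec_hash(d):
    # concat_ws skips NULLs, so ("a", NULL) and (NULL, "a") would hash alike.
    # Cast to string and substitute a sentinel (assumed absent from the data).
    cols = [F.coalesce(F.col(c).cast("string"), F.lit("<NULL>")) for c in suivi]
    return d.withColumn("h", F.sha2(F.concat_ws("||", *cols), 256))

auj, hier = avec_hash(snap_today).alias("n"), avec_hash(snap_yesterday).alias("o")

cdc = (
    auj.join(hier, cles, "full_outer")
    .withColumn("op",
        F.when(F.col("o.h").isNull(), "INSERT")          # key only in D
         .when(F.col("n.h").isNull(), "DELETE")          # key only in D-1
         .when(F.col("n.h") != F.col("o.h"), "UPDATE"))  # hash changed
    .filter("op IS NOT NULL")                            # drop unchanged rows
)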
cdc.groupBy("op").count().orderBy("op").show()
Result
+------+-----+
|    op|count|
+------+-----+
|DELETE|  118|
|INSERT| 2204|
|UPDATE| 9817|
+------+-----+
Snapshot D: 1,204,882 rows | D-1: 1,202,796 rows
12,139 changes detected (1.0 %); unchanged rows are no longer passed downstream
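The reported figures can be cross-checked with a little arithmetic. The sketch below assumes the key is unique within each snapshot, in which case inserts minus deletes must equal the row-count difference between the two days. The variable names are illustrative; the numbers are copied from the output above.

```python
# Figures copied from the result above
counts = {"DELETE": 118, "INSERT": 2204, "UPDATE": 9817}
rows_d, rows_d1 = 1_204_882, 1_202_796

total_changes = sum(counts.values())
net_growth = counts["INSERT"] - counts["DELETE"]

# Inserts minus deletes should match the snapshot size difference
assert net_growth == rows_d - rows_d1

print(total_changes)                     # 12139
print(net_growth)                        # 2086
print(f"{total_changes / rows_d:.1%}")   # 1.0%
```

If this identity fails on real data, duplicate keys in one of the snapshots are a likely cause, since the join would then multiply rows.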
PySpark, CDC, sha2, full_outer, Snapshot