از snapshotهای روزانه تا جریان CDC (شامل INSERT / UPDATE / DELETE)
یه full_outer join بین snapshotهای امروز و دیروز، با مقایسهی hash از نوع sha2 روی ستونهای پایششده، هر ردیفو INSERT یا UPDATE یا DELETE برچسب میزنه: یه جریان CDC واقعی، بدون کانکتور.
کاربرد
تغذیهی تاریخچهی تغییرات وقتی منبع فقط استخراج کامل تحویل میده.
پیشنیازها
PySpark 3.x
Python
from pyspark.sql import functions as F
cles = ["customer_id"]
suivi = [c for c in snap_today.columns if c not in cles]
def avec_hash(d):
return d.withColumn("h", F.sha2(F.concat_ws("||", *suivi), 256))
auj, hier = avec_hash(snap_today).alias("n"), avec_hash(snap_yesterday).alias("o")
cdc = (
auj.join(hier, cles, "full_outer")
.withColumn("op",
F.when(F.col("o.h").isNull(), "INSERT")
.when(F.col("n.h").isNull(), "DELETE")
.when(F.col("n.h") != F.col("o.h"), "UPDATE"))
.filter("op IS NOT NULL")
)
cdc.groupBy("op").count().orderBy("op").show()نتیجه
+------+-----+ | op|count| +------+-----+ |DELETE| 118| |INSERT| 2204| |UPDATE| 9817| +------+-----+ Snapshot J : 1 204 882 lignes | J-1 : 1 202 796 12 139 changements détectés (1.0 %) — les lignes inchangées ne transitent plus
PySparkCDCsha2full_outerSnapshot