Spark

پیش‌تجمیع قبل از join: تقسیم shuffle بر ۱۶ هزار

اگه ۸۰۰ میلیون کلیک رو قبل از join به ۵۰ هزار ردیف تجمیع کنی، یک SortMergeJoin سنگین تبدیل میشه به BroadcastHashJoin: همون نتیجه با shuffle تقریبا صفر.

کاربرد

بازنویسی job گزارش‌گیری که جدول fact خام رو قبل از تجمیع join می‌کنه.

پیش‌نیازها

PySpark 3.x

Python
from pyspark.sql import functions as F

# AVANT : joindre 800 M de clics PUIS agréger -> shuffle des 800 M de lignes
lent = (clics.join(campagnes, "campaign_id")
             .groupBy("annonceur").agg(F.count("*").alias("clics")))

# APRÈS : agréger d'abord (50 k lignes), joindre ensuite en broadcast
rapide = (
    clics.groupBy("campaign_id").agg(F.count("*").alias("clics"))
         .join(F.broadcast(campagnes), "campaign_id")
         .groupBy("annonceur").agg(F.sum("clics").alias("clics"))
)
rapide.explain()

نتیجه

== Physical Plan ==
AdaptiveSparkPlan
+- HashAggregate(keys=[annonceur], functions=[sum(clics)])
   +- Project [annonceur, clics]
      +- BroadcastHashJoin [campaign_id], [campaign_id], Inner
         :- HashAggregate(keys=[campaign_id], functions=[count(1)])
         :     <- 800 M lignes réduites à 50 k AVANT la jointure
         +- BroadcastExchange

Durée mesurée : 14,2 min -> 1,8 min
Le shuffle porte sur 50 000 lignes au lieu de 800 000 000
PySparkShuffleBroadcastOptimisation

اسنیپت‌های مرتبط

بازگشت به آزمایشگاه داده