پیشتجمیع قبل از join: تقسیم shuffle بر ۱۶ هزار
اگه ۸۰۰ میلیون کلیک رو قبل از join به ۵۰ هزار ردیف تجمیع کنی، یک SortMergeJoin سنگین تبدیل میشه به BroadcastHashJoin: همون نتیجه با shuffle تقریبا صفر.
کاربرد
بازنویسی job گزارشگیری که جدول fact خام رو قبل از تجمیع join میکنه.
پیشنیازها
PySpark 3.x
Python
from pyspark.sql import functions as F
# AVANT : joindre 800 M de clics PUIS agréger -> shuffle des 800 M de lignes
lent = (clics.join(campagnes, "campaign_id")
.groupBy("annonceur").agg(F.count("*").alias("clics")))
# APRÈS : agréger d'abord (50 k lignes), joindre ensuite en broadcast
rapide = (
clics.groupBy("campaign_id").agg(F.count("*").alias("clics"))
.join(F.broadcast(campagnes), "campaign_id")
.groupBy("annonceur").agg(F.sum("clics").alias("clics"))
)
rapide.explain()نتیجه
== Physical Plan ==
AdaptiveSparkPlan
+- HashAggregate(keys=[annonceur], functions=[sum(clics)])
+- Project [annonceur, clics]
+- BroadcastHashJoin [campaign_id], [campaign_id], Inner
:- HashAggregate(keys=[campaign_id], functions=[count(1)])
: <- 800 M lignes réduites à 50 k AVANT la jointure
+- BroadcastExchange
Durée mesurée : 14,2 min -> 1,8 min
Le shuffle porte sur 50 000 lignes au lieu de 800 000 000PySparkShuffleBroadcastOptimisation