Spark

Pré-agréger avant de joindre : le shuffle divisé par 16 000

Agréger 800 M de clics en 50 k lignes AVANT la jointure transforme un SortMergeJoin massif en BroadcastHashJoin : même résultat, shuffle quasi nul.

Cas d'usage

Réécrire un job de reporting qui joint la table de faits brute avant d'agréger.

Prérequis

PySpark 3.x

Python
from pyspark.sql import functions as F

# AVANT : joindre 800 M de clics PUIS agréger -> shuffle des 800 M de lignes
lent = (clics.join(campagnes, "campaign_id")
             .groupBy("annonceur").agg(F.count("*").alias("clics")))

# APRÈS : agréger d'abord (50 k lignes), joindre ensuite en broadcast
rapide = (
    clics.groupBy("campaign_id").agg(F.count("*").alias("clics"))
         .join(F.broadcast(campagnes), "campaign_id")
         .groupBy("annonceur").agg(F.sum("clics").alias("clics"))
)
rapide.explain()

Résultat

== Physical Plan ==
AdaptiveSparkPlan
+- HashAggregate(keys=[annonceur], functions=[sum(clics)])
   +- Project [annonceur, clics]
      +- BroadcastHashJoin [campaign_id], [campaign_id], Inner
         :- HashAggregate(keys=[campaign_id], functions=[count(1)])
         :     <- 800 M lignes réduites à 50 k AVANT la jointure
         +- BroadcastExchange

Durée mesurée : 14,2 min -> 1,8 min
Le shuffle porte sur 50 000 lignes au lieu de 800 000 000
PySparkShuffleBroadcastOptimisation

Snippets liés

Retour au Data Lab