Spark
Spark en conditions réelles
Un carnet de terrain d'ingénieur data : chaque recette PySpark répond à un piège réel de production — skew, small files, lineage explosif, NULL qui ne matchent pas, count() qui scanne 1,2 To — et se vérifie par une sortie console authentique (df.show, plan d'explain, métriques de MERGE). Du PySpark mesuré et instrumenté (gains x34, x88), orienté Delta Lake, fenêtrage temporel et qualité de données.
20 snippets phares
- Fenêtre temporelle réelle avec rangeBetween sur timestampAgréger sur les 24 dernières heures glissantes (et non les N dernières lignes) en convertissant le timestamp en secondes.
- Forward fill : propager la dernière valeur connuelast(ignorenulls=True) sur fenêtre non bornée à gauche pour combler les trous d'une série — le ffill de pandas en distribué.
- Sessionization : découper en sessions par gap d'inactivitéAttribution d'un identifiant de session quand l'écart entre deux événements dépasse 30 minutes — cumul d'un flag de rupture.
- Semi-join : filtrer par existence sans ramener de colonnesleft_semi filtre la table de gauche sur l'existence d'une clé à droite, sans dupliquer les lignes même si la droite contient des doublons.
- Jointure null-safe avec eqNullSafeL'égalité standard élimine les NULL des deux côtés (NULL == NULL est NULL) ; eqNullSafe les apparie, indispensable sur des clés composites nullable.
- Salting : casser le skew d'une jointureAjout d'un sel aléatoire côté table skewée et explosion côté référentiel pour répartir une clé hyper-dominante sur N partitions.
- Écrasement dynamique de partitions (backfill sûr)Le mode dynamic ne remplace que les partitions présentes dans le DataFrame écrit, au lieu de purger toute la table — le réglage qui sauve les backfills.
- Bucketing : pré-shuffler une table de jointure récurrenteTables bucketées sur la même clé et le même nombre de buckets : les jointures et agrégations suivantes évitent le shuffle.
- Remplacer une UDF Python par des fonctions nativesUne UDF Python sérialise chaque ligne vers un worker Python et aveugle Catalyst ; la même logique en fonctions natives reste optimisable et 10 à 100x plus rapide.
- aggregate : réduire un array en une valeurFold natif sur un tableau : somme pondérée, concaténation conditionnelle — sans explode/groupBy, donc sans shuffle.
- SCD Type 2 avec Delta MERGEHistorisation à double action : clôturer la version courante et insérer la nouvelle, via l'astuce du mergeKey NULL pour les lignes changées.
- checkpoint : tronquer un lineage devenu ingérableDans les pipelines itératifs, le plan logique enfle jusqu'à des StackOverflow ou des re-calculs explosifs ; le checkpoint coupe la généalogie.
- Lire un plan d'exécution : les 4 signaux à chercherexplain(mode='formatted') et les marqueurs qui comptent : Exchange (shuffle), type de jointure, PushedFilters et PartitionFilters.
- monotonically_increasing_id : unique oui, consécutif nonL'identifiant encode le numéro de partition dans ses bits hauts : les valeurs sautent par milliards entre partitions. row_number global est consécutif mais sérialise tout.
- Snapshots quotidiens → flux CDC (INSERT / UPDATE / DELETE)Un full_outer join entre les snapshots J et J-1, comparés par hash sha2 des colonnes suivies, classe chaque ligne en INSERT, UPDATE ou DELETE : un vrai flux CDC sans connecteur.
- Streaks : compter les jours de baisse consécutifsLe motif gaps-and-islands appliqué aux tendances : un cumul de « ruptures » segmente la série en groupes, et la taille des groupes de baisse révèle les produits en chute libre depuis N jours.
- Rétention par cohorte mensuelleChaque client est rattaché à sa cohorte (premier mois d'achat), l'âge en mois est pivoté, et les colonnes M1/M2/M3 expriment la rétention en pourcentage de la cohorte initiale.
- Pré-agréger avant de joindre : le shuffle divisé par 16 000Agréger 800 M de clics en 50 k lignes AVANT la jointure transforme un SortMergeJoin massif en BroadcastHashJoin : même résultat, shuffle quasi nul.
- Garde-fou anti-explosion : estimer une jointure avant de la payerLe nombre de lignes produites par une jointure se calcule à l'avance : somme des produits des cardinalités par clé. Un seuil transforme l'estimation en garde-fou bloquant.
- isEmpty au lieu de count() : le garde-fou qui coûtait 3 minutesTester l'existence de données avec count() scanne TOUT pour produire un nombre qu'on compare à zéro ; isEmpty (ou take(1)) s'arrête à la première ligne trouvée.