Spark

Salting: شکستن skew توی یک join

سمت جدول skewدار یک salt تصادفی اضافه می‌کنی و سمت مرجع explode می‌کنی، تا یک کلید بیش‌ازحد غالب روی N پارتیشن پخش بشه.

کاربرد

اتصال جریانی که ۴۰٪ ردیف‌هاش یک کلید واحد دارن (مشتری ناشناس، مقدار پیش‌فرض).

پیش‌نیازها

PySpark 3.x

Python
from pyspark.sql import functions as F

SALT = 16

# Côté skewé : sel aléatoire 0..15
facts_salted = facts.withColumn(
    "salt", (F.rand(seed=42) * SALT).cast("int")
)

# Côté référentiel : dupliquer chaque ligne sur les 16 sels
dim_exploded = dim.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(SALT)]))
)

joined = facts_salted.join(dim_exploded, ["join_key", "salt"]).drop("salt")
# À tenter d'abord : AQE skew join (spark.sql.adaptive.skewJoin.enabled)
# qui règle la plupart des cas sans salting manuel.

نتیجه

-- Avant salting : Stage 4, 199/200 tasks en 12 s, 1 task (cle ANON) : 18 min
-- Apres salting (SALT=16) : 200/200 tasks en 84 s, max task 9 s

>>> facts_salted.groupBy("salt").count().show(4)
+----+------+
|salt| count|
+----+------+
|   0|524873|
|   1|525101|
|   2|524610|
|   3|525488|
+----+------+
PySparkSkewSaltingJoin

اسنیپت‌های مرتبط

بازگشت به آزمایشگاه داده