aggregate: جمعکردن یه array توی یه مقدار
fold بومی روی یه آرایه: جمع وزندار، الحاق شرطی — بدون explode/groupBy و در نتیجه بدون shuffle.
کاربرد
جمع کل یه سبد که بهصورت آرایهای از structها ذخیره شده، یا امتیاز ترکیبی هر ردیف.
پیشنیازها
PySpark 3.1+
Python
from pyspark.sql import functions as F
# items : array<struct<price:double, qty:int>>
df_total = df.withColumn(
"basket_total",
F.aggregate(
"items",
F.lit(0.0), # valeur initiale
lambda acc, it: acc + it["price"] * it["qty"],
),
)
# Variante avec finition (moyenne) : accumulateur struct (somme, compte)
df_avg = df.withColumn(
"avg_price",
F.aggregate(
"items",
F.struct(F.lit(0.0).alias("s"), F.lit(0).alias("n")),
lambda acc, it: F.struct((acc["s"] + it["price"]).alias("s"),
(acc["n"] + 1).alias("n")),
lambda acc: acc["s"] / acc["n"],
),
)نتیجه
>>> df_avg.select("order_id", "items", "basket_total", "avg_price") \
... .show(truncate=False)
+--------+----------------------+------------+---------+
|order_id|items |basket_total|avg_price|
+--------+----------------------+------------+---------+
|O-001 |[{12.5, 2}, {49.9, 1}]|74.9 |31.2 |
|O-002 |[{5.0, 4}] |20.0 |5.0 |
+--------+----------------------+------------+---------+PySparkaggregateArrayFold