aggregate: reduce an array to a single value
A native fold over the elements of an array column (for example a weighted sum or a conditional concatenation), computed row by row without explode/groupBy, so no shuffle is needed.
Prerequisites
PySpark 3.1+
Python
from pyspark.sql import functions as F
# Input schema: `items` is array<struct<price:double, qty:int>>.
# basket_total = sum over the array of price * qty, folded per row with
# F.aggregate (no explode/groupBy). The zero value is a double literal,
# matching the double type of price * qty.
def _add_line_amount(running, line):
    """Merge step: add one element's price * qty to the running total."""
    return running + line["price"] * line["qty"]


df_total = df.withColumn(
    "basket_total",
    F.aggregate(F.col("items"), F.lit(0.0), _add_line_amount),
)
# Variant with a finish step (mean): the accumulator is a struct holding the
# running sum of prices (s) and the element count (n); the finish function
# turns it into s / n. This is the plain mean of unit prices, not weighted
# by qty.
# Built on df_total (not df) so that basket_total is still available when
# selecting both columns below.
df_avg = df_total.withColumn(
    "avg_price",
    F.aggregate(
        "items",
        # zero value: (s = 0.0, n = 0)
        F.struct(F.lit(0.0).alias("s"), F.lit(0).alias("n")),
        # merge: add this element's price to s and count it in n
        lambda acc, it: F.struct(
            (acc["s"] + it["price"]).alias("s"),
            (acc["n"] + 1).alias("n"),
        ),
        # finish: guard the empty-array case. An unguarded 0.0 / 0 yields
        # null only when ANSI mode is off and raises DIVIDE_BY_ZERO when it
        # is on; F.when without otherwise() yields null in both modes.
        lambda acc: F.when(acc["n"] > 0, acc["s"] / acc["n"]),
    ),
)

Result
>>> df_avg.select("order_id", "items", "basket_total", "avg_price") \
... .show(truncate=False)
+--------+----------------------+------------+---------+
|order_id|items                 |basket_total|avg_price|
+--------+----------------------+------------+---------+
|O-001   |[{12.5, 2}, {49.9, 1}]|74.9        |31.2     |
|O-002   |[{5.0, 4}]            |20.0        |5.0      |
+--------+----------------------+------------+---------+
Tags: PySpark, aggregate, ArrayFold