Streaks: contar los días consecutivos de caída
El patrón gaps-and-islands aplicado a las tendencias: un acumulado de 'rupturas' segmenta la serie en grupos, y el tamaño de los grupos de caída revela los productos en caída libre desde hace N días.
Requisitos
PySpark 3.x
Python
from pyspark.sql import functions as F, Window
w = Window.partitionBy("product_id").orderBy("jour")
w_cumul = w.rowsBetween(Window.unboundedPreceding, 0)
streaks = (
ventes_jour
.withColumn("baisse", (F.col("ca") < F.lag("ca").over(w)).cast("int"))
.withColumn("rupture", F.when(F.col("baisse") == 0, 1).otherwise(0))
.withColumn("groupe", F.sum("rupture").over(w_cumul))
.groupBy("product_id", "groupe")
.agg(F.sum("baisse").alias("jours_de_baisse"),
F.max("jour").alias("jusqu_au"))
.filter("jours_de_baisse >= 5")
)
streaks.select("product_id", "jours_de_baisse", "jusqu_au") \
.orderBy(F.desc("jours_de_baisse")).show(4)Resultado
+----------+---------------+----------+ |product_id|jours_de_baisse| jusqu_au| +----------+---------------+----------+ | P-220150| 11|2026-06-09| | P-184022| 8|2026-06-09| | P-302881| 6|2026-06-07| | P-441203| 5|2026-06-09| +----------+---------------+----------+ 4 produits en baisse ininterrompue depuis 5+ jours P-220150 (11 jours) : déréférencé chez le concurrent ? prix ? → enquête pricing
PySparkGaps and islandsWindowTendance