Spark

Streaks: contar los días consecutivos de caída

El patrón gaps-and-islands aplicado a las tendencias: un acumulado de 'rupturas' segmenta la serie en grupos, y el tamaño de los grupos de caída revela los productos en caída libre desde hace N días.

Requisitos

PySpark 3.x

Python
from pyspark.sql import functions as F, Window

w = Window.partitionBy("product_id").orderBy("jour")
w_cumul = w.rowsBetween(Window.unboundedPreceding, 0)

streaks = (
    ventes_jour
    .withColumn("baisse", (F.col("ca") < F.lag("ca").over(w)).cast("int"))
    .withColumn("rupture", F.when(F.col("baisse") == 0, 1).otherwise(0))
    .withColumn("groupe", F.sum("rupture").over(w_cumul))
    .groupBy("product_id", "groupe")
    .agg(F.sum("baisse").alias("jours_de_baisse"),
         F.max("jour").alias("jusqu_au"))
    .filter("jours_de_baisse >= 5")
)
streaks.select("product_id", "jours_de_baisse", "jusqu_au") \
       .orderBy(F.desc("jours_de_baisse")).show(4)

Resultado

+----------+---------------+----------+
|product_id|jours_de_baisse|  jusqu_au|
+----------+---------------+----------+
|  P-220150|             11|2026-06-09|
|  P-184022|              8|2026-06-09|
|  P-302881|              6|2026-06-07|
|  P-441203|              5|2026-06-09|
+----------+---------------+----------+

4 produits en baisse ininterrompue depuis 5+ jours
P-220150 (11 jours) : déréférencé chez le concurrent ? prix ? → enquête pricing
PySparkGaps and islandsWindowTendance

Snippets relacionados

Volver al Data Lab