Spark

Sessionization: dividir en sesiones por gap de inactividad

Asignacion de un identificador de sesion cuando la diferencia entre dos eventos supera los 30 minutos: acumulado de un flag de ruptura.

Requisitos

PySpark 3.x

Python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

GAP_SEC = 30 * 60
w = Window.partitionBy("user_id").orderBy("event_ts")

sessions = (
    events
    .withColumn("prev_ts", F.lag("event_ts").over(w))
    .withColumn(
        "new_session",
        (F.col("prev_ts").isNull() |
         (F.col("event_ts").cast("long") - F.col("prev_ts").cast("long") > GAP_SEC)
        ).cast("int"),
    )
    # Le cumul des ruptures numérote les sessions
    .withColumn("session_seq", F.sum("new_session").over(
        w.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
    .withColumn("session_id", F.concat_ws("-", "user_id", "session_seq"))
)

Resultado

+-------+-------------------+-----------+----------+
|user_id|           event_ts|new_session|session_id|
+-------+-------------------+-----------+----------+
|  U-042|2026-06-09 10:00:12|          1|   U-042-1|
|  U-042|2026-06-09 10:04:55|          0|   U-042-1|
|  U-042|2026-06-09 10:21:03|          0|   U-042-1|
|  U-042|2026-06-09 11:30:48|          1|   U-042-2|
|  U-042|2026-06-09 11:32:11|          0|   U-042-2|
+-------+-------------------+-----------+----------+
PySparkWindowSessionizationClickstream

Snippets relacionados

Volver al Data Lab