Spark

Sessionization: برش session بر اساس فاصله بی‌فعالیتی

هر وقت فاصله دو رویداد از ۳۰ دقیقه بیشتر بشه، یک شناسه session جدید می‌دی — با جمع تجمعی روی یک فلگ شروع.

کاربرد

بازسازی sessionهای مرور وب از روی یک جریان خام کلیک.

پیش‌نیازها

PySpark 3.x

Python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

GAP_SEC = 30 * 60
w = Window.partitionBy("user_id").orderBy("event_ts")

sessions = (
    events
    .withColumn("prev_ts", F.lag("event_ts").over(w))
    .withColumn(
        "new_session",
        (F.col("prev_ts").isNull() |
         (F.col("event_ts").cast("long") - F.col("prev_ts").cast("long") > GAP_SEC)
        ).cast("int"),
    )
    # Le cumul des ruptures numérote les sessions
    .withColumn("session_seq", F.sum("new_session").over(
        w.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
    .withColumn("session_id", F.concat_ws("-", "user_id", "session_seq"))
)

نتیجه

+-------+-------------------+-----------+----------+
|user_id|           event_ts|new_session|session_id|
+-------+-------------------+-----------+----------+
|  U-042|2026-06-09 10:00:12|          1|   U-042-1|
|  U-042|2026-06-09 10:04:55|          0|   U-042-1|
|  U-042|2026-06-09 10:21:03|          0|   U-042-1|
|  U-042|2026-06-09 11:30:48|          1|   U-042-2|
|  U-042|2026-06-09 11:32:11|          0|   U-042-2|
+-------+-------------------+-----------+----------+
PySparkWindowSessionizationClickstream

اسنیپت‌های مرتبط

بازگشت به آزمایشگاه داده