Spark
Spark en el mundo real
Un cuaderno de campo de ingeniero de datos: cada receta PySpark responde a una trampa real de producción —skew, small files, lineage explosivo, NULL que no hacen match, un count() que escanea 1,2 TB— y se verifica con una salida de consola auténtica (df.show, plan de explain, métricas de MERGE). PySpark medido e instrumentado (mejoras x34, x88), orientado a Delta Lake, ventanas temporales y calidad de datos.
20 snippets destacados
- Ventana temporal real con rangeBetween sobre timestampAgregar sobre las ultimas 24 horas moviles (y no las ultimas N filas) convirtiendo el timestamp a segundos.
- Forward fill: propagar el ultimo valor conocidolast(ignorenulls=True) sobre una ventana no acotada a la izquierda para rellenar los huecos de una serie: el ffill de pandas, en distribuido.
- Sessionization: dividir en sesiones por gap de inactividadAsignacion de un identificador de sesion cuando la diferencia entre dos eventos supera los 30 minutos: acumulado de un flag de ruptura.
- Semi-join: filtrar por existencia sin traer columnasleft_semi filtra la tabla de la izquierda por la existencia de una clave a la derecha, sin duplicar las filas aunque la derecha contenga duplicados.
- Union null-safe con eqNullSafeLa igualdad estandar elimina los NULL de ambos lados (NULL == NULL es NULL); eqNullSafe los empareja, imprescindible en claves compuestas nullable.
- Salting: romper el skew de una unionAnadir una sal aleatoria del lado de la tabla con skew y explotar el lado del referencial para repartir una clave hiperdominante en N particiones.
- Sobrescritura dinamica de particiones (backfill seguro)El modo dynamic solo reemplaza las particiones presentes en el DataFrame escrito, en lugar de purgar toda la tabla: el ajuste que salva los backfills.
- Bucketing: pre-shuffle de una tabla de union recurrenteTablas bucketed sobre la misma clave y el mismo numero de buckets: las uniones y agregaciones posteriores evitan el shuffle.
- Sustituir una UDF de Python por funciones nativasUna UDF de Python serializa cada fila hacia un worker de Python y ciega a Catalyst; la misma logica en funciones nativas sigue siendo optimizable y resulta de 10 a 100x mas rapida.
- aggregate: reducir un array a un solo valorUn fold nativo sobre un array: suma ponderada, concatenación condicional — sin explode/groupBy, por lo que no hay shuffle.
- SCD Type 2 con Delta MERGEHistorización en dos acciones: cerrar la versión actual e insertar la nueva, mediante el truco del mergeKey NULL para las filas que cambian.
- checkpoint: truncar un lineage que se ha vuelto inmanejableEn los pipelines iterativos, el plan lógico crece hasta provocar StackOverflow o recálculos explosivos; el checkpoint corta el lineage.
- Leer un plan de ejecución: las 4 señales a buscarexplain(mode='formatted') y los marcadores que importan: Exchange (shuffle), tipo de join, PushedFilters y PartitionFilters.
- monotonically_increasing_id: único sí, consecutivo noEl identificador codifica el número de partición en sus bits altos: los valores saltan de mil en mil millones entre particiones. Un row_number global es consecutivo, pero serializa todo.
- Snapshots diarios → flujo CDC (INSERT / UPDATE / DELETE)Un full_outer join entre los snapshots del día D y D-1, comparados mediante un hash sha2 de las columnas rastreadas, clasifica cada fila como INSERT, UPDATE o DELETE: un flujo CDC real, sin conector.
- Streaks: contar los días consecutivos de caídaEl patrón gaps-and-islands aplicado a las tendencias: un acumulado de 'rupturas' segmenta la serie en grupos, y el tamaño de los grupos de caída revela los productos en caída libre desde hace N días.
- Retención por cohorte mensualCada cliente se vincula a su cohorte (primer mes de compra), la antigüedad en meses se pivota, y las columnas M1/M2/M3 expresan la retención como porcentaje de la cohorte inicial.
- Preagregar antes de unir: el shuffle dividido por 16 000Agregar 800 M de clics a 50 k filas ANTES del join convierte un SortMergeJoin masivo en un BroadcastHashJoin: mismo resultado, shuffle casi nulo.
- Barrera anti-explosión: estimar un join antes de pagarloEl número de filas que producirá un join se puede calcular de antemano: la suma de los productos de las cardinalidades por clave. Un umbral convierte la estimación en una barrera que bloquea.
- isEmpty en lugar de count(): la barrera que costaba 3 minutosComprobar la existencia de datos con count() escanea TODO para producir un número que luego comparas con cero; isEmpty (o take(1)) se detiene en la primera fila encontrada.