Spark in the real world
A data engineer's field notebook: every PySpark recipe answers a real production trap (skew, small files, runaway lineage, NULLs that won't match, a count() that scans 1.2 TB) and proves itself with authentic console output (df.show, explain plans, MERGE metrics). The PySpark here is measured and instrumented (34x and 88x speedups) and centers on Delta Lake, time windowing, and data quality.
20 featured snippets
- True time window with rangeBetween over a timestamp
  Aggregate over the last rolling 24 hours (not the last N rows) by converting the timestamp to seconds.
- Forward fill: carry the last known value
  last(ignorenulls=True) over a window unbounded to the left fills the gaps in a series: pandas' ffill, distributed.
- Sessionization: splitting into sessions by inactivity gap
  Assign a new session id whenever the gap between two events exceeds 30 minutes: a running sum over a break flag.
- Semi-join: filter by existence without pulling columns
  left_semi filters the left table on the existence of a key on the right, without duplicating rows even when the right side has duplicates.
- Null-safe join with eqNullSafe
  Standard equality drops NULLs on both sides (NULL == NULL is NULL); eqNullSafe matches them, which is essential for nullable composite keys.
- Salting: breaking a join's skew
  Add a random salt on the skewed table and explode the reference table to spread a hyper-dominant key across N partitions.
- Dynamic partition overwrite (safe backfill)
  Dynamic mode replaces only the partitions present in the written DataFrame instead of wiping the whole table: the setting that saves backfills.
- Bucketing: pre-shuffling a recurring join table
  When tables are bucketed on the same key with the same number of buckets, subsequent joins and aggregations avoid the shuffle.
- Replacing a Python UDF with native functions
  A Python UDF serializes every row to a Python worker and blinds Catalyst; the same logic in native functions stays optimizable and runs 10 to 100x faster.
- aggregate: reduce an array to a single value
  A native fold over an array (weighted sum, conditional concatenation) without explode/groupBy, so no shuffle.
- SCD Type 2 with Delta MERGE
  Two-action history tracking: close out the current version and insert the new one, using the NULL mergeKey trick for changed rows.
- checkpoint: truncate a lineage that's gotten out of hand
  In iterative pipelines, the logical plan grows until it triggers StackOverflow errors or explosive recomputation; checkpoint cuts the lineage.
- Reading an execution plan: the 4 signals to look for
  explain(mode='formatted') and the markers that matter: Exchange (shuffle), join type, PushedFilters and PartitionFilters.
- monotonically_increasing_id: unique yes, consecutive no
  The id encodes the partition number in its high bits, so values jump by billions between partitions. A global row_number is consecutive but serializes everything.
- Daily snapshots → CDC stream (INSERT / UPDATE / DELETE)
  A full_outer join between the day-D and day-(D-1) snapshots, compared via an sha2 hash of the tracked columns, classifies each row as INSERT, UPDATE or DELETE: a real CDC stream with no connector.
- Streaks: counting consecutive days of decline
  The gaps-and-islands pattern applied to trends: a running count of 'breaks' segments the series into groups, and the size of the declining groups reveals products in free fall for N days.
- Retention by monthly cohort
  Each customer is tied to their cohort (first month of purchase), age in months is pivoted, and the M1/M2/M3 columns express retention as a percentage of the original cohort.
- Pre-aggregate before joining: shuffle cut by 16,000x
  Aggregating 800M clicks down to 50k rows BEFORE the join turns a massive SortMergeJoin into a BroadcastHashJoin: same result, near-zero shuffle.
- Anti-explosion guardrail: estimate a join before you pay for it
  The number of rows a join will produce can be computed ahead of time: the sum of the products of the per-key cardinalities. A threshold turns the estimate into a blocking guardrail.
- isEmpty instead of count(): the guardrail that cost 3 minutes
  Checking for the existence of data with count() scans EVERYTHING to produce a number you then compare to zero; isEmpty (or take(1)) stops at the first row found.
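
The arithmetic behind the anti-explosion guardrail can be checked without a cluster. The sketch below is plain Python, not PySpark. It assumes an inner equi-join on a single key with no NULL keys, and the names estimate_join_rows, guard_join and max_rows are hypothetical, chosen only for illustration. In Spark, the per-key counts would presumably come from a groupBy on the join key on each side.

```python
from collections import Counter


def estimate_join_rows(left_keys, right_keys):
    """Estimate inner equi-join output size.

    For each key present on both sides, the join emits
    (rows with that key on the left) * (rows with that key on the right).
    The total is the sum of those products.
    """
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    return sum(
        n_left * right_counts[key]
        for key, n_left in left_counts.items()
        if key in right_counts
    )


def guard_join(left_keys, right_keys, max_rows):
    """Hypothetical blocking guardrail: refuse the join above a threshold."""
    estimated = estimate_join_rows(left_keys, right_keys)
    if estimated > max_rows:
        raise ValueError(
            f"join would produce {estimated} rows, above the limit of {max_rows}"
        )
    return estimated


# Toy data: key "a" appears 3 times on the left and 2 times on the right.
left = ["a", "a", "a", "b", "c"]
right = ["a", "a", "b", "d"]

estimated = estimate_join_rows(left, right)  # 3*2 (a) + 1*1 (b) = 7

# Brute-force check: count every matching pair, as the join itself would.
actual = sum(1 for l in left for r in right if l == r)
```

On this toy input the estimate and the brute-force pair count agree, while the estimate only needed one pass over each side to build the per-key counts.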
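
The jumps described in the monotonically_increasing_id snippet follow from the bit layout. The Spark documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below reproduces that layout in plain Python to show the arithmetic only; it does not call Spark, and mono_id and decode are hypothetical helper names.

```python
ROW_BITS = 33  # lower 33 bits: record number within the partition


def mono_id(partition_id, row_in_partition):
    """Compose an id the way the documented layout describes."""
    return (partition_id << ROW_BITS) + row_in_partition


def decode(generated_id):
    """Split an id back into (partition_id, row_in_partition)."""
    return generated_id >> ROW_BITS, generated_id & ((1 << ROW_BITS) - 1)


# Two partitions holding three rows each.
ids = [mono_id(p, r) for p in range(2) for r in range(3)]
# ids == [0, 1, 2, 8589934592, 8589934593, 8589934594]

# The gap between the last id of partition 0 and the first id of partition 1.
jump = ids[3] - ids[2]
```

The ids are unique and increasing, but the first row of partition 1 starts at 2**33 = 8589934592, which is why the values cannot be used as consecutive row numbers.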
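
The "running sum over a break flag" idea from the sessionization snippet can be sketched in plain Python for a single user's events. This is an illustration of the logic only, assuming timestamps are already sorted epoch seconds; assign_sessions and GAP_SECONDS are hypothetical names. In PySpark the same two steps would typically be a lag over a window partitioned by user and ordered by time, followed by a cumulative sum over that window.

```python
from itertools import accumulate

GAP_SECONDS = 30 * 60  # inactivity threshold: 30 minutes


def assign_sessions(timestamps, gap=GAP_SECONDS):
    """Return one session id per event for a single user's sorted timestamps.

    Step 1: flag an event with 1 when it arrives more than `gap` seconds
            after the previous event (the first event gets 0).
    Step 2: the running sum of the flags is the session id.
    """
    if not timestamps:
        return []
    breaks = [0] + [
        1 if current - previous > gap else 0
        for previous, current in zip(timestamps, timestamps[1:])
    ]
    return list(accumulate(breaks))


# Gaps between events: 600, 600, 2800 (break), 100, 4900 (break) seconds.
events = [0, 600, 1200, 4000, 4100, 9000]
sessions = assign_sessions(events)
# sessions == [0, 0, 0, 1, 1, 2]
```

The same flag-then-accumulate shape is what the streaks snippet relies on: only the definition of a "break" changes.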