Distributed execution: stages, shuffles, skew & spill
A single machine can only read so fast and hold so much in memory. When a table is terabytes, no one computer can scan it in reasonable time — so modern analytic engines spread one query across many machines working in parallel. This is massively parallel processing (MPP), and it's the architecture behind Trino, Spark SQL, BigQuery, Snowflake, Redshift, and the rest. This lesson explains how a query is split across a cluster, the one operation that dominates distributed cost (the shuffle), and the two failure modes — skew and spill — that turn a fast query slow.
The MPP architecture: coordinator + workers
An MPP engine has two roles:
- The coordinator (Trino calls it the coordinator; Spark, the driver) receives your SQL, runs the parse → plan → optimize pipeline, splits the physical plan into pieces, and hands them out. It does no heavy data crunching itself; it orchestrates.
- The workers (executors / nodes / slots) each process a slice of the data in parallel and stream results back. Add workers and, as long as the work divides evenly, the same query goes faster: this is horizontal scaling, the whole point of the architecture.
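The division of labor can be sketched in a single process. This is a toy under stated assumptions, not how any engine is built. The hypothetical `coordinator` function splits the rows into slices, a thread pool stands in for the worker machines, and the coordinator only merges the small partial sums the workers return. A real coordinator ships plan fragments over the network, and each worker reads its own slice from storage.

```python
from concurrent.futures import ThreadPoolExecutor

def worker(rows):
    """One worker's job: a partial SUM(amount) over its slice of (customer_id, amount) rows."""
    return sum(amount for _customer_id, amount in rows)

def coordinator(rows, num_workers):
    """Split the input into slices, run one worker per slice in parallel, merge the partials."""
    # Round-robin split: slice i gets rows i, i + n, i + 2n, ...
    slices = [rows[i::num_workers] for i in range(num_workers)]
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partials = list(pool.map(worker, slices))
    # The coordinator only combines small partial results; the workers did the scanning.
    return sum(partials)

orders = [(1, 1000), (2, 550), (1, 450), (3, 2000)]  # hypothetical amounts in cents
print(coordinator(orders, num_workers=2))  # → 4000
```

The answer is the same for any number of workers; only how the work is divided changes.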
Separation of storage and compute
The defining trait of the modern distributed engine — and the cleanest break from old databases — is the separation of storage and compute. Old systems bolted the data permanently to the machines that processed it; to get more compute you had to also move the data. Modern engines keep data in object storage (Chapter 2) — cheap, durable, independent — and spin stateless compute clusters up and down against it on demand.
The consequences are profound and worth memorizing:
- Scale compute and storage independently. Petabytes sitting cheaply in object storage, a cluster sized only for the query you're running right now.
- Compute is elastic and disposable. Spin up 100 workers for a heavy job, then shut them off and stop paying for them. Shutting down a cluster releases compute; it doesn't delete any data.
- Many engines, one copy of data. Trino, Spark, and DuckDB can all query the same Parquet/Iceberg files. The data isn't trapped inside one product.
This is why "query-on-the-lake" engines exist at all, and it is the foundation for the engine landscape and the lakehouse covered later.
Stages and the shuffle (exchange)
The optimizer splits a distributed plan into stages: chunks of work that can run fully in parallel with no data movement between workers. Between two stages sits an exchange (Spark and most engines call it a shuffle), a step that redistributes data across the network so the next stage's workers get the rows they need. In batch engines such as Spark, a stage runs to completion before the stage that reads its shuffled output starts.
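A hash exchange can be sketched in a few lines. The sketch assumes hash partitioning, the common default, though range and round-robin exchanges also exist. The function names are hypothetical. Every upstream task routes each row to the partition that owns its key, so after the exchange all rows for one key sit together.

```python
import zlib

def partition_for(key, num_partitions):
    """Pick the target partition for a key; equal keys always map to the same one."""
    # crc32 is stable across processes, unlike Python's built-in hash() of a str,
    # which is randomized per process. Every worker must agree on this mapping.
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions

def hash_exchange(task_outputs, key_fn, num_partitions):
    """Route every row from every upstream task to the partition that owns its key."""
    partitions = [[] for _ in range(num_partitions)]
    for rows in task_outputs:      # one list of rows per upstream task
        for row in rows:           # in a real engine, each send crosses the network
            partitions[partition_for(key_fn(row), num_partitions)].append(row)
    return partitions

# Two upstream tasks each scanned a slice of orders: (customer_id, amount).
task_outputs = [
    [(1, 10), (2, 7), (3, 4)],
    [(2, 1), (1, 5), (4, 9)],
]
partitions = hash_exchange(task_outputs, key_fn=lambda row: row[0], num_partitions=3)
for p, rows in enumerate(partitions):
    print(p, rows)
```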
Why is a shuffle ever needed? Because some operations require all rows with the same key to be on the same worker:
- ShopFlow's revenue-by-customer rollup (see Meet ShopFlow), `GROUP BY customer_id` over `orders`, must gather every row for a given `customer_id` onto one worker to sum its `amount`, but those rows start out scattered across all workers. The shuffle repartitions the data by `customer_id` so each customer lands on one worker.
- The `orders` ⋈ `order_items` join from the previous lesson is a shuffle join (neither side is small enough to broadcast): it repartitions both tables by `order_id` so matching keys co-locate.
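The join case can be sketched as a shuffle hash join. The sketch assumes hash partitioning with one shared deterministic hash, and it uses simplified, hypothetical column layouts for `orders` and `order_items`. Both inputs are repartitioned on `order_id`, and then each partition pair is joined locally with a hash table. The single loop over partitions stands in for workers running in parallel.

```python
import zlib
from collections import defaultdict

def partition_for(key, num_partitions):
    # Deterministic hash, so both inputs agree on which partition owns each order_id.
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions

def shuffle(rows, key_index, num_partitions):
    """Repartition rows by the column at key_index (the network step in a real engine)."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[partition_for(row[key_index], num_partitions)].append(row)
    return parts

def shuffle_hash_join(orders, order_items, num_partitions):
    """Inner join orders (order_id, customer_id) with order_items (order_id, sku, qty)."""
    order_parts = shuffle(orders, 0, num_partitions)
    item_parts = shuffle(order_items, 0, num_partitions)
    joined = []
    # Partition p of each side holds every row whose order_id hashes to p,
    # so each partition pair can be joined locally by a separate worker.
    for p in range(num_partitions):
        build = defaultdict(list)        # hash table on the orders side
        for order in order_parts[p]:
            build[order[0]].append(order)
        for item in item_parts[p]:       # probe with the order_items side
            for order in build.get(item[0], []):
                joined.append(order + item[1:])
    return joined

orders = [(1, "c1"), (2, "c2"), (3, "c1")]
order_items = [(1, "sku-a", 2), (1, "sku-b", 1), (3, "sku-a", 5), (4, "sku-z", 1)]
print(sorted(shuffle_hash_join(orders, order_items, num_partitions=4)))
# → [(1, 'c1', 'sku-a', 2), (1, 'c1', 'sku-b', 1), (3, 'c1', 'sku-a', 5)]
```

The result does not depend on the partition count. Correctness only requires that both sides use the same partitioning function and the same number of partitions.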
The shuffle is almost always the most expensive part of a distributed query, because it moves data over the network (orders of magnitude slower than memory) and often writes it to disk in between. In an EXPLAIN plan, every Exchange/Shuffle/Repartition node is a place where data crosses the network — the first thing to look at when tuning. The durable optimization goal is simple: shuffle less. Predicate/partition pruning, projection pruning, pre-aggregating before the shuffle, and broadcasting small tables instead of shuffling big ones all serve that one goal.
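Pre-aggregating before the shuffle is easy to quantify. The sketch below assumes a decomposable aggregate like SUM, and it uses two hypothetical workers' scanned rows. It counts how many rows would cross the network for `GROUP BY customer_id`, both with and without a partial aggregation step on each worker. Spark, for example, plans a partial `HashAggregate` before the `Exchange` for aggregates like this.

```python
from collections import defaultdict

def distributed_sum_by_key(worker_rows, pre_aggregate):
    """SUM(amount) GROUP BY customer_id across workers; returns (totals, rows_shuffled)."""
    shuffled = []  # every row that would cross the network in the exchange
    for rows in worker_rows:
        if pre_aggregate:
            # Partial aggregation on the worker: one row per distinct key it saw.
            partial = defaultdict(int)
            for customer_id, amount in rows:
                partial[customer_id] += amount
            shuffled.extend(partial.items())
        else:
            shuffled.extend(rows)  # every raw row is sent
    # Final aggregation after the exchange (hash partitioning elided; totals are identical).
    totals = defaultdict(int)
    for customer_id, amount in shuffled:
        totals[customer_id] += amount
    return dict(totals), len(shuffled)

workers = [
    [(1, 10), (1, 5), (2, 7), (1, 3)],  # rows scanned by worker 0
    [(2, 1), (2, 4), (1, 2)],           # rows scanned by worker 1
]
print(distributed_sum_by_key(workers, pre_aggregate=False))  # → ({1: 20, 2: 12}, 7)
print(distributed_sum_by_key(workers, pre_aggregate=True))   # → ({1: 20, 2: 12}, 4)
```

With two customers the saving is small. In general, pre-aggregation caps the shuffled rows at the sum, over workers, of the distinct keys each worker saw, rather than the raw row count.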
The two failure modes at scale
Parallelism only pays off if the work is evenly divided. The two classic distributed-query problems are both about that breaking down.
Skew: one worker gets too much
Data skew is when data is unevenly distributed across workers, so one (or a few) workers get far more rows than the rest. A stage finishes only when its slowest worker finishes, so one overloaded worker stalls the entire stage while the others sit idle. Total parallelism, wasted.
The classic cause is a lopsided key. Shuffle by customer_id, but 40% of rows have customer_id = 0 (the "unknown customer" sentinel)? All of those land on one worker. Symptoms in a plan/monitor: one task runs 50× longer than its siblings, or one shuffle partition is enormous. Fixes include salting the skewed key (append a random suffix to spread the hot value across workers, then re-aggregate), filtering the sentinel out, or using an engine's built-in adaptive skew handling (Spark's AQE, for instance, can automatically split a skewed partition of a sort-merge join into smaller tasks). The durable point: even partitioning is the precondition for parallel speed; a skewed key destroys it.
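Salting can be sketched as a two-phase aggregation. The sketch makes several assumptions. `SALT_BUCKETS` is a hypothetical knob, every amount is 1, and the 40% sentinel share mirrors the example above. Phase 1 groups on `(customer_id, salt)`, which splits the sentinel's rows into up to `SALT_BUCKETS` groups that different workers can own. Phase 2 strips the salt and adds up at most `SALT_BUCKETS` partial sums per customer. This works for decomposable aggregates such as SUM and COUNT. Salting a join key additionally requires replicating the other side's matching rows once per salt value.

```python
import random
from collections import Counter, defaultdict

SALT_BUCKETS = 8  # hypothetical knob: how many ways to split each key

def largest_group(shuffle_keys):
    """Rows sharing one shuffle key must go to one task; return the biggest such group."""
    return max(Counter(shuffle_keys).values())

def salted_sum(rows, rng):
    """Two-phase SUM(amount) GROUP BY customer_id with a random salt on the key."""
    # Phase 1 (first shuffle): group on (customer_id, salt), so a hot key is split
    # into up to SALT_BUCKETS groups that can land on different workers.
    phase1 = defaultdict(int)
    salted_keys = []
    for customer_id, amount in rows:
        key = (customer_id, rng.randrange(SALT_BUCKETS))
        salted_keys.append(key)
        phase1[key] += amount
    # Phase 2 (second shuffle): drop the salt and re-aggregate the partial sums.
    # Each customer now contributes at most SALT_BUCKETS rows, so this stage stays small.
    final = defaultdict(int)
    for (customer_id, _salt), partial in phase1.items():
        final[customer_id] += partial
    return dict(final), salted_keys

rng = random.Random(42)
# 40% of rows carry the sentinel customer_id 0; the rest spread over ids 1..97.
rows = [(0 if i % 5 < 2 else i % 97 + 1, 1) for i in range(10_000)]
totals, salted_keys = salted_sum(rows, rng)
print(largest_group(cid for cid, _ in rows))  # → 4000: every sentinel row on one task
print(largest_group(salted_keys))             # roughly 4000 / SALT_BUCKETS
```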
Spill: a worker runs out of memory
Each worker has limited memory. When an operation that wants to be in memory (building a hash table, sorting, holding a big aggregation) doesn't fit, an engine that supports spilling writes the excess to local disk and continues. Spilling keeps the query correct and running instead of failing with an out-of-memory error. Where spilling is unavailable or disabled, the same memory pressure fails the query. But disk is far slower than memory, so a spilling query slows dramatically. Heavy spill shows up as a query that's mysteriously slow with high disk I/O.
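Spilling is easiest to see in a sort. The sketch below is a classic external merge sort under an assumed memory budget counted in rows (real engines budget bytes), and the helper names are hypothetical. When the in-memory buffer fills, it is sorted and written to a temporary file as a run. At the end, the runs are streamed back from disk and merged. The answer is identical to an in-memory sort; only the extra disk traffic changes.

```python
import heapq
import os
import pickle
import tempfile

def _write_run(sorted_rows, spill_dir):
    """Spill one sorted run to a temporary file on local disk."""
    fd, path = tempfile.mkstemp(dir=spill_dir, suffix=".run")
    with os.fdopen(fd, "wb") as f:
        for row in sorted_rows:
            pickle.dump(row, f)
    return path

def _read_run(path):
    """Stream a spilled run back from disk, one row at a time."""
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return

def external_sort(rows, max_rows_in_memory):
    """Sort rows while buffering at most max_rows_in_memory of them at once.

    Returns (sorted_rows, number_of_spilled_runs).
    """
    buffer, run_paths = [], []
    with tempfile.TemporaryDirectory() as spill_dir:
        for row in rows:
            buffer.append(row)
            if len(buffer) >= max_rows_in_memory:
                # The buffer is full: sort it and spill it to disk instead of failing.
                run_paths.append(_write_run(sorted(buffer), spill_dir))
                buffer = []
        if not run_paths:
            return sorted(buffer), 0  # everything fit in memory: no spill
        if buffer:
            run_paths.append(_write_run(sorted(buffer), spill_dir))
        # k-way merge of the sorted runs; the extra disk writes and reads are the
        # price of spilling. (Materialized as a list here only so it can be returned.)
        merged = list(heapq.merge(*(_read_run(p) for p in run_paths)))
    return merged, len(run_paths)

data = [(i * 37) % 1000 for i in range(1000)]  # a permutation of 0..999
result, runs = external_sort(data, max_rows_in_memory=100)
print(result[:5], runs)  # → [0, 1, 2, 3, 4] 10
```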
Spill is usually a symptom of too much data on one worker — which often means skew (above), an over-large broadcast/build side, or simply an under-provisioned cluster. The fixes rhyme with everything else in the chapter: reduce the data reaching the operator (pushdown/pruning), fix skew so no single worker is overloaded, give the cluster more memory, or pre-aggregate before the heavy step.
:::tip Diagnosing a slow distributed query, in order
1. Read less: is pushdown/pruning working? (Check the scans.)
2. Shuffle less: how many `Exchange` nodes are there, and could a broadcast replace a shuffle join?
3. Skew: is one task far slower, or one partition far bigger, than its peers?
4. Spill: is there high disk I/O during a build, sort, or aggregate?

Almost every "why is this slow" answer is one of these four, and all four are visible in an `EXPLAIN ANALYZE` plan plus the engine's query monitor.
:::
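The skew check can be partly automated from a query monitor's per-task metrics. Here is a minimal sketch that assumes task durations are already exported as a list of seconds. It flags tasks far slower than the median, using a hypothetical 5× threshold. Applied to the checkpoint scenario below (199 quick tasks, one 20-minute task), it flags the straggler. That one task sets the stage's wall time.

```python
import statistics

def flag_skewed_tasks(task_seconds, ratio=5.0):
    """Return the indices of tasks that ran more than `ratio` times the median task."""
    median = statistics.median(task_seconds)
    return [i for i, seconds in enumerate(task_seconds) if seconds > ratio * median]

# The checkpoint scenario: 199 tasks finish in 3 s, one runs for 20 minutes.
durations = [3.0] * 199 + [1200.0]
print(flag_skewed_tasks(durations))  # → [199]
# A stage ends when its slowest task ends, so this one takes 1200 s, not ~9 s.
print(max(durations))                # → 1200.0
```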
Why it matters
Distributed engines get their speed by splitting one query across many workers, coordinated by a planner and reading from shared object storage that's decoupled from compute. That decoupling is the architectural idea behind every modern engine and the lakehouse. But parallel speed is fragile: it depends on moving as little data as possible (the shuffle is the dominant cost) and on dividing work evenly (skew breaks this; spill is what happens when a worker is overloaded). Understanding stages, shuffles, skew, and spill is what lets you look at a query taking an hour on a big cluster and say exactly which of the four levers to pull — instead of just throwing more machines at it.
Common pitfalls
- Throwing hardware at a shuffle/skew problem. More workers don't help if one skewed partition is the bottleneck — the extra workers sit idle.
- Ignoring the `Exchange` nodes. They're the expensive part; tune to remove or shrink them (broadcast, pre-aggregate, prune) before anything else.
- A sentinel/`NULL`-heavy join or group key. It concentrates rows on one worker and is the most common source of skew. Filter or salt it.
- Assuming spill = a crash. It's correct-but-slow. The slowness, not an error, is the signal.
- Forgetting storage/compute are separate. Killing a cluster does not delete your data (it's in object storage); and two clusters can read the same files at once.
Cross-links
- Builds on: Join algorithms (broadcast vs shuffle join) and Inside a query engine (the `Exchange` in the plan).
- Foreshadows: Batch processing & Spark (partitions, shuffles, and skew are the heart of Spark tuning) and the lakehouse, built on storage/compute separation.
- Next: The engine landscape → — which real engines embody this architecture, and how lake engines differ from warehouses.
Checkpoint
- What is the defining architectural trait of a modern distributed engine versus an old database, and name one concrete benefit it unlocks.
- Why does `GROUP BY customer_id` over `orders` require a shuffle, and why is the shuffle usually the most expensive part of the query?
- A stage has 200 tasks; 199 finish in seconds and 1 runs for 20 minutes. Name the problem and one fix.
Answers
- Separation of storage and compute — data lives in object storage, stateless compute runs against it on demand. Benefits (any one): scale compute and storage independently; spin clusters up/down and pay only for compute used; multiple engines query the same copy of data.
- `GROUP BY customer_id` must place all of each customer's `orders` rows on one worker to aggregate them, but they start scattered, so the engine shuffles (repartitions by `customer_id`) across the network. It's the most expensive part because it moves data over the network (far slower than memory) and often writes the shuffled data to disk between stages.
- Data skew: one shuffle partition (a hot key) got far more rows than the others, so its task dominates while the rest idle. Fixes: salt the skewed key, filter out the sentinel value, or enable adaptive skew handling where it applies (e.g. Spark AQE for skewed joins).