Transformations, actions & lazy evaluation
You now know the machines (driver, executors, cluster manager) and the work hierarchy (jobs, stages, tasks). This lesson explains the part that surprises everyone new to Spark: when you write df.filter(...), Spark doesn't do anything. It just makes a note. Understanding why — and what finally makes it run — is the key to both correctness and speed. This is the execution model proper.
The three APIs: RDD, DataFrame, Dataset
Spark gives you three ways to represent distributed data. You should know all three names and use exactly one of them.
- RDD (Resilient Distributed Dataset): the original, low-level API. An RDD is an immutable, partitioned collection of objects spread across the cluster, on which you call functional operations (map, filter, reduceByKey). "Resilient" means it can rebuild lost partitions from lineage (more below). RDDs are powerful but opaque: Spark sees your code as a black box of arbitrary functions and can't optimize it. You'll rarely write RDDs today; they're the foundation everything else compiles down to.
- DataFrame: a distributed table with rows and named, typed columns, exactly like a SQL table or a pandas DataFrame, but spread across the cluster. You manipulate it with a declarative API (df.filter, df.groupBy) or literal SQL. Crucially, Spark understands these operations and can rewrite them for speed. This is the API you should use.
- Dataset: a typed DataFrame available in Scala/Java (compile-time type safety). It doesn't exist meaningfully in PySpark (Python is dynamically typed), so in Python "DataFrame" is the whole story.
Why DataFrames/SQL are preferred: the optimizer can see them
Here's the deep reason DataFrames beat RDDs, and it's worth internalizing. When you write an RDD map with a Python lambda, Spark sees an inscrutable function — it has no idea what's inside, so it must run it exactly as written. When you write df.filter(df.amount > 100) (the amount total on ShopFlow's orders — see Meet ShopFlow), Spark sees a structured, declarative expression it can reason about: it knows you're filtering on amount, so it can push that filter down to the data source, reorder it before an expensive join, or prune columns you never use. Structure that the engine understands is structure it can optimize. That optimizer is Catalyst, and we cover it at the end of this lesson. The one-line takeaway: prefer the DataFrame and SQL APIs; drop to RDDs only when you genuinely can't express something declaratively.
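To make "structure the engine understands" concrete, here is a minimal plain-Python sketch. It is not Spark code: Col and GreaterThan are hypothetical stand-ins invented for this illustration, and the rows are made up. It only shows the difference between a predicate that is data (and can be inspected) and a lambda that can only be called.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Col:
    """Hypothetical column reference (a stand-in for a DataFrame column)."""
    name: str

    def __gt__(self, value):
        # Build a description of the comparison instead of evaluating it.
        return GreaterThan(self, value)


@dataclass(frozen=True)
class GreaterThan:
    """A structured predicate: an engine can read its column and literal."""
    column: Col
    value: float

    def referenced_columns(self):
        return {self.column.name}

    def evaluate(self, row):
        return row[self.column.name] > self.value


# Declarative form: the predicate is data the engine can inspect.
declarative = Col("amount") > 100

# Opaque form: a callable whose body the engine cannot look inside.
opaque = lambda row: row["amount"] > 100

# Made-up sample rows.
rows = [{"order_id": 1, "amount": 250.0}, {"order_id": 2, "amount": 40.0}]

# Both forms select the same rows...
kept_declarative = [r["order_id"] for r in rows if declarative.evaluate(r)]
kept_opaque = [r["order_id"] for r in rows if opaque(r)]

# ...but only the declarative one can answer "which columns do you need?"
needed = declarative.referenced_columns()
can_inspect_lambda = hasattr(opaque, "referenced_columns")
```

Because the declarative predicate can report that it needs only amount, an engine holding it could decide to read just that column or to apply the filter earlier. With the lambda, the only option is to call it on every row.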
Transformations vs actions
Every operation on a DataFrame (or RDD) is one of two kinds. This distinction is the heart of Spark.
- A transformation describes a new dataset derived from an existing one: filter, select, groupBy, join, withColumn. Transformations are lazy: calling one does not compute anything. It just records "here's another step I want." It returns a new DataFrame representing the recipe, not the result.
- An action asks for an actual result: count, collect, show, write, take. An action is what finally triggers computation: Spark looks at the whole recipe of transformations leading up to it, plans it, and runs it.
That's lazy evaluation: transformations pile up unexecuted; an action forces the whole pile to run at once.
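The record-now, run-later behavior can be imitated in a few lines of plain Python. This is a toy model under stated assumptions, not how Spark is implemented: LazyTable, with_column and load_items are hypothetical names, and the two rows are made up.

```python
class LazyTable:
    """Toy stand-in for a lazily evaluated dataset (not Spark's implementation)."""

    def __init__(self, source, steps=()):
        self._source = source      # zero-argument callable that loads the rows
        self._steps = steps        # recorded transformations, in order

    # --- transformations: record a step, return a new recipe, compute nothing ---
    def filter(self, predicate):
        return LazyTable(self._source, self._steps + (("filter", predicate),))

    def with_column(self, name, fn):
        return LazyTable(self._source, self._steps + (("with_column", (name, fn)),))

    # --- action: run the whole recipe now ---
    def collect(self):
        rows = self._source()
        for kind, arg in self._steps:
            if kind == "filter":
                rows = [r for r in rows if arg(r)]
            else:  # "with_column"
                name, fn = arg
                rows = [{**r, name: fn(r)} for r in rows]
        return rows


reads = []  # records every time the source is actually loaded


def load_items():
    reads.append("read")
    return [
        {"quantity": 2, "unit_price": 5.0},
        {"quantity": 0, "unit_price": 9.0},
    ]


items = LazyTable(load_items)
priced = (items
          .filter(lambda r: r["quantity"] > 0)
          .with_column("line_revenue", lambda r: r["quantity"] * r["unit_price"]))

reads_before_action = len(reads)   # still 0: only a recipe exists
result = priced.collect()          # the action triggers the read and both steps
reads_after_action = len(reads)    # 1
priced.collect()                   # a second action re-runs the whole recipe
reads_after_second = len(reads)    # 2
```

Note the last two lines: in this model, as in Spark without caching, every action replays the recipe from the source.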
Tracing lazy evaluation
Trace this and predict where work happens. It's the canonical ShopFlow batch job (see Meet ShopFlow): daily revenue, built from line items. Each line item's revenue is quantity × unit_price (order_items.unit_price, the price captured at sale time), and dt is the order-date partition column the lake is laid out by (Chapter 2):
from pyspark.sql import functions as F

# Assumes an existing SparkSession named spark (as in the pyspark shell or a notebook).
# Reads are lazy: no rows are loaded, though Spark may list files and read Parquet footers to learn the schema.
items = spark.read.parquet("s3://shopflow/order_items/")   # lazy: no row data is read yet
orders = spark.read.parquet("s3://shopflow/orders/")       # lazy: no row data is read yet
priced = (
    items
    .withColumn("line_revenue", F.col("quantity") * F.col("unit_price"))  # transformation (lazy): nothing happens
    .join(orders.select("order_id", "dt"), "order_id")                    # transformation (lazy): brings in the order date (dt)
)
daily = priced.groupBy("dt").agg(F.sum("line_revenue").alias("revenue"))  # transformation (lazy): still nothing
# ... up to here, no row data has been read or processed ...
daily.show()  # ACTION: now everything above runs
Until show(), Spark has executed none of it: no row of order_items was read, no line_revenue computed. At most, the read calls listed the files and looked at Parquet metadata to learn the schema. Spark merely built up a recipe: read → derive line_revenue → join to get the day → group by day → sum. The show() action triggers Spark to plan and execute the entire chain. If you had never called an action, the program would process no data at all. (dt is orders' order-date partition column; line items get it by joining on order_id.)
:::tip Why laziness is a feature, not a quirk
Because Spark sees the whole recipe before running anything, it can optimize across the entire chain. It can narrow the scan to only the columns the job touches (order_id, quantity, unit_price from order_items; order_id, dt from orders) and compute the withColumn expression in the same pass as the scan, so columns you never reference are never loaded. An eager engine that ran each line immediately couldn't do any of this: it would already have read every column of order_items before it knew you only wanted three. Laziness is what enables the optimizer.
:::
Lineage and the DAG
As transformations stack up, Spark records them as a DAG — a Directed Acyclic Graph, a flowchart of steps where each operation points to the one(s) it depends on, with no cycles. This recorded chain of "this dataset came from that dataset via this operation" is the dataset's lineage.
Lineage is how Spark achieves fault tolerance without copying data everywhere. If an executor dies and takes some partitions with it, Spark doesn't fail the job — it reads the lineage, sees exactly how to recompute just those lost partitions from their inputs, and re-runs only that work on another executor. This is the MapReduce "re-run the failed piece" idea (lesson 5.1) generalized: because every dataset knows its recipe, any lost piece is reproducible.
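A small plain-Python sketch of the recovery idea, under loud assumptions: the partitions, the two lineage steps and the "lost" partition are all invented for illustration, and real Spark tracks lineage per dataset in its scheduler rather than in a list like this.

```python
# Source data split into three partitions (stand-ins for files on storage).
source_partitions = {
    0: [1, 2, 3],
    1: [4, 5, 6],
    2: [7, 8, 9],
}

# Lineage: the ordered list of narrow steps that derive the dataset from its source.
lineage = [
    ("filter", lambda x: x % 2 == 1),   # keep odd values
    ("map", lambda x: x * 10),          # scale them
]


def compute_partition(partition_id):
    """Rebuild one output partition by replaying the lineage on its input."""
    rows = source_partitions[partition_id]
    for kind, fn in lineage:
        if kind == "filter":
            rows = [x for x in rows if fn(x)]
        else:  # "map"
            rows = [fn(x) for x in rows]
    return rows


# Initial run: every partition is computed and held by some executor.
computed = {pid: compute_partition(pid) for pid in source_partitions}
expected = dict(computed)

# Simulate an executor failure that loses partition 1 only.
del computed[1]

# Recovery: replay the lineage for the missing partition, nothing else.
recomputed_ids = [pid for pid in source_partitions if pid not in computed]
for pid in recomputed_ids:
    computed[pid] = compute_partition(pid)
```

Only partition 1 is recomputed; partitions 0 and 2 are left untouched, which is the point of keeping the recipe instead of replicating the data.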
When an action fires, Spark's DAG scheduler takes this graph, cuts it into stages at the shuffle boundaries (lesson 5.2), and dispatches tasks. The DAG you authored becomes the stages-and-tasks the cluster runs.
Narrow vs wide transformations
Not all transformations cost the same, and the difference is the shuffle. Every transformation is either narrow or wide:
- A narrow transformation (filter, select, map, withColumn) is one where each output partition depends on exactly one input partition. No data needs to move between machines; each executor transforms its own partition in place. Narrow transformations are cheap and chain together inside a single stage. On ShopFlow, items.filter(items.quantity > 0) and items.withColumn("line_revenue", F.col("quantity") * F.col("unit_price")) are both narrow: each executor reworks its own slice of order_items with no data leaving the machine.
- A wide transformation (groupBy, join, distinct, orderBy, repartition) is one where each output partition depends on many input partitions. To compute it, rows must be regrouped across the cluster so related rows co-locate; that regrouping is a shuffle. Wide transformations are expensive and force a stage boundary. On ShopFlow, the daily-revenue groupBy("dt").agg(F.sum("line_revenue")) and the orders ⋈ order_items join (next lesson) are both wide: they must move rows across the network to bring matching keys together.
NARROW (no movement)              WIDE (shuffle: data crosses machines)
P1 ──→ P1'                        P1 ─┐
P2 ──→ P2'                        P2 ─┼─→ regroup by key across the network ─→ P1', P2', ...
P3 ──→ P3'                        P3 ─┘
each output from one input        each output pulls from many inputs
This is the most useful lens in all of Spark tuning: scan your code for the wide transformations — those are your shuffles, your stage boundaries, and your costs. A job with no wide transformations never shuffles and scales almost perfectly. Every groupBy, join, and orderBy is a place to ask "do I really need this, and can I make it cheaper?" (lesson 5.4).
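The dependency difference can be checked directly in a toy model. The sketch below is plain Python, not Spark: the partition contents are made up, and partition_for is a hypothetical, deliberately simple partitioner chosen so the result is deterministic. It tracks which input partitions feed each output partition.

```python
from collections import defaultdict

# Three input partitions of (dt, line_revenue) rows; the values are made up.
partitions = [
    [("2024-01-01", 10.0), ("2024-01-02", 5.0)],
    [("2024-01-01", 7.0), ("2024-01-03", 2.0)],
    [("2024-01-02", 1.0), ("2024-01-03", 4.0)],
]


def narrow_filter(parts, predicate):
    """Narrow: output partition i is built from input partition i only."""
    return [[row for row in part if predicate(row)] for part in parts]


def partition_for(key, num_partitions):
    """Deterministic toy partitioner (Python's hash() of str is salted per process)."""
    return sum(key.encode()) % num_partitions


def wide_sum_by_key(parts, num_partitions):
    """Wide: rows are regrouped by key, so outputs draw on many inputs."""
    shuffled = [defaultdict(float) for _ in range(num_partitions)]
    sources = [set() for _ in range(num_partitions)]  # which inputs fed each output
    for input_id, part in enumerate(parts):
        for key, value in part:
            target = partition_for(key, num_partitions)
            shuffled[target][key] += value
            sources[target].add(input_id)
    return [dict(d) for d in shuffled], sources


filtered = narrow_filter(partitions, lambda row: row[1] >= 4.0)
totals, sources = wide_sum_by_key(partitions, num_partitions=2)
revenue = {k: v for part in totals for k, v in part.items()}
```

The filter keeps three partitions in place, one output per input. The sum by key ends with output partitions whose sources set contains several input ids; that cross-partition traffic is what a shuffle pays for.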
Catalyst and Tungsten: why DataFrames are fast
We promised the optimizer. Two engines turn your declarative DataFrame code into fast execution; you don't call them, but knowing they exist explains why DataFrames beat hand-written RDDs.
Catalyst is Spark's query optimizer. When an action fires, Catalyst takes your DataFrame/SQL operations and:
- Builds a logical plan (what you asked for).
- Applies optimization rules — predicate pushdown (run filters as early as possible, ideally inside the file scan so less data loads), column pruning (read only the columns you use), constant folding, reordering joins.
- Produces an optimized physical plan (how to actually run it — including which join strategy to use, lesson 5.4).
You can see this plan with df.explain() — a core skill, since reading the plan tells you what Spark will really do (e.g. whether it pushed your filter down, which join it chose). This is the Spark equivalent of the query plans you read in Chapter 3.
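The rule-based rewriting described above can be illustrated with a toy optimizer. To be clear about the assumptions: Scan, Filter and Project here are hypothetical classes invented for this sketch, customer_id is a made-up column, and Catalyst's real plan trees and rules are far richer. The sketch only shows the shape of the idea: a plan is data, and each rule is a function that returns a better plan.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Scan:
    table: str
    columns: tuple              # columns the scan will read
    pushed_filters: tuple = ()  # predicates evaluated inside the scan


@dataclass(frozen=True)
class Filter:
    child: object
    column: str
    op: str
    value: object               # a literal, or a ("+", a, b) expression


@dataclass(frozen=True)
class Project:
    child: object
    columns: tuple


def fold_constants(plan):
    """Constant folding: evaluate literal arithmetic once, at planning time."""
    if isinstance(plan, Filter):
        value = plan.value
        if isinstance(value, tuple) and value[0] == "+":
            value = value[1] + value[2]
        return replace(plan, child=fold_constants(plan.child), value=value)
    if isinstance(plan, Project):
        return replace(plan, child=fold_constants(plan.child))
    return plan


def push_down_filters(plan):
    """Predicate pushdown: move a Filter sitting on a Scan into the Scan."""
    if isinstance(plan, Filter):
        child = push_down_filters(plan.child)
        if isinstance(child, Scan):
            predicate = (plan.column, plan.op, plan.value)
            return replace(child, pushed_filters=child.pushed_filters + (predicate,))
        return replace(plan, child=child)
    if isinstance(plan, Project):
        return replace(plan, child=push_down_filters(plan.child))
    return plan


def prune_columns(plan):
    """Column pruning: a Project on a Scan narrows what the Scan reads."""
    if isinstance(plan, Project) and isinstance(plan.child, Scan):
        scan = plan.child
        needed = set(plan.columns) | {col for col, _, _ in scan.pushed_filters}
        kept = tuple(c for c in scan.columns if c in needed)
        return replace(plan, child=replace(scan, columns=kept))
    return plan


# Logical plan: select dt, amount from orders where amount > 60 + 40
logical = Project(
    Filter(Scan("orders", ("order_id", "customer_id", "dt", "amount")),
           "amount", ">", ("+", 60, 40)),
    ("dt", "amount"),
)

optimized = logical
for rule in (fold_constants, push_down_filters, prune_columns):
    optimized = rule(optimized)
```

After the three rules run, the Filter node is gone, the scan carries the predicate amount > 100, and it reads two columns instead of four. Reading a real explain() output is the same exercise: look for where the filter ended up and which columns the scan lists.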
Tungsten is Spark's execution engine, focused on squeezing the most out of CPU and memory:
- Whole-stage code generation (codegen) — instead of interpreting your operations one row at a time, Tungsten generates compact Java code for an entire stage and runs that, collapsing many operations into one tight loop.
- Off-heap memory + compact binary formats — it stores data in a cache-friendly binary layout outside the JVM's managed heap, avoiding object overhead and garbage-collection pressure.
- Vectorization — processing batches of rows at once (especially for columnar formats like Parquet, Chapter 2) instead of row-by-row, which modern CPUs handle far faster.
The combined payoff: write declarative DataFrame code, and Catalyst + Tungsten compile it into execution that's often faster than careful hand-written RDD code — and you didn't have to think about any of it. This is the single strongest reason to use DataFrames over RDDs.
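Whole-stage code generation is easier to picture with a toy. The sketch below is an analogy in plain Python, not Tungsten: Tungsten emits Java for the JVM, while this example emits Python source and compiles it with exec. The function names and the sample rows are hypothetical. It contrasts running operators one after another with generating a single fused loop for the same stage.

```python
def run_interpreted(rows, operators):
    """Operator-at-a-time: one pass and one function call per row for each operator."""
    for kind, fn in operators:
        if kind == "filter":
            rows = [r for r in rows if fn(r)]
        else:  # "map"
            rows = [fn(r) for r in rows]
    return rows


def generate_fused(filter_expr, map_expr):
    """Emit source for one loop that applies the filter and the map together."""
    source = (
        "def fused(rows):\n"
        "    out = []\n"
        "    for quantity, unit_price in rows:\n"
        f"        if {filter_expr}:\n"
        f"            out.append({map_expr})\n"
        "    return out\n"
    )
    namespace = {}
    exec(source, namespace)   # compile the generated source into a real function
    return namespace["fused"], source


# Made-up (quantity, unit_price) rows.
rows = [(2, 5.0), (0, 9.0), (3, 1.5)]

operators = [
    ("filter", lambda r: r[0] > 0),
    ("map", lambda r: r[0] * r[1]),
]
interpreted = run_interpreted(rows, operators)

fused, source = generate_fused("quantity > 0", "quantity * unit_price")
generated = fused(rows)
```

Both paths produce the same values, but the generated function touches each row once, with the filter and the arithmetic inlined into the loop body. That is possible only because the operations were expressions the engine could read, not opaque callables.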
:::note The dark side of opacity (foreshadowing UDFs)
Catalyst can only optimize what it understands. The moment you insert a Python UDF (a user-defined function: your own Python code Spark calls per row), Catalyst goes blind: it can't see inside your function, can't push anything through it, and must ship every row out to a Python process and back. That destroys the very advantage this lesson is about. We'll quantify the damage in lesson 5.5; for now, just know that arbitrary Python is the enemy of the optimizer.
:::
Why it matters
The transformation/action split explains Spark's most confusing behaviors: why your "read" line is instant (it's lazy) but show() takes a minute (it ran everything); why a runtime error in your filter, such as a bad value in one row, only surfaces at the action (nothing executed until then); why Spark can recover from a dead machine (lineage). The narrow/wide split is your shuffle radar, the foundation of every tuning decision. And Catalyst/Tungsten are why the advice "use DataFrames, not RDDs, and avoid Python UDFs" is performance gospel rather than mere style.
Common pitfalls
- Expecting transformations to run. They don't; only actions do. A pipeline of transformations with no action does nothing. Conversely, calling actions repeatedly (count() to "check progress") re-runs the whole lineage each time unless you cache (lesson 5.5).
- Reaching for RDDs out of habit. RDDs bypass Catalyst and Tungsten, so you lose all optimization. Use DataFrames/SQL unless you truly can't express the operation declaratively.
- Not reading explain(). The physical plan tells you what Spark actually does: whether your filter pushed down, which join it picked. Guessing instead of reading the plan is how people "tune" the wrong thing.
- Ignoring the narrow/wide distinction. Treating groupBy as if it were as cheap as filter is how jobs accidentally shuffle terabytes.
Cross-links
- ← Came from Spark architecture — the DAG scheduler cuts this DAG into the stages you saw there.
- → Going deeper: The shuffle, skew & joins takes the wide transformations from this lesson and shows what they cost and how to tame them.
- → Tuning & PySpark in practice covers caching (so repeated actions don't recompute) and why Python UDFs blind Catalyst.
Checkpoint
- Is df.filter(...) a transformation or an action? Does it run immediately? What finally makes it run?
- Classify each as narrow or wide, and say which forces a stage boundary: select, join, withColumn, groupBy.
- In one sentence, why can Catalyst optimize a DataFrame filter but not an RDD map with a Python lambda?
Answers
- filter is a transformation, and it is lazy: it does not run; it just records a step. An action (e.g. show, count, write) finally triggers Spark to plan and execute the whole chain.
- select → narrow; withColumn → narrow (neither moves data). join → wide; groupBy → wide. The wide ones (join, groupBy) shuffle and force a stage boundary.
- Catalyst understands the structured expression amount > 100 (so it can push it down, reorder, prune), but a Python lambda is an opaque black box it can't see inside, so it must run it exactly as written with no optimization.