Distributed snapshots
Snapshotting one process is easy. Pause it, copy its state, resume. Snapshotting N processes spread across the world without ever pausing the whole system sounds impossible. Chandy and Lamport showed in 1985 that it isn't. Their algorithm is so clean it now sits underneath every Flink checkpoint and every Kafka exactly-once pipeline.
The problem
You have N processes talking over FIFO channels. Each has local state. Each channel has messages in flight. A "global state" is the tuple of (every process's local state) + (every channel's in-flight messages). For the snapshot to be useful, it has to be consistent — it must represent a moment that could have happened in a valid run, not a frankenstein where process A is at time T1 and process B is at T2.
You can't stop the system to take the snapshot. You don't have a global clock. You can only send messages and observe what arrives. How do you capture this?
The algorithm
One process (the initiator) records its own local state, then sends a special marker message on every outgoing channel. The first time any process P receives a marker on incoming channel C:
- P records its own local state.
- P starts recording messages arriving on every other incoming channel until each of those channels also delivers a marker.
- P sends a marker on every outgoing channel.
- The state of channel C is empty (the marker arrived before any other messages on C in this snapshot).
Once P has markers on every incoming channel, its part is done: its local state plus the recorded messages on each channel form the snapshot.
What "consistent" means here
The snapshot may not match any instant that really happened, since different processes recorded their state at different real-world times. What it guarantees is that the snapshot is reachable: some valid run of the system would produce exactly this state. Put another way, if event A "happens before" event B, then in the snapshot either both are recorded or only A is. Never just B.
This is enough for checkpointing. If you recover from this snapshot and replay from there, you get a valid continuation. Real time wasn't preserved, but causal order was.
Where it shows up
Apache Flink implements an asynchronous variant of Chandy-Lamport called the Asynchronous Barrier Snapshotting algorithm. Markers (called "barriers") flow through the stream graph. Each operator checkpoints its local state when it sees a barrier on every input. Source operators inject barriers on a schedule; sink operators commit when the barrier reaches them. This pattern is what gives Flink its "exactly-once" stream-processing guarantee.
Kafka Streams uses a related technique with offset commits. The "transaction" wraps reads and writes, and commit rules make sure a downstream consumer sees either all of a transaction's outputs or none. Underneath, the consistency is the same Chandy-Lamport idea.
Distributed debuggers (such as the original Chandy-Lamport demos and modern descendants like Jepsen's analysis tools) use it to capture a "stop the world" view of a running cluster without actually stopping anything.
Variants
Lai-Yang (1987) — doesn't need FIFO channels. It piggybacks colored markers on regular messages: white = pre-snapshot, red = post-snapshot. More overhead per message, but it works on UDP-style transport.
Mattern's snapshot (1989) — uses vector clocks instead of markers. Each process records when its vector clock passes a target value. Handy when you can't easily inject control messages but already have vector clocks in place.
Spezialetti-Kearns — concurrent snapshots from multiple initiators. Production stream processors mostly dodge this complexity by serialising snapshots through a coordinator.
Flink's Asynchronous Barrier Snapshotting, in detail
The original Chandy-Lamport algorithm has two rough edges that matter at scale. It records channel state, which on a busy stream can be a lot of in-flight data. And the recording is synchronous with marker handling, which adds latency. Apache Flink's ABS algorithm (Carbone et al., 2015) fixes both.
Flink's authors leaned on a few facts about a stream graph: the topology is acyclic (or, when not, the cycles are explicit), operators are deterministic given their input, and sources are replayable (Kafka offsets, file positions). Under those assumptions, you don't need to capture channel state separately. You can rebuild it by replaying the source from the position the snapshot recorded.
ABS works like this. The source operator injects a barrier message into its output stream on a schedule. The barrier carries an id (the snapshot number). When a downstream operator receives a barrier on every one of its inputs (the alignment step), it knows it has seen everything from the pre-snapshot world. It then snapshots its local state asynchronously (the operator keeps processing while the snapshot is serialised to S3 or HDFS in the background), forwards the barrier downstream, and continues. When sinks receive the barrier, they commit transactionally.
A checkpoint succeeds when every operator received and forwarded the barrier and every state snapshot was written durably. The Flink JobManager coordinates this with the operators and finalises the checkpoint metadata. Recovery uses the metadata to restore each operator's state and reset sources to the recorded positions.
Aligned vs unaligned barriers — the back-pressure trade-off
The alignment step in standard ABS is where most Flink performance debugging goes. If an operator has two inputs and the barrier arrives on input A long before input B, the operator blocks input A until the matching barrier arrives on B. Any records that arrive on input A during that wait are buffered.
Under load, when one input is naturally slower than another (one Kafka partition has lower volume, one upstream operator is slower), alignment can take hundreds of milliseconds to seconds. That delay spreads: back-pressure rises, sources slow down, end-to-end latency grows. The job stays correct, since the snapshot is still consistent, but throughput collapses.
Flink 1.11 (2020) introduced unaligned checkpoints. Instead of waiting for barriers on all inputs, the operator captures the barrier the moment it arrives on any input, snapshots its local state, and records the buffered in-flight messages on every input that hasn't seen the barrier yet. The marker effectively "overtakes" the in-flight records, and those records are saved as part of the snapshot so recovery can replay them.
Unaligned checkpoints trade snapshot size (channel buffers go into the snapshot) for checkpoint duration (no alignment wait). For high-back-pressure jobs the trade is well worth it; for low-back-pressure jobs aligned is cheaper. The choice is per-job, and the Flink docs call out the conditions where each wins.
Recovery — replaying from a snapshot
A snapshot is only as good as the recovery it enables. The standard recovery procedure in a Flink-style system:
1. Detect failure. A TaskManager dies, a JVM hangs, a network partition isolates an operator. The JobManager notices through a heartbeat timeout.
2. Find the most recent successfully-completed checkpoint. Flink keeps a configurable number of completed checkpoints in S3/HDFS. The JobManager picks the latest one where every operator confirmed its snapshot.
3. Restart every operator on healthy TaskManagers. Each operator reads its slice of the checkpoint and restores local state into a fresh process. For RocksDB state backends this means downloading the SST files; for heap backends it means deserialising the snapshot.
4. Reset sources to the offsets recorded in the snapshot. Kafka sources seek to the per-partition offsets; file sources seek to the byte positions. The records between the snapshot offset and the failure are replayed.
5. Process resumes. For aligned checkpoints, in-flight records are naturally re-derived because the sources will re-emit them. For unaligned checkpoints, the captured channel buffers are replayed into the appropriate operator inputs before source replay resumes.
The whole sequence runs in seconds to minutes depending on state size. The biggest cost is moving the snapshot off durable storage; jobs with hundreds of gigabytes of state can take many minutes to restore. Designing for fast recovery usually means keeping state smaller, sharding it across more operators to parallelise the restore, or using incremental checkpoints that only ship the delta since the previous snapshot.
Why snapshots are the bedrock of exactly-once
Exactly-once stream processing has three pieces, as the streaming systems chapter covers: replayable input, atomic state checkpoint, transactional sink. The middle piece, the atomic state checkpoint, is exactly Chandy-Lamport. Without a consistent snapshot, you can't rewind the system to a known-good point. Without rewinding, you can't reprocess messages whose effect may have been dropped. And without reprocessing, you can't guarantee the effect appears exactly once.
Flink's exactly-once semantics are aligned-barrier snapshots plus two-phase-commit sinks. The barrier flowing through the job acts as a transaction marker. Every operator snapshots its state, the sinks pre-commit, the JobManager commits the global checkpoint, and the sinks finalise. A failure between pre-commit and finalisation is recoverable from the previous checkpoint plus the pre-committed but unfinalised sink writes (which the sink either rolls back on recovery or completes after the new operator restart).
Kafka Streams's exactly-once works much the same way, but uses Kafka transactions instead of Chandy-Lamport barriers. The transaction wraps a batch of reads and writes; if any fail, the whole batch is aborted. Inside, Kafka brokers use a transaction coordinator log to track in-flight transactions and commit or abort atomically. The Chandy-Lamport ideas are there, just folded into Kafka's transaction machinery rather than spelled out as a separate snapshot protocol.
Performance — what a snapshot costs in practice
A working Flink job with 10 operator parallelism and 50GB of total state, checkpointing to S3 every 60 seconds:
Synchronous phase: 10-100ms per operator. The operator must briefly pause processing to take an in-memory copy of state (or, for RocksDB, snapshot its files and continue). For unaligned checkpoints the channel buffers are captured too, which adds memory pressure.
Asynchronous upload: 5-30 seconds for the 50GB. Operators continue processing during this phase; snapshot data is uploaded in the background. If the snapshot takes longer than the checkpoint interval, checkpoints start backing up. That's a classic signal to either increase the interval, enable incremental checkpointing, or reduce state size.
End-to-end checkpoint duration: from first barrier injection to
JobManager confirmation, typically 1-10 seconds for healthy jobs, growing under
back-pressure. The Flink metric to watch is checkpoint_duration, with
alerts when p99 exceeds, say, half the checkpoint interval.
Recovery time: dominated by S3 download speed. For 50GB on AWS, expect 3-8 minutes if downloads are parallelised across TaskManagers. Local recovery from a persistent disk cache (Flink 1.12+) can drop this to seconds when the recovering operator lands on a node that already had a copy.
Beyond stream processing — where else snapshots show up
Chandy-Lamport's reach goes well past Flink and Kafka. A quick tour:
Distributed debuggers. Tools that want to capture "what is the state of my entire microservice mesh right now" use a Chandy-Lamport-shaped protocol. Each service receives a marker, records its state, and passes the marker to its dependencies. The resulting snapshot is consistent in the causal sense, which is useful for "did all of these services agree about this user's state at the same logical moment".
Distributed garbage collection. Mark-and-sweep across a cluster needs to know which references are reachable from which roots, without the roots changing during the mark. A Chandy-Lamport-style barrier through the reference graph captures a consistent reachability snapshot.
Cluster-wide config rollouts. A configuration push that has to "take effect everywhere at the same logical moment" is a snapshot in reverse. The marker triggers applying the new config rather than recording state. Same causal-ordering guarantees.
Long-running transactions in distributed databases. Some distributed-SQL systems (CockroachDB's BACKUP, Spanner's snapshot reads) construct consistent snapshots of the database as of a chosen timestamp using TrueTime or hybrid logical clocks. The mechanism is different (clock-bounded rather than marker-based) but the goal is the same: a causally consistent global state.
Common mistakes
Running Chandy-Lamport on non-FIFO channels: it silently produces inconsistent snapshots. (Use Lai-Yang instead.) Forgetting to record in-flight channel state: a snapshot that's only "process states" misses the messages in transit, which is most of what makes a stream system live. Treating the snapshot as a real-time consistent view: it isn't, it's a causally-consistent one. Confusing Chandy-Lamport with the consensus problem: they're orthogonal. Snapshots capture state; consensus agrees on values.
Further reading
- Distributed Snapshots: Determining Global States of Distributed Systems — Chandy & Lamport, 1985. The original.
- Asynchronous Lightweight Snapshots for Distributed Dataflows — Carbone et al, 2015. The Flink paper.
- Apache Flink — Fault Tolerance via State Snapshots — the operational docs.
- Time, Clocks, and the Ordering of Events — Lamport's earlier paper; the causal-ordering substrate for snapshots.
- Time & clocks deep dive — annotated.