Checkpointing

Periodically saving processing state to enable recovery from failures without reprocessing all data from the beginning

TL;DR

Checkpointing is the process of periodically saving the processing state of a stream processing application, enabling it to resume from the last checkpoint rather than restart from scratch after failures. Essential for exactly-once semantics in systems like Apache Flink, Spark Streaming, and Kafka Streams.

Core Explanation

What is Checkpointing?

Checkpointing is a fault-tolerance mechanism that periodically saves the state of a distributed computation to durable storage. For stream processing, this includes:

  1. Application State: Counters, aggregations, windows, caches
  2. Input Positions: Kafka offsets, file positions, database cursors
  3. Metadata: Checkpoint ID, timestamp, version
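
A minimal sketch of what a single checkpoint record bundles together (the class and field names are illustrative, not taken from any particular framework):

import time
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class Checkpoint:
    """Illustrative checkpoint record: application state + input positions + metadata"""
    checkpoint_id: int
    state: Dict[str, Any]           # counters, aggregations, window contents
    input_offsets: Dict[str, int]   # e.g. Kafka "topic-partition" -> offset
    timestamp: float = field(default_factory=time.time)

# Example: snapshot of a word-count job reading two Kafka partitions
cp = Checkpoint(
    checkpoint_id=42,
    state={"hello": 3, "world": 1},
    input_offsets={"events-0": 1052, "events-1": 998},
)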

Key Property: Consistency

A checkpoint must capture a consistent snapshot across all parallel operators, ensuring:

  • No messages are lost (completeness)
  • No messages are duplicated (exactly-once)
  • State is consistent across operators

Checkpoint Barriers (Chandy-Lamport Algorithm)

Stream processing systems use checkpoint barriers, a variant of the Chandy-Lamport snapshot algorithm, to coordinate consistent snapshots without stopping the stream. Sources periodically inject a numbered barrier into their output streams; when an operator has received that barrier on every input, it snapshots its state and forwards the barrier downstream. The checkpoint is complete once all operators, including the sinks, have acknowledged it. (A runnable sketch of barrier alignment appears in the Code Example section below.)

Checkpoint Interval Trade-offs

Shorter intervals leave less data to replay after a failure but spend more time and I/O on snapshots during normal operation; longer intervals invert the trade-off (see the sketch below).
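
A back-of-envelope model of the trade-off; the checkpoint cost, restore time, and replay rate are assumed numbers for illustration, not measurements:

def checkpoint_tradeoff(interval_s: float,
                        checkpoint_cost_s: float = 3.0,   # assumed time to write one snapshot
                        restore_s: float = 30.0,          # assumed time to reload state on recovery
                        replay_rate: float = 1.0):        # replay speed relative to ingest speed
    """Estimate steady-state overhead and worst-case recovery time."""
    overhead = checkpoint_cost_s / interval_s              # fraction of time spent checkpointing
    worst_recovery = restore_s + interval_s / replay_rate  # restore + replay everything since last checkpoint
    return overhead, worst_recovery

for interval in (30, 60, 300):
    overhead, recovery = checkpoint_tradeoff(interval)
    print(f"interval={interval:>3}s  overhead={overhead:5.1%}  worst-case recovery={recovery:4.0f}s")

With these assumptions, a 60-second interval costs roughly 5% overhead with a worst-case recovery around 90 seconds, which is the same ballpark as the trade-off numbers quoted in the interview answer below.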

Checkpoint Storage

Where to Store Checkpoints:

  • Distributed, durable storage (HDFS, S3): the usual production choice, shared across workers and independent of any single machine
  • Local disk with replication: faster writes, but only tolerates process-level failures
  • In practice, state backends such as RocksDB snapshot locally and upload the result to durable storage

Incremental Checkpointing

Instead of writing the full state on every checkpoint, only the changes since the previous checkpoint are persisted (for example, Flink's RocksDB backend uploads only newly created files). This keeps checkpoint I/O proportional to the rate of change rather than to total state size.
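
A minimal sketch of the idea: track which keys changed and persist only that delta, with an occasional full snapshot as a baseline (illustrative only; real backends do this at the level of immutable storage files, not individual keys):

import json
import time
from pathlib import Path
from typing import Any, Dict, Set

class IncrementalCheckpointer:
    """Write only keys changed since the last checkpoint, plus periodic full snapshots."""
    def __init__(self, directory: str, full_every: int = 10):
        self.dir = Path(directory)
        self.dir.mkdir(parents=True, exist_ok=True)
        self.dirty: Set[str] = set()     # keys modified since the last checkpoint
        self.counter = 0
        self.full_every = full_every     # take a full snapshot every N checkpoints

    def record_update(self, key: str):
        self.dirty.add(key)

    def checkpoint(self, state: Dict[str, Any]) -> Path:
        self.counter += 1
        if self.counter == 1 or self.counter % self.full_every == 0:
            payload, kind = state, "full"                                         # baseline snapshot
        else:
            payload, kind = {k: state[k] for k in self.dirty if k in state}, "delta"
        path = self.dir / f"{self.counter:06d}-{kind}.json"
        path.write_text(json.dumps({"ts": time.time(), "data": payload}))
        self.dirty.clear()
        return path

Recovery replays the most recent full snapshot and then applies every delta written after it. (Deleted keys are ignored here to keep the sketch short.)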

Exactly-Once Semantics with Checkpointing

Checkpointing by itself gives exactly-once state updates inside the pipeline: input offsets are snapshotted atomically with operator state, so after a failure both are rolled back together and replayed messages update the restored state exactly once. End-to-end exactly-once additionally requires sinks that are idempotent or transactional.
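
A hedged sketch of the transactional-sink half: a two-phase-commit sink driven by the checkpoint lifecycle. The method names are illustrative (Flink's production interface for this pattern is TwoPhaseCommitSinkFunction):

from typing import Any, Dict, List

class TwoPhaseCommitSink:
    """Illustrative sink: stage writes per checkpoint, make them visible only after the checkpoint succeeds."""
    def __init__(self):
        self.open_txn: List[Any] = []            # writes since the last checkpoint barrier
        self.pending: Dict[int, List[Any]] = {}  # checkpoint_id -> pre-committed transaction

    def write(self, record: Any):
        self.open_txn.append(record)             # stage the write inside the current transaction

    def pre_commit(self, checkpoint_id: int):
        # Called when this task takes its snapshot: flush, but do not make visible yet
        self.pending[checkpoint_id] = self.open_txn
        self.open_txn = []

    def commit(self, checkpoint_id: int):
        # Called once the coordinator confirms the whole checkpoint completed
        records = self.pending.pop(checkpoint_id)
        print(f"commit checkpoint {checkpoint_id}: {len(records)} records now visible")

    def abort(self, checkpoint_id: int):
        # Called if the checkpoint fails: discard staged writes; they will be replayed
        self.pending.pop(checkpoint_id, None)

If the job crashes between pre_commit and commit, recovery either finishes the commit (the checkpoint completed) or aborts and replays (it did not), so downstream readers never see a record twice.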

Real Systems Using Checkpointing

| System | Checkpoint Mechanism | Default Interval | State Backend | Use Case |
| --- | --- | --- | --- | --- |
| Apache Flink | Chandy-Lamport barriers | Disabled (manual) | RocksDB, Heap | Real-time analytics, ETL |
| Spark Structured Streaming | Micro-batch checkpointing | 5 seconds | HDFS, S3 | Batch + streaming |
| Kafka Streams | State stores + offset commits | 30 seconds | RocksDB | Stream processing |
| Apache Storm | Record-level acking | N/A (acking-based) | N/A | Low-latency streaming |
| Apache Samza | Changelog-based | 1 minute | RocksDB + Kafka | Stateful streaming |

Apache Flink Checkpointing
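
For example, enabling checkpointing in a PyFlink job looks roughly like the sketch below. Method names follow the Flink 1.x Python API but may differ between releases, so treat this as an assumption to verify against the docs for your version:

from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

env = StreamExecutionEnvironment.get_execution_environment()

# Take a checkpoint every 60 s using aligned (exactly-once) barriers
env.enable_checkpointing(60_000, CheckpointingMode.EXACTLY_ONCE)

checkpoint_config = env.get_checkpoint_config()
checkpoint_config.set_min_pause_between_checkpoints(30_000)  # let the job make progress between checkpoints
checkpoint_config.set_checkpoint_timeout(120_000)            # abort checkpoints that take too long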

Case Study: Kafka Streams Checkpointing

Kafka Streams takes a different route from barrier-based snapshots. Each state store (RocksDB by default) is backed by a changelog topic in Kafka, and input offsets are committed together with the changelog writes on the commit interval (30 seconds by default). A small local checkpoint file records how much of the changelog is already reflected in the on-disk store, so after a restart the store is rebuilt by replaying only the missing tail of the changelog rather than the whole topic.

When to Use Checkpointing

✓ Perfect Use Cases

  • Long-running stream processing jobs: reprocessing days or weeks of input after a failure is not an option
  • Stateful aggregations: counters, windows, and joins hold state that is expensive to rebuild from scratch
  • Complex event processing: pattern-matching state spans long time windows and must survive restarts

✕ When NOT to Use (or Use Carefully)

  • Stateless processing: there is no state to snapshot; committing input offsets is usually enough
  • Ultra-low latency requirements (< 1 ms): barrier alignment and snapshot I/O introduce latency spikes
  • Small batch jobs (< 1 minute): simply rerunning the job is cheaper than managing checkpoints

Interview Application

Common Interview Question

Q: “Design a real-time fraud detection system processing millions of transactions/second. How would you ensure exactly-once processing and fast recovery from failures?”

Strong Answer:

“I’d design a checkpointed stream processing system:

Architecture:

  • Framework: Apache Flink (for exactly-once guarantees)
  • State Backend: RocksDB with incremental checkpoints
  • Checkpoint Storage: S3 (durable, scalable)
  • Checkpoint Interval: 1 minute (balance overhead vs recovery time)

State Management:

  • User Profile Cache: Latest transactions per user (for pattern detection)
  • Fraud Rules State: Configurable thresholds, ML model parameters
  • Windows: 5-minute sliding windows for transaction aggregations
  • State Size Estimate: 50 GB (10M users × 5KB profile)

Checkpointing Strategy:

  1. Incremental Checkpoints:
    • Full checkpoint hourly
    • Incremental deltas every minute
    • Reduces I/O from 50 GB/min to ~500 MB/min
  2. Asynchronous Snapshots:
    • Checkpoint in background threads
    • Minimal impact on throughput (less than 5%)
  3. Barrier Alignment:
    • Use Chandy-Lamport barriers for consistency
    • Handle out-of-order messages correctly

Exactly-Once Guarantees:

  1. Source: Read from Kafka with transactional semantics
  2. Processing: Flink exactly-once mode
  3. Sink: Two-phase commit to output database
    • Pre-commit during checkpoint
    • Finalize after checkpoint complete

Recovery Flow:

  1. Failure Detection: Flink JobManager detects task failure
  2. Checkpoint Restore:
    • Restore state from latest checkpoint (S3)
    • Reset Kafka offsets to checkpoint position
    • Time: ~30 seconds
  3. Reprocessing:
    • Replay 1 minute of messages since checkpoint
    • Time: ~30 seconds (depends on throughput)
  4. Total RTO: ~90 seconds

Optimization:

  • Local State: RocksDB on SSD for fast access
  • State Sharding: Partition state by user_id (key-by user_id)
  • Compression: Enable Snappy compression for checkpoints
  • Monitoring: Alert on checkpoint duration > 60s

Trade-offs:

  • Checkpoint interval: 1 min = 5% overhead, 90s recovery
  • Faster checkpoints (30s): 8% overhead, 60s recovery
  • Slower checkpoints (5 min): 2% overhead, 5-6 min recovery
  • Chose 1 min as balance for fraud detection SLA

Disaster Recovery:

  • Savepoints: Manual checkpoints for version upgrades
  • Cross-region replication: Replicate checkpoints to DR region
  • Retention: Keep the last 10 hourly full checkpoints (10 hours of history)”

Code Example

Simple Checkpointing Implementation

import time
import json
import hashlib
from typing import Dict, Any
from pathlib import Path

class CheckpointManager:
    """
    Simple checkpointing system for stream processing
    """
    def __init__(self, checkpoint_dir: str, interval_seconds: int = 60):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.interval_seconds = interval_seconds
        self.last_checkpoint_time = 0
        self.checkpoint_id = 0

    def should_checkpoint(self) -> bool:
        """Check if it's time to create a checkpoint"""
        return time.time() - self.last_checkpoint_time >= self.interval_seconds

    def create_checkpoint(self, state: Dict[str, Any], offset: int) -> int:
        """
        Create a checkpoint by saving state and offset
        Returns checkpoint ID
        """
        self.checkpoint_id += 1
        checkpoint_path = self.checkpoint_dir / f"checkpoint-{self.checkpoint_id}"
        checkpoint_path.mkdir(parents=True, exist_ok=True)

        # Save application state
        state_file = checkpoint_path / "state.json"
        with open(state_file, 'w') as f:
            json.dump(state, f, indent=2)

        # Save stream offset
        offset_file = checkpoint_path / "offset.txt"
        with open(offset_file, 'w') as f:
            f.write(str(offset))

        # Save checkpoint metadata
        metadata = {
            'checkpoint_id': self.checkpoint_id,
            'timestamp': time.time(),
            'offset': offset,
            'state_size': state_file.stat().st_size,
            'checksum': self._compute_checksum(state_file)
        }

        metadata_file = checkpoint_path / "metadata.json"
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2)

        # Update last checkpoint pointer
        latest_file = self.checkpoint_dir / "_latest"
        with open(latest_file, 'w') as f:
            f.write(str(self.checkpoint_id))

        self.last_checkpoint_time = time.time()

        print(f"✓ Checkpoint {self.checkpoint_id} created: "
              f"offset={offset}, state_size={metadata['state_size']} bytes")

        return self.checkpoint_id

    def restore_latest_checkpoint(self) -> tuple[Dict[str, Any], int]:
        """
        Restore state and offset from latest checkpoint
        Returns (state, offset) tuple
        """
        latest_file = self.checkpoint_dir / "_latest"

        if not latest_file.exists():
            print("No checkpoint found, starting from beginning")
            return {}, 0

        # Read latest checkpoint ID
        with open(latest_file, 'r') as f:
            checkpoint_id = int(f.read().strip())

        checkpoint_path = self.checkpoint_dir / f"checkpoint-{checkpoint_id}"

        # Verify checkpoint integrity
        metadata_file = checkpoint_path / "metadata.json"
        with open(metadata_file, 'r') as f:
            metadata = json.load(f)

        state_file = checkpoint_path / "state.json"
        checksum = self._compute_checksum(state_file)

        if checksum != metadata['checksum']:
            raise Exception(f"Checkpoint {checkpoint_id} corrupted! "
                          f"Checksum mismatch")

        # Restore state
        with open(state_file, 'r') as f:
            state = json.load(f)

        # Restore offset
        offset_file = checkpoint_path / "offset.txt"
        with open(offset_file, 'r') as f:
            offset = int(f.read().strip())

        self.checkpoint_id = checkpoint_id

        print(f"✓ Restored checkpoint {checkpoint_id}: "
              f"offset={offset}, state_size={len(state)} items")

        return state, offset

    def _compute_checksum(self, file_path: Path) -> str:
        """Compute SHA256 checksum of file"""
        sha256 = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                sha256.update(chunk)
        return sha256.hexdigest()

    def cleanup_old_checkpoints(self, keep_last: int = 3):
        """Remove old checkpoints, keeping only the last N"""
        checkpoints = sorted(
            [d for d in self.checkpoint_dir.iterdir() if d.is_dir()],
            key=lambda d: int(d.name.split('-')[1])
        )

        # Remove old checkpoints
        for checkpoint in checkpoints[:-keep_last]:
            print(f"Removing old checkpoint: {checkpoint.name}")
            for file in checkpoint.iterdir():
                file.unlink()
            checkpoint.rmdir()

# Stream Processor with Checkpointing
class StreamProcessor:
    """
    Stream processor with checkpoint support
    """
    def __init__(self, checkpoint_dir: str):
        self.checkpoint_manager = CheckpointManager(checkpoint_dir)
        self.state: Dict[str, int] = {}  # word -> count
        self.current_offset = 0

        # Restore from last checkpoint
        self.state, self.current_offset = \
            self.checkpoint_manager.restore_latest_checkpoint()

    def process_message(self, message: str):
        """Process a single message (word count)"""
        words = message.lower().split()

        for word in words:
            self.state[word] = self.state.get(word, 0) + 1

        self.current_offset += 1

        # Checkpoint if interval elapsed
        if self.checkpoint_manager.should_checkpoint():
            self.checkpoint()

    def checkpoint(self):
        """Create a checkpoint"""
        self.checkpoint_manager.create_checkpoint(
            self.state,
            self.current_offset
        )

    def get_word_count(self, word: str) -> int:
        """Query current state"""
        return self.state.get(word, 0)

# Usage Example
if __name__ == '__main__':
    processor = StreamProcessor('/tmp/checkpoints')

    # Simulate stream processing
    messages = [
        "hello world",
        "hello kafka",
        "stream processing",
        "checkpoint test",
        "hello checkpoint"
    ]

    print("\n=== Processing Stream ===")
    for i, msg in enumerate(messages):
        print(f"Message {i}: {msg}")
        processor.process_message(msg)

        # Simulate checkpoint every 2 messages
        if (i + 1) % 2 == 0:
            processor.checkpoint()

    # Query state
    print("\n=== Query State ===")
    print(f"Count of 'hello': {processor.get_word_count('hello')}")
    print(f"Count of 'checkpoint': {processor.get_word_count('checkpoint')}")

    # Simulate crash and recovery
    print("\n=== Simulating Crash and Recovery ===")
    del processor  # "crash"

    processor_recovered = StreamProcessor('/tmp/checkpoints')
    print(f"Count of 'hello' after recovery: "
          f"{processor_recovered.get_word_count('hello')}")
    print(f"Current offset after recovery: "
          f"{processor_recovered.current_offset}")

    # Cleanup
    processor_recovered.checkpoint_manager.cleanup_old_checkpoints(keep_last=2)

Checkpoint Barrier Implementation

from dataclasses import dataclass
from typing import Union

@dataclass
class Message:
    """Regular stream message"""
    data: str
    offset: int

@dataclass
class Barrier:
    """Checkpoint barrier"""
    checkpoint_id: int

class BarrierBuffer:
    """
    Align checkpoint barriers across multiple inputs: once a barrier has
    arrived on an input, further messages from that input are buffered
    until the barrier has arrived on every input
    """
    def __init__(self, num_inputs: int):
        self.num_inputs = num_inputs
        self.pending_checkpoint = None
        self.received_barriers = set()
        self.buffered_messages = {i: [] for i in range(num_inputs)}

    def process(self, message: Union[Message, Barrier], input_id: int):
        """
        Process a message or barrier from one input stream
        Returns: (messages_to_process, should_checkpoint)
        """
        if isinstance(message, Barrier):
            if self.pending_checkpoint is None:
                # First barrier for this checkpoint: start aligning
                self.pending_checkpoint = message.checkpoint_id

            if message.checkpoint_id != self.pending_checkpoint:
                # Barrier for a different checkpoint; ignored in this sketch
                return [], False

            self.received_barriers.add(input_id)

            if len(self.received_barriers) == self.num_inputs:
                # All barriers received: snapshot now. Buffered messages arrived
                # after their input's barrier, so they belong to the next epoch
                # and are released for processing after the snapshot.
                released = []
                for i in range(self.num_inputs):
                    released.extend(self.buffered_messages[i])
                    self.buffered_messages[i] = []
                self.pending_checkpoint = None
                self.received_barriers = set()
                return released, True

            return [], False

        else:  # Regular message
            if self.pending_checkpoint is not None and \
               input_id in self.received_barriers:
                # Barrier already seen on this input: buffer its messages
                # until the checkpoint completes (barrier alignment)
                self.buffered_messages[input_id].append(message)
                return [], False
            else:
                # Barrier not yet seen on this input: the message still belongs
                # to the current epoch, process it immediately
                return [message], False

class CheckpointedOperator:
    """
    Stream operator with checkpoint support
    """
    def __init__(self, num_inputs: int = 1):
        self.state = {}
        self.barrier_buffer = BarrierBuffer(num_inputs)
        self.checkpoint_manager = CheckpointManager('/tmp/operator-checkpoints')

    def on_message(self, message: Union[Message, Barrier], input_id: int = 0):
        """Handle incoming message or barrier"""
        messages, should_checkpoint = \
            self.barrier_buffer.process(message, input_id)

        # Snapshot first: released messages arrived after their barrier
        # and must not be included in this checkpoint
        if should_checkpoint:
            self._checkpoint()

        # Process released/immediate messages
        for msg in messages:
            self._process_data(msg.data)

    def _process_data(self, data: str):
        """Application logic (word count)"""
        self.state[data] = self.state.get(data, 0) + 1

    def _checkpoint(self):
        """Save state to checkpoint"""
        print(f"→ Operator checkpointing: state size = {len(self.state)}")
        self.checkpoint_manager.create_checkpoint(self.state, 0)

# Example: Two-input operator (join)
if __name__ == '__main__':
    operator = CheckpointedOperator(num_inputs=2)

    # Stream 1 messages
    operator.on_message(Message("hello", 0), input_id=0)
    operator.on_message(Message("world", 1), input_id=0)
    operator.on_message(Barrier(checkpoint_id=1), input_id=0)

    # Stream 2 messages (delayed barrier)
    operator.on_message(Message("kafka", 0), input_id=1)
    operator.on_message(Message("flink", 1), input_id=1)

    # This completes the checkpoint
    operator.on_message(Barrier(checkpoint_id=1), input_id=1)

    print(f"Final state: {operator.state}")

Used In Systems:

  • Apache Flink: Distributed checkpointing with barriers
  • Spark Structured Streaming: Micro-batch checkpointing
  • Kafka Streams: State stores with changelog

Explained In Detail:

  • Stream Processing Deep Dive - Checkpointing implementation details

Quick Self-Check

  • Can explain checkpointing in 60 seconds?
  • Understand how checkpoint barriers work?
  • Know the trade-offs between checkpoint frequencies?
  • Can explain how checkpointing enables exactly-once semantics?
  • Understand incremental vs full checkpointing?
  • Can design a checkpoint strategy for given requirements?

Interview Notes

  • Interview Relevance: ~60% of streaming interviews
  • Production Impact: powers Apache Flink, Spark Structured Streaming, and Kafka Streams
  • Performance: fast recovery from failures
  • Scalability: foundation for exactly-once semantics