Periodically saving processing state to enable recovery from failures without reprocessing all data from the beginning
60% of streaming interviews
Used by Apache Flink, Spark Structured Streaming, and Kafka Streams
Key benefit: fast recovery after failures
Exactly-once semantics
TL;DR
Checkpointing is the process of periodically saving the processing state of a stream processing application so that, after a failure, it can resume from the last checkpoint rather than restart from scratch. It is essential for exactly-once semantics in systems like Apache Flink, Spark Structured Streaming, and Kafka Streams.
Visual Overview
WITHOUT CHECKPOINTING (restart from the beginning)
┌──────────────────────────────────────────────────
│ Stream Processing Job
│
│ T0: Start processing from offset 0
│ T1: Processed 1,000,000 messages
│ T2: Processed 5,000,000 messages
│ T3: CRASH! ⚡
│ T4: Restart from offset 0 ✗
│     ✗ Reprocess all 5,000,000 messages
│     ✗ Hours of lost work
│     ✗ Possible duplicate outputs
└──────────────────────────────────────────────────
WITH CHECKPOINTING (resume from the last checkpoint)
┌──────────────────────────────────────────────────
│ Stream Processing Job with Checkpoints
│
│ T0: Start, checkpoint offset=0
│ T1: Process 1M messages, checkpoint offset=1M
│ T2: Process 4M more, checkpoint offset=5M
│ T3: CRASH! ⚡
│ T4: Restore from checkpoint ✓
│     ✓ Resume from offset=5M
│     ✓ Only reprocess messages since the last checkpoint
│     ✓ Fast recovery (seconds vs hours)
│     ✓ No duplicates (exactly-once)
└──────────────────────────────────────────────────
CHECKPOINT LIFECYCLE
┌──────────────────────────────────────────────────
│ Checkpoint Coordinator
│          ↓
│ 1. Trigger checkpoint
│    Insert barrier into the stream
│          ↓
│    Stream: [msg1, msg2, BARRIER, msg3, msg4]
│          ↓
│ 2. Operator receives barrier
│    - Save current state to storage
│    - Save current offset
│    - Acknowledge checkpoint
│          ↓
│ 3. All operators acknowledged?
│    YES → checkpoint COMPLETE ✓
│    NO  → wait or time out
│          ↓
│ 4. Commit checkpoint metadata
│    Store: {
│      checkpoint_id: 123,
│      timestamp: T,
│      state_location: s3://...,
│      offsets: {partition0: 5000, ...}
│    }
└──────────────────────────────────────────────────
RECOVERY FLOW
┌──────────────────────────────────────────────────
│ Job failure detected
│          ↓
│ 1. Find the latest successful checkpoint
│    checkpoint_id: 123
│    timestamp: 2 minutes ago
│          ↓
│ 2. Restore operator state
│    Load state from: s3://checkpoint-123/
│    - Counters, windows, caches
│    - Application state
│          ↓
│ 3. Reset stream positions
│    Kafka offsets: {partition0: 5000, ...}
│          ↓
│ 4. Resume processing
│    Start from the checkpoint offsets
│    Reprocess messages since the checkpoint
│
│ Recovery Time Objective (RTO):
│    - Checkpoint interval: 1 minute
│    - State restore: 10 seconds
│    - Reprocess: 1 minute of data
│    Total: ~90 seconds of downtime
└──────────────────────────────────────────────────
Core Explanation
What is Checkpointing?
Checkpointing is a fault-tolerance mechanism that periodically saves the state of a distributed computation to durable storage. For stream processing, this includes:
- Application State: Counters, aggregations, windows, caches
- Input Positions: Kafka offsets, file positions, database cursors
- Metadata: Checkpoint ID, timestamp, version
Key Property: Consistency
A checkpoint must capture a consistent snapshot across all parallel operators, ensuring:
- No messages are lost (completeness)
- No messages are duplicated (exactly-once)
- State is consistent across operators
Checkpoint Barriers (Chandy-Lamport Algorithm)
Stream processing systems use barriers to coordinate consistent snapshots without stopping the stream:
How Barriers Work:
Source operators inject barriers into the stream:
┌──────────────────────────────────────────
│ Kafka Topic
│ [msg1, msg2, msg3, BARRIER_10, msg4]
│          ↓
│ Operator A processes messages.
│ When BARRIER_10 arrives:
│   1. Save state (e.g., count=150)
│   2. Forward the barrier downstream
│   3. Continue processing
└──────────────────────────────────────────
Multiple Input Streams (Barrier Alignment):
┌──────────────────────────────────────────
│ Stream 1: [msg1, BARRIER_10, msg2]
│ Stream 2: [msg3, msg4, BARRIER_10]
│          ↓
│ Join operator
│          ↓
│ Wait for BARRIER_10 from BOTH streams
│ (buffer post-barrier messages from the faster stream)
│          ↓
│ All barriers received → save state
└──────────────────────────────────────────
Why Barriers?
✓ No need to pause the stream (live checkpointing)
✓ Consistent snapshots across distributed operators
✓ Minimal impact on throughput (<5% overhead)
Checkpoint Interval Trade-offs
Frequent checkpoints (e.g., every 10 seconds):
Pros:
✓ Fast recovery (less data to reprocess)
✓ Small recovery window
Cons:
✗ Higher overhead (I/O and CPU for serialization)
✗ Higher storage costs
✗ Can slow down processing
Infrequent checkpoints (e.g., every 10 minutes):
Pros:
✓ Lower overhead
✓ Better throughput
Cons:
✗ Slower recovery (more data to reprocess)
✗ Larger recovery window
✗ More work redone after a failure (and more duplicate output if sinks are not transactional)
Recommended: 1-5 minutes for most applications; the back-of-envelope estimate below illustrates the trade-off.
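To make the interval trade-off concrete, here is a rough back-of-envelope model. The checkpoint duration, restore time, and reprocessing rate are illustrative assumptions, not measurements from any particular system:

# Rough model of the checkpoint-interval trade-off (all numbers are assumptions).

def estimate(interval_s: float,
             checkpoint_duration_s: float = 3.0,    # time to write one snapshot
             restore_s: float = 10.0,               # time to reload state on recovery
             reprocess_rate: float = 1.0) -> dict:  # reprocessing speed relative to live rate
    """Estimate checkpoint overhead and worst-case recovery time."""
    overhead = checkpoint_duration_s / interval_s   # fraction of time spent checkpointing
    reprocess_s = interval_s / reprocess_rate       # worst case: fail just before the next checkpoint
    return {"interval_s": interval_s,
            "overhead_pct": round(100 * overhead, 1),
            "worst_case_recovery_s": round(restore_s + reprocess_s, 1)}

for interval in (10, 60, 600):
    print(estimate(interval))
# 10 s  -> ~30% overhead,  ~20 s recovery
# 60 s  -> ~5% overhead,   ~70 s recovery
# 600 s -> ~0.5% overhead, ~10 min recovery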
Checkpoint Storage
Where to Store Checkpoints (a small storage-abstraction sketch follows this list):
1. Distributed File Systems:
- HDFS, S3, GCS, Azure Blob
- Pros: Scalable, durable, cost-effective
- Cons: Higher latency (~100ms writes)
- Use case: Production systems
2. Distributed Databases:
- RocksDB (local + replicated)
- Cassandra, DynamoDB
- Pros: Fast writes (~10ms)
- Cons: Higher cost, operational complexity
- Use case: Low-latency requirements
3. In-Memory (with replication):
- Redis, Memcached with persistence
- Pros: Very fast (~1ms)
- Cons: Limited capacity, expensive
- Use case: Small state, ultra-low latency
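As a sketch of how checkpoint writes might be abstracted over these backends; the class names, bucket, and paths below are made up for illustration, and the S3 variant assumes boto3 is installed:

import json
from pathlib import Path

class LocalCheckpointStore:
    """Write checkpoint blobs to a local (or NFS/HDFS-mounted) directory."""
    def __init__(self, root: str):
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def put(self, key: str, data: bytes) -> None:
        path = self.root / key
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(data)

class S3CheckpointStore:
    """Write checkpoint blobs to S3 (bucket and prefix are placeholders)."""
    def __init__(self, bucket: str, prefix: str = "checkpoints/"):
        import boto3                                # assumes boto3 is available
        self.s3 = boto3.client("s3")
        self.bucket, self.prefix = bucket, prefix

    def put(self, key: str, data: bytes) -> None:
        self.s3.put_object(Bucket=self.bucket, Key=self.prefix + key, Body=data)

# The checkpointing code only depends on put(), so backends are interchangeable.
store = LocalCheckpointStore("/tmp/ckpt-store")
store.put("checkpoint-000012/metadata.json", json.dumps({"id": 12}).encode())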
Storage Structure:
checkpoint_dir/
├── checkpoint-000010/
│   ├── operator-state-0
│   ├── operator-state-1
│   └── metadata
├── checkpoint-000011/
│   └── ...
└── _metadata (latest checkpoint info)
Incremental Checkpointing
Full Checkpoint (naive approach):
Every checkpoint saves the entire state.
State size: 10 GB
Checkpoint interval: 1 minute
I/O: 10 GB/minute ≈ 166 MB/s ✗ (expensive!)
Incremental Checkpoint (RocksDB-based):
Only save the state that changed since the last checkpoint.
┌──────────────────────────────────────────
│ Checkpoint 10: save full state (10 GB)
│ Checkpoint 11: save delta (100 MB)
│ Checkpoint 12: save delta (150 MB)
│ Checkpoint 13: save delta (120 MB)
│          ↓
│ Recovery: restore checkpoint 10,
│           then apply deltas 11, 12, 13
└──────────────────────────────────────────
Benefits:
✓ Reduces I/O by 10-100x
✓ Faster checkpoint completion
✓ Lower storage costs
Implementation: Apache Flink's RocksDB state backend (a minimal conceptual sketch follows below)
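A minimal sketch of the incremental idea, tracking dirty keys in application code. Flink's RocksDB backend actually ships changed SST files rather than individual keys, so treat this purely as a conceptual illustration:

import json
from pathlib import Path

class IncrementalState:
    """Keyed state that remembers which keys changed since the last checkpoint."""
    def __init__(self):
        self.data: dict[str, int] = {}
        self.dirty: set[str] = set()

    def update(self, key: str, value: int) -> None:
        self.data[key] = value
        self.dirty.add(key)                      # mark key as changed

    def checkpoint_full(self, path: Path) -> None:
        """Persist the entire state (the periodic full/base snapshot)."""
        path.write_text(json.dumps(self.data))
        self.dirty.clear()

    def checkpoint_delta(self, path: Path) -> None:
        """Persist only the keys modified since the previous checkpoint."""
        delta = {k: self.data[k] for k in self.dirty}
        path.write_text(json.dumps(delta))
        self.dirty.clear()

    @staticmethod
    def restore(base: Path, deltas: list[Path]) -> "IncrementalState":
        """Recovery = full base snapshot + deltas applied in order."""
        state = IncrementalState()
        state.data = json.loads(base.read_text())
        for d in deltas:
            state.data.update(json.loads(d.read_text()))
        return state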
Exactly-Once Semantics with Checkpointing
How Checkpointing Enables Exactly-Once:
1. Atomic Commits:
┌──────────────────────────────────────────
│ Process messages 1-100
│ Update state (count += 100)
│          ↓
│ Checkpoint:
│   - Save state: count=100
│   - Save offset: 100
│   - Commit the offset atomically with the state
│          ↓
│ If crash before the checkpoint:
│   → restore offset=0, count=0
│   → reprocess messages 1-100 ✓
│
│ If crash after the checkpoint:
│   → restore offset=100, count=100
│   → skip messages 1-100 ✓
└──────────────────────────────────────────
2. Two-Phase Commit (for sinks):
┌──────────────────────────────────────────
│ Phase 1: Pre-commit
│   - Write to a staging table
│   - Don't make the writes visible yet
│          ↓
│ Checkpoint completes ✓
│          ↓
│ Phase 2: Commit
│   - Move from staging to production
│   - Make the writes visible
│
│ If crash before Phase 2:
│   → staging writes are discarded ✓
│   → reprocess and retry
└──────────────────────────────────────────
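A toy sketch of a sink that follows this two-phase pattern; the staging and commit steps here are in-memory stand-ins rather than a real database or Kafka transaction API:

class TwoPhaseCommitSink:
    """Buffers writes per checkpoint and publishes them only after the
    checkpoint that covers them has completed (toy in-memory version)."""
    def __init__(self):
        self.staging: list = []     # pre-committed, not yet visible
        self.committed: list = []   # visible output

    def write(self, record: str) -> None:
        self.staging.append(record)

    def pre_commit(self) -> list:
        """Phase 1: seal the current transaction when the checkpoint barrier arrives."""
        txn, self.staging = self.staging, []
        return txn

    def commit(self, txn: list) -> None:
        """Phase 2: called only after the checkpoint is globally complete."""
        self.committed.extend(txn)

    def abort(self, txn: list) -> None:
        """On failure before Phase 2: discard; records are reproduced on replay."""
        txn.clear()

sink = TwoPhaseCommitSink()
sink.write("alert:user42")
txn = sink.pre_commit()   # at the checkpoint barrier
sink.commit(txn)          # after the checkpoint completes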
Real Systems Using Checkpointing
| System | Checkpoint Mechanism | Default Interval | State Backend | Use Case |
|---|---|---|---|---|
| Apache Flink | Chandy-Lamport-style barriers | Disabled by default (must be enabled) | RocksDB, heap | Real-time analytics, ETL |
| Spark Structured Streaming | Micro-batch checkpointing | Every micro-batch | HDFS, S3 | Batch + streaming |
| Kafka Streams | State stores + offset commits | 30 seconds (commit interval) | RocksDB | Stream processing |
| Apache Storm | Record-level acking | N/A (acking-based) | N/A | Low-latency streaming |
| Apache Samza | Changelog-based | 1 minute | RocksDB + Kafka | Stateful streaming |
Case Study: Apache Flink Checkpointing
Flink Checkpoint Configuration:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 60 seconds
env.enableCheckpointing(60000);
// Checkpoint configuration
CheckpointConfig config = env.getCheckpointConfig();
// Exactly-once mode (vs at-least-once)
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Minimum pause between checkpoints (prevent overload)
config.setMinPauseBetweenCheckpoints(30000);
// Checkpoint timeout (fail if takes too long)
config.setCheckpointTimeout(600000);
// Max concurrent checkpoints
config.setMaxConcurrentCheckpoints(1);
// Retain checkpoints on cancellation (for savepoints)
config.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// State backend (where to store state)
env.setStateBackend(new RocksDBStateBackend("s3://checkpoints/"));
Checkpoint Flow in Flink:
┌──────────────────────────────────────────
│ 1. JobManager triggers a checkpoint
│    - Increment the checkpoint ID
│    - Send the trigger to the sources
│          ↓
│ 2. Sources inject barriers
│    - Insert a barrier into the stream
│    - Save the current offsets
│          ↓
│ 3. Barriers flow through operators
│    - Each operator saves its state
│    - Acknowledges the checkpoint
│          ↓
│ 4. Sinks receive barriers
│    - Pre-commit external writes
│    - Acknowledge the checkpoint
│          ↓
│ 5. JobManager collects acks
│    - All tasks acknowledged? ✓
│    - Commit checkpoint metadata
│    - Notify sinks to finalize external writes
└──────────────────────────────────────────
Recovery in Flink:
1. Job fails (e.g., task exception, node crash)
2. JobManager restarts job from latest checkpoint
3. All tasks restore state from checkpoint
4. Sources reset to checkpoint offsets
5. Processing resumes (exactly-once guaranteed)
Case Study: Kafka Streams Checkpointing
Kafka Streams uses:
- Local RocksDB for state storage
- Kafka topics for changelog (state replication)
- Kafka offsets for position tracking
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
// Commit interval (checkpoint frequency)
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
// State store configuration
config.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 600000);
How it works:
┌──────────────────────────────────────────
│ State changes:
│   1. Write to local RocksDB
│   2. Write to the changelog topic (Kafka)
│   3. Commit offsets every 30 s
│          ↓
│ Recovery:
│   1. Restore RocksDB from the changelog
│   2. Resume from the committed offset
│   3. Replay uncommitted messages
└──────────────────────────────────────────
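The changelog idea in miniature, as a plain Python sketch rather than the Kafka Streams API; the "changelog" is just a list standing in for a compacted Kafka topic:

# Every local state mutation is also appended to a changelog "topic";
# replaying the changelog on restart rebuilds the local store.
changelog: list = []                        # stand-in for a compacted Kafka topic

def update_state(store: dict, key: str, value: int) -> None:
    store[key] = value                      # 1. local write (RocksDB in Kafka Streams)
    changelog.append((key, value))          # 2. append to the changelog topic

def restore_state() -> dict:
    """Rebuild the local store by replaying the changelog from the beginning."""
    store: dict = {}
    for key, value in changelog:
        store[key] = value                  # last write wins (log-compaction semantics)
    return store

live: dict = {}
update_state(live, "user42:txn_count", 7)
update_state(live, "user42:txn_count", 8)
assert restore_state() == {"user42:txn_count": 8}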
Exactly-once in Kafka Streams (EOS):
- Transactional writes to output topics
- Atomic offset commits
- Changelog updates in same transaction
- Result: End-to-end exactly-once
When to Use Checkpointing
✅ Perfect Use Cases
Long-Running Stream Processing Jobs
Scenario: Real-time analytics running 24/7
Requirement: No data loss, exactly-once semantics
Solution: Checkpoint every 1 minute to S3
Benefit: Recover from failures in < 2 minutes
Stateful Aggregations
Scenario: Counting events per user over 24-hour windows
State size: 100 GB
Solution: Incremental checkpointing with RocksDB
Benefit: Preserve state across restarts, avoid recomputation
Complex Event Processing
Scenario: Multi-stage pipeline with joins, enrichment
Requirement: Consistent state across operators
Solution: Distributed checkpointing with barriers
Benefit: Consistent snapshots without pausing stream
❌ When NOT to Use (or Use Carefully)
Stateless Processing
Problem: No state to checkpoint, pure transformation
Example: Filter, map, simple parsing
Alternative: Just reprocess from source (no checkpointing overhead)
Ultra-Low Latency Requirements (< 1ms)
Problem: Checkpointing adds latency (barrier alignment)
Alternative: At-least-once processing with downstream deduplication (see the sketch below)
Example: High-frequency trading, real-time bidding
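A sketch of that deduplication alternative; a production system would typically use a TTL cache or an idempotent sink keyed by event ID rather than an in-process set, but the mechanics are the same:

from collections import OrderedDict

class Deduplicator:
    """At-least-once delivery + dedup on a unique event ID ≈ effectively-once output."""
    def __init__(self, max_ids: int = 1_000_000):
        self.seen = OrderedDict()            # insertion-ordered set of recent event IDs
        self.max_ids = max_ids

    def is_duplicate(self, event_id: str) -> bool:
        if event_id in self.seen:
            return True
        self.seen[event_id] = None
        if len(self.seen) > self.max_ids:    # bound memory by evicting the oldest IDs
            self.seen.popitem(last=False)
        return False

dedup = Deduplicator()
for event_id in ["tx-1", "tx-2", "tx-1"]:    # "tx-1" is redelivered after a retry
    if not dedup.is_duplicate(event_id):
        print("process", event_id)           # tx-1 and tx-2 are each processed once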
Small Batch Jobs (< 1 minute)
Problem: Checkpoint overhead > job duration
Alternative: Just rerun the job on failure
Example: Scheduled micro-batch jobs
Interview Application
Common Interview Question
Q: "Design a real-time fraud detection system processing millions of transactions per second. How would you ensure exactly-once processing and fast recovery from failures?"
Strong Answer:
"I'd design a checkpointed stream processing system:
Architecture:
- Framework: Apache Flink (for exactly-once guarantees)
- State Backend: RocksDB with incremental checkpoints
- Checkpoint Storage: S3 (durable, scalable)
- Checkpoint Interval: 1 minute (balance overhead vs recovery time)
State Management:
- User Profile Cache: Latest transactions per user (for pattern detection)
- Fraud Rules State: Configurable thresholds, ML model parameters
- Windows: 5-minute sliding windows for transaction aggregations
- State Size Estimate: 50 GB (10M users × 5 KB profile)
Checkpointing Strategy:
- Incremental Checkpoints:
- Full checkpoint hourly
- Incremental deltas every minute
- Reduces I/O from 50 GB/min to ~500 MB/min
- Asynchronous Snapshots:
- Checkpoint in background threads
- Minimal impact on throughput (<5%)
- Barrier Alignment:
- Use Chandy-Lamport barriers for consistency
- Handle out-of-order messages correctly
Exactly-Once Guarantees:
- Source: Read from Kafka with transactional semantics
- Processing: Flink exactly-once mode
- Sink: Two-phase commit to output database
- Pre-commit during checkpoint
- Finalize after checkpoint complete
Recovery Flow:
- Failure Detection: Flink JobManager detects task failure
- Checkpoint Restore:
- Restore state from latest checkpoint (S3)
- Reset Kafka offsets to checkpoint position
- Time: ~30 seconds
- Reprocessing:
- Replay 1 minute of messages since checkpoint
- Time: ~30 seconds (depends on throughput)
- Total RTO: ~90 seconds (including failure detection)
Optimization:
- Local State: RocksDB on SSD for fast access
- State Sharding: Partition state by user_id (key-by user_id)
- Compression: Enable Snappy compression for checkpoints
- Monitoring: Alert on checkpoint duration > 60s
Trade-offs:
- Checkpoint interval: 1 min = 5% overhead, 90s recovery
- Faster checkpoints (30s): 8% overhead, 60s recovery
- Slower checkpoints (5 min): 2% overhead, 5-6 min recovery
- Chose 1 min as balance for fraud detection SLA
Disaster Recovery:
- Savepoints: Manual checkpoints for version upgrades
- Cross-region replication: Replicate checkpoints to DR region
- Retention: Keep the last 10 checkpoints (10 hours of history)"
Code Example
Simple Checkpointing Implementation
import time
import json
import hashlib
from typing import Dict, Any
from pathlib import Path
class CheckpointManager:
"""
Simple checkpointing system for stream processing
"""
def __init__(self, checkpoint_dir: str, interval_seconds: int = 60):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
self.interval_seconds = interval_seconds
self.last_checkpoint_time = 0
self.checkpoint_id = 0
def should_checkpoint(self) -> bool:
"""Check if it's time to create a checkpoint"""
return time.time() - self.last_checkpoint_time >= self.interval_seconds
def create_checkpoint(self, state: Dict[str, Any], offset: int) -> int:
"""
Create a checkpoint by saving state and offset
Returns checkpoint ID
"""
self.checkpoint_id += 1
checkpoint_path = self.checkpoint_dir / f"checkpoint-{self.checkpoint_id}"
checkpoint_path.mkdir(parents=True, exist_ok=True)
# Save application state
state_file = checkpoint_path / "state.json"
with open(state_file, 'w') as f:
json.dump(state, f, indent=2)
# Save stream offset
offset_file = checkpoint_path / "offset.txt"
with open(offset_file, 'w') as f:
f.write(str(offset))
# Save checkpoint metadata
metadata = {
'checkpoint_id': self.checkpoint_id,
'timestamp': time.time(),
'offset': offset,
'state_size': state_file.stat().st_size,
'checksum': self._compute_checksum(state_file)
}
metadata_file = checkpoint_path / "metadata.json"
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2)
# Update last checkpoint pointer
latest_file = self.checkpoint_dir / "_latest"
with open(latest_file, 'w') as f:
f.write(str(self.checkpoint_id))
self.last_checkpoint_time = time.time()
        print(f"✓ Checkpoint {self.checkpoint_id} created: "
              f"offset={offset}, state_size={metadata['state_size']} bytes")
return self.checkpoint_id
def restore_latest_checkpoint(self) -> tuple[Dict[str, Any], int]:
"""
Restore state and offset from latest checkpoint
Returns (state, offset) tuple
"""
latest_file = self.checkpoint_dir / "_latest"
if not latest_file.exists():
print("No checkpoint found, starting from beginning")
return {}, 0
# Read latest checkpoint ID
with open(latest_file, 'r') as f:
checkpoint_id = int(f.read().strip())
checkpoint_path = self.checkpoint_dir / f"checkpoint-{checkpoint_id}"
# Verify checkpoint integrity
metadata_file = checkpoint_path / "metadata.json"
with open(metadata_file, 'r') as f:
metadata = json.load(f)
state_file = checkpoint_path / "state.json"
checksum = self._compute_checksum(state_file)
if checksum != metadata['checksum']:
raise Exception(f"Checkpoint {checkpoint_id} corrupted! "
f"Checksum mismatch")
# Restore state
with open(state_file, 'r') as f:
state = json.load(f)
# Restore offset
offset_file = checkpoint_path / "offset.txt"
with open(offset_file, 'r') as f:
offset = int(f.read().strip())
self.checkpoint_id = checkpoint_id
        print(f"✓ Restored checkpoint {checkpoint_id}: "
              f"offset={offset}, state_size={len(state)} items")
return state, offset
def _compute_checksum(self, file_path: Path) -> str:
"""Compute SHA256 checksum of file"""
sha256 = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
sha256.update(chunk)
return sha256.hexdigest()
def cleanup_old_checkpoints(self, keep_last: int = 3):
"""Remove old checkpoints, keeping only the last N"""
checkpoints = sorted(
[d for d in self.checkpoint_dir.iterdir() if d.is_dir()],
key=lambda d: int(d.name.split('-')[1])
)
# Remove old checkpoints
for checkpoint in checkpoints[:-keep_last]:
print(f"Removing old checkpoint: {checkpoint.name}")
for file in checkpoint.iterdir():
file.unlink()
checkpoint.rmdir()
# Stream Processor with Checkpointing
class StreamProcessor:
"""
Stream processor with checkpoint support
"""
def __init__(self, checkpoint_dir: str):
self.checkpoint_manager = CheckpointManager(checkpoint_dir)
self.state: Dict[str, int] = {} # word -> count
self.current_offset = 0
# Restore from last checkpoint
self.state, self.current_offset = \
self.checkpoint_manager.restore_latest_checkpoint()
def process_message(self, message: str):
"""Process a single message (word count)"""
words = message.lower().split()
for word in words:
self.state[word] = self.state.get(word, 0) + 1
self.current_offset += 1
# Checkpoint if interval elapsed
if self.checkpoint_manager.should_checkpoint():
self.checkpoint()
def checkpoint(self):
"""Create a checkpoint"""
self.checkpoint_manager.create_checkpoint(
self.state,
self.current_offset
)
def get_word_count(self, word: str) -> int:
"""Query current state"""
return self.state.get(word, 0)
# Usage Example
if __name__ == '__main__':
processor = StreamProcessor('/tmp/checkpoints')
# Simulate stream processing
messages = [
"hello world",
"hello kafka",
"stream processing",
"checkpoint test",
"hello checkpoint"
]
print("\n=== Processing Stream ===")
for i, msg in enumerate(messages):
print(f"Message {i}: {msg}")
processor.process_message(msg)
# Simulate checkpoint every 2 messages
if (i + 1) % 2 == 0:
processor.checkpoint()
# Query state
print("\n=== Query State ===")
print(f"Count of 'hello': {processor.get_word_count('hello')}")
print(f"Count of 'checkpoint': {processor.get_word_count('checkpoint')}")
# Simulate crash and recovery
print("\n=== Simulating Crash and Recovery ===")
del processor # "crash"
processor_recovered = StreamProcessor('/tmp/checkpoints')
print(f"Count of 'hello' after recovery: "
f"{processor_recovered.get_word_count('hello')}")
print(f"Current offset after recovery: "
f"{processor_recovered.current_offset}")
# Cleanup
processor_recovered.checkpoint_manager.cleanup_old_checkpoints(keep_last=2)
Flink-Style Checkpointing with Barriers
from dataclasses import dataclass
from typing import Union
@dataclass
class Message:
"""Regular stream message"""
data: str
offset: int
@dataclass
class Barrier:
"""Checkpoint barrier"""
checkpoint_id: int
class BarrierBuffer:
"""
Buffer messages until all input barriers received
(for operators with multiple inputs)
"""
def __init__(self, num_inputs: int):
self.num_inputs = num_inputs
self.pending_checkpoint = None
self.received_barriers = set()
self.buffered_messages = {i: [] for i in range(num_inputs)}
    def process(self, message: Union[Message, Barrier], input_id: int):
        """
        Process a message or barrier from one input stream
        Returns: (messages_to_process, should_checkpoint)
        """
        if isinstance(message, Barrier):
            if self.pending_checkpoint is None:
                # First barrier of this checkpoint: start alignment
                self.pending_checkpoint = message.checkpoint_id
                self.received_barriers = {input_id}
            elif message.checkpoint_id == self.pending_checkpoint:
                # Barrier for the same checkpoint from another input
                self.received_barriers.add(input_id)

            if len(self.received_barriers) == self.num_inputs:
                # Alignment complete: checkpoint now, then release the
                # post-barrier messages buffered during alignment
                self.pending_checkpoint = None
                self.received_barriers = set()
                released = []
                for i in range(self.num_inputs):
                    released.extend(self.buffered_messages[i])
                    self.buffered_messages[i] = []
                return released, True
            return [], False
        else:  # Regular message
            if (self.pending_checkpoint is not None
                    and input_id in self.received_barriers):
                # This input already delivered its barrier, so this message
                # belongs to the next epoch: buffer until alignment completes
                self.buffered_messages[input_id].append(message)
                return [], False
            # Pre-barrier message: process immediately
            return [message], False
class CheckpointedOperator:
"""
Stream operator with checkpoint support
"""
def __init__(self, num_inputs: int = 1):
self.state = {}
self.barrier_buffer = BarrierBuffer(num_inputs)
self.checkpoint_manager = CheckpointManager('/tmp/operator-checkpoints')
    def on_message(self, message: Union[Message, Barrier], input_id: int = 0):
        """Handle incoming message or barrier"""
        messages, should_checkpoint = \
            self.barrier_buffer.process(message, input_id)

        # Snapshot first so that post-barrier messages released by the
        # buffer are not included in this checkpoint's state
        if should_checkpoint:
            self._checkpoint()

        # Process pre-barrier messages (or released post-barrier ones)
        for msg in messages:
            self._process_data(msg.data)
def _process_data(self, data: str):
"""Application logic (word count)"""
self.state[data] = self.state.get(data, 0) + 1
def _checkpoint(self):
"""Save state to checkpoint"""
        print(f"✓ Operator checkpointing: state size = {len(self.state)}")
self.checkpoint_manager.create_checkpoint(self.state, 0)
# Example: Two-input operator (join)
if __name__ == '__main__':
operator = CheckpointedOperator(num_inputs=2)
# Stream 1 messages
operator.on_message(Message("hello", 0), input_id=0)
operator.on_message(Message("world", 1), input_id=0)
operator.on_message(Barrier(checkpoint_id=1), input_id=0)
# Stream 2 messages (delayed barrier)
operator.on_message(Message("kafka", 0), input_id=1)
operator.on_message(Message("flink", 1), input_id=1)
# This completes the checkpoint
operator.on_message(Barrier(checkpoint_id=1), input_id=1)
print(f"Final state: {operator.state}")
Related Content
Prerequisites:
- Distributed Systems Basics - Foundation concepts
Related Concepts:
- Offset Management - Tracking stream positions
- Exactly-Once Semantics - Delivery guarantees
- Write-Ahead Log - Related durability mechanism
Used In Systems:
- Apache Flink: Distributed checkpointing with barriers
- Spark Structured Streaming: Micro-batch checkpointing
- Kafka Streams: State stores with changelog
Explained In Detail:
- Stream Processing Deep Dive - Checkpointing implementation details
Quick Self-Check
- Can explain checkpointing in 60 seconds?
- Understand how checkpoint barriers work?
- Know the trade-offs between checkpoint frequencies?
- Can explain how checkpointing enables exactly-once semantics?
- Understand incremental vs full checkpointing?
- Can design a checkpoint strategy for given requirements?