Core Architecture Overview
What is Kafka?
Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, real-time data pipelines. It acts as a commit log service that stores and forwards messages between producers and consumers.
Key Value Proposition:
- High Throughput: Millions of messages per second
- Low Latency: Single-digit millisecond latency at the 99th percentile in well-tuned clusters
- Durability: Messages persisted to disk with replication
- Scalability: Horizontal scaling across multiple machines
- Fault Tolerance: Continues operating despite node failures
Core Components
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ PRODUCERS │ │ KAFKA │ │ CONSUMERS │
│ │ │ CLUSTER │ │ │
│ Applications │───▶│ │───▶│ Applications │
│ Services │ │ Topic-Based │ │ Services │
│ Systems │ │ Pub-Sub │ │ Analytics │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Kafka Cluster Architecture
KAFKA CLUSTER
┌─────────────────────────────────────┐
│ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │
│ │ Leader │ │ Follower │ │
│ │ │ │ │ │
│ └──────────┘ └──────────┘ │
│ │ │ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Broker 3 │ │ZooKeeper │ │
│ │ Follower │ │or KRaft │ │
│ │ │ │ Metadata │ │
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────┘
Basic Data Flow
- Producers send messages to specific topics
- Topics are divided into partitions for scalability
- Brokers store partition data across the cluster
- Consumers read messages from topics/partitions
- Consumer Groups coordinate parallel consumption
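A minimal sketch of this end-to-end flow with the Java clients, assuming a local broker at localhost:9092 and an existing user-events topic:
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class BasicFlowSketch {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Producer publishes a keyed record; the key determines the target partition
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("user-events", "user-42", "page_view"));
        }
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "demo-group"); // consumers in one group share the partitions
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer reads from whichever partitions the group coordinator assigned to it
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(List.of("user-events"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}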
Interview Key Points
“Why Kafka over traditional message queues?”
- Persistence: Messages stored on disk, not just memory
- Replay: Consumers can re-read historical messages
- Multiple Consumers: Many consumers can read same data simultaneously
- Ordering: Guarantees order within partitions
- Scaling: Add partitions and brokers independently
“When would you use Kafka?”
- Event-driven microservices communication
- Real-time analytics and stream processing
- Log aggregation and centralized logging
- Change data capture (CDC) from databases
- Activity tracking and user behavior analytics
See Also: Topics & Partitions, Brokers
Topics, Partitions & Segments
Topic Organization
A topic is a logical grouping of related messages (e.g., “user-events”, “payment-transactions”). Topics provide the primary abstraction for organizing data streams.
TOPIC: user-events
├── Partition 0: [msg1][msg2][msg3][msg4]...
├── Partition 1: [msg5][msg6][msg7][msg8]...
├── Partition 2: [msg9][msg10][msg11][msg12]...
└── Partition 3: [msg13][msg14][msg15][msg16]...
Partition Mechanics
Partitions enable:
- Parallelism: Multiple consumers process different partitions simultaneously
- Scalability: Add partitions to increase throughput
- Ordering: Messages within a partition maintain strict order
- Distribution: Partitions spread across different brokers
Partition Assignment:
// Producer determines partition via:
// 1. Explicit partition specification
producer.send(new ProducerRecord<>("topic", partition, key, value));
// 2. Key-based hashing (most common)
producer.send(new ProducerRecord<>("topic", key, value));
// partition = hash(key) % num_partitions
// 3. No key: sticky partitioning since Kafka 2.4 (plain round-robin in older clients)
producer.send(new ProducerRecord<>("topic", value));
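For keyed records, the default partitioner's behavior can be approximated as follows (a sketch; the real default partitioner also applies sticky partitioning to keyless records):
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;
public class DefaultPartitionSketch {
    // Roughly what the default partitioner does for a record with a non-null key
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    public static void main(String[] args) {
        // The same key always lands on the same partition, as long as the partition count is unchanged
        System.out.println(partitionFor("user-42", 12));
    }
}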
Segment Storage Structure
Each partition is divided into segments for efficient storage and retrieval:
PARTITION 0 DIRECTORY:
├── 00000000000000000000.log (active segment)
├── 00000000000000000000.index (offset index)
├── 00000000000000000000.timeindex (time index)
├── 00000000000000100000.log (older segment)
├── 00000000000000100000.index
└── 00000000000000100000.timeindex
Segment Properties:
- Size-based rollover: New segment when current reaches size limit
- Time-based rollover: New segment after time period
- Immutable: Old segments are read-only, enabling efficient caching
- Cleanup: Old segments deleted based on retention policy
Storage Configuration Examples
Topic Creation with Partitioning Strategy:
# Create topic with strategic partition count
kafka-topics --create \
--topic user-events \
--partitions 12 \
--replication-factor 3 \
--config segment.ms=86400000 \
--config retention.ms=604800000
# Partition count considerations:
# - Start with (target_throughput_MB/s ÷ partition_throughput_MB/s)
# - Consider consumer parallelism (max consumers = partitions)
# - Account for hot partitions with uneven key distribution
Retention Policies:
# Time-based retention (delete after 7 days)
--config retention.ms=604800000
# Size-based retention (delete when topic exceeds 10GB)
--config retention.bytes=10737418240
# Log compaction (keep latest value per key)
--config cleanup.policy=compact
# Combined: compaction + time retention
--config cleanup.policy=compact,delete
--config retention.ms=604800000
Partition Strategy Decision Framework
Key-Based Partitioning:
Use when: Need ordering guarantees for related messages
Example: user_id → ensures all user events in same partition
Tradeoff: Risk of hot partitions with skewed data
Random/Round-Robin Partitioning:
Use when: Maximum throughput, no ordering requirements
Example: Logs, metrics without correlation needs
Tradeoff: No ordering guarantees across messages
Custom Partitioning:
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Custom logic: geographic routing, load balancing, etc.
        if (keyBytes == null || key.toString().startsWith("VIP_")) {
            return 0; // VIP users (and keyless records) get a dedicated partition
        }
        // toPositive avoids negative partition numbers from the murmur2 hash
        return Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic);
    }
    @Override
    public void configure(Map<String, ?> configs) { }
    @Override
    public void close() { }
}
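A partitioner like the sketch above would be wired into the producer via partitioner.class; the package name here is illustrative:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092"); // assumed broker address
// Register the custom partitioner by its fully qualified class name (illustrative package)
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafka.CustomPartitioner");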
Sizing and Capacity Planning
Partition Sizing Guidelines:
Target: 25GB max per partition (for reasonable recovery times)
Formula: (daily_data_GB × retention_days) ÷ partition_count < 25GB
Example Calculation:
- 1TB/day ingestion
- 7-day retention
- 7TB total storage needed
- 7000GB ÷ 25GB = 280 partitions minimum
Performance Implications:
- Too few partitions: Limits consumer parallelism and throughput
- Too many partitions: Increases metadata overhead, election time
- Sweet spot: Start conservative, monitor, and add partitions as needed
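The 25GB rule of thumb above can be turned into a quick sizing helper (a sketch; the 25GB ceiling and the example inputs come from the guidelines above):
public final class PartitionSizing {
    static final double MAX_PARTITION_GB = 25.0; // per-partition ceiling from the guideline above
    static int minPartitions(double dailyDataGb, int retentionDays) {
        double totalGb = dailyDataGb * retentionDays; // total retained data per replica
        return (int) Math.ceil(totalGb / MAX_PARTITION_GB);
    }
    public static void main(String[] args) {
        System.out.println(minPartitions(1000, 7)); // 1 TB/day, 7-day retention -> 280
    }
}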
See Also: Producer Architecture, Consumer Groups
Brokers and Clustering
Broker Roles and Responsibilities
A broker is a Kafka server that stores data and serves client requests. In a cluster, brokers coordinate to provide distributed storage and fault tolerance.
BROKER RESPONSIBILITIES:
┌─────────────────────────────────────────┐
│ • Accept producer writes │
│ • Serve consumer reads │
│ • Replicate data to other brokers │
│ • Participate in leader election │
│ • Store and serve cluster metadata │
│ • Handle client connections │
└─────────────────────────────────────────┘
Cluster Coordination: ZooKeeper vs KRaft
Traditional ZooKeeper Mode:
KAFKA CLUSTER
┌─────────────────────┐ ┌──────────────┐
│ ┌──────┐ ┌──────┐ │ │ ZooKeeper │
│ │Broker│ │Broker│ │◄──►│ Ensemble │
│ │ 1 │ │ 2 │ │ │ │
│ └──────┘ └──────┘ │ │ • Metadata │
│ │ │ │ • Elections │
│ ┌──────┐ │ ┌──────┐ │ │ • Config │
│ │Broker│ │Broker│ │ │ • ACLs │
│ │ 3 │ │ 4 │ │ └──────────────┘
│ └──────┘ └──────┘ │
└─────────────────────┘
Modern KRaft Mode (KIP-500):
KAFKA CLUSTER (Self-Managing)
┌──────────────────────────────────────┐
│ ┌──────────┐ ┌──────────┐ │
│ │Controller│ │ Broker │ │
│ │+Broker 1 │ │ 2 │ │
│ └──────────┘ └──────────┘ │
│ │ │ │
│ ┌──────────┐ ┌──────────┐ │
│ │Controller│ │ Broker │ │
│ │+Broker 3 │ │ 4 │ │
│ └──────────┘ └──────────┘ │
│ │
│ Controllers = Metadata Management │
│ Brokers = Data Storage + Serving │
└──────────────────────────────────────┘
Leader Election and Partition Leadership
Partition Leadership Model:
TOPIC: orders, 3 partitions, replication-factor=3
Partition 0: Leader=Broker1, Followers=[Broker2, Broker3]
Partition 1: Leader=Broker2, Followers=[Broker1, Broker3]
Partition 2: Leader=Broker3, Followers=[Broker1, Broker2]
LEADER RESPONSIBILITIES:
├── Accept all writes for the partition
├── Serve reads (configurable)
├── Manage follower replication
└── Coordinate with controller for metadata changes
FOLLOWER RESPONSIBILITIES:
├── Replicate data from leader
├── Stay in-sync to be eligible for leadership
└── Take over as leader if current leader fails
Broker Configuration Examples
Essential Broker Settings:
# server.properties
# Unique broker identifier
broker.id=1
# Network and connectivity
listeners=PLAINTEXT://kafka1.company.com:9092
advertised.listeners=PLAINTEXT://kafka1.company.com:9092
# Storage configuration
log.dirs=/var/kafka-logs-1,/var/kafka-logs-2,/var/kafka-logs-3
num.network.threads=8
num.io.threads=16
# Replication settings
default.replication.factor=3
min.insync.replicas=2
# Log management
log.retention.hours=168
log.segment.bytes=1073741824
log.cleanup.policy=delete
# ZooKeeper connection (if not using KRaft)
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
Production Hardening:
# JVM heap sizing (typically 6-10GB max)
export KAFKA_HEAP_OPTS="-Xmx8g -Xms8g"
# OS-level optimizations
# File descriptor limits: ulimit -n 100000
# Disable swap: vm.swappiness=1
# Network optimization
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.replica.fetchers=4
Failure Scenarios and Recovery
Broker Failure Handling:
1. BROKER FAILURE DETECTED
├── Controller notices missed heartbeat
├── Controller removes broker from cluster
└── Triggers partition leader re-election
2. PARTITION LEADER RE-ELECTION
├── Controller selects new leader from ISR
├── Updates metadata in all brokers
├── Notifies clients of leadership change
└── New leader starts accepting writes
3. FAILED BROKER RECOVERY
├── Restart broker process
├── Broker rejoins cluster
├── Catches up missing data from leaders
└── Becomes eligible for leadership again
Controller Election:
Controller Failure → New Controller Election
├── All brokers try to become controller
├── First to create /controller znode wins
├── New controller reads cluster state
├── Initiates leader election for affected partitions
└── Updates metadata across cluster
Cluster Expansion and Maintenance
Adding New Brokers:
# 1. Start new broker with unique broker.id
# 2. Create partition reassignment plan
kafka-reassign-partitions --generate \
--topics-to-move-json-file topics-to-move.json \
--broker-list "1,2,3,4" # including new broker
# 3. Execute reassignment
kafka-reassign-partitions --execute \
--reassignment-json-file reassignment-plan.json
# 4. Verify completion
kafka-reassign-partitions --verify \
--reassignment-json-file reassignment-plan.json
Rolling Restarts (Zero Downtime):
# For each broker:
# 1. Drain leadership (optional)
kafka-leader-election --election-type preferred \
--all-topic-partitions
# 2. Stop broker gracefully
kafka-server-stop
# 3. Apply configuration changes
# 4. Start broker
kafka-server-start server.properties
# 5. Verify broker rejoined and caught up
# 6. Proceed to next broker
Interview Scenarios
“How does Kafka handle a broker failure?”
- Controller detects failure via missed heartbeats
- Failed broker removed from ISR for its partitions
- New leaders elected from remaining ISR replicas
- Clients automatically discover new leaders
- Failed broker rejoins when recovered
“What happens if the controller fails?”
- New controller election among remaining brokers
- New controller reads full cluster state
- Continues managing leader elections and metadata
- Brief pause in metadata operations during election
See Also: Replication & Consistency, Operational Considerations
Producer Architecture
Producer Request Flow
The Kafka producer is responsible for publishing messages to topics with configurable reliability, performance, and ordering guarantees.
PRODUCER INTERNAL FLOW:
Application Code
│
▼
┌─────────────────┐
│ send() API │
│ │
└─────────┬───────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ Serializer │───▶│ Partitioner │
│ (key & value) │ │ (select target │
│ │ │ partition) │
└─────────────────┘ └─────────┬───────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ Record Buffer │◄───│ Record Batch │
│ (per partition) │ │ (accumulator) │
│ │ │ │
└─────────┬───────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ Sender Thread │───▶│ Network I/O │
│ (background) │ │ (to brokers) │
│ │ │ │
└─────────────────┘ └─────────────────┘
Batching and Performance Optimization
Batch Configuration Strategy:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
// Batching controls
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB batches
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // Wait 10ms for batching
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB total buffer
// Compression (reduces network I/O)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // or snappy, gzip
// Parallel requests per connection
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
Batching Mechanics:
PRODUCER BATCHING TIMELINE:
T=0ms: [msg1] ──┐
T=2ms: [msg2] ──┤
T=5ms: [msg3] ──┼─► Batch (3 messages)
T=8ms: [msg4] ──┤
T=10ms: [msg5] ──┘ │
│ │
│ ▼
│ ┌─────────────────┐
│ │ Send when: │
│ │ • batch.size OR │
│ │ • linger.ms OR │
│ │ • buffer full │
│ └─────────────────┘
│
T=10ms: ├─► NETWORK SEND (batch of 5)
Acknowledgment Strategies and Reliability
ACK Configuration Levels:
// acks=0: Fire and forget (highest throughput, no durability)
props.put(ProducerConfig.ACKS_CONFIG, "0");
// acks=1: Leader acknowledgment (balanced)
props.put(ProducerConfig.ACKS_CONFIG, "1");
// acks=all: Full ISR acknowledgment (highest durability)
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Pair with min.insync.replicas=2, set at the broker or topic level (it is not a producer config)
Reliability vs Performance Tradeoffs:
Configuration Matrix:
acks=0 │ High Throughput │ Data Loss Risk │ Use: Metrics, logs
acks=1 │ Good Throughput │ Minimal Loss Risk │ Use: General apps
acks=all │ Lower Throughput │ No Data Loss │ Use: Financial, critical
Idempotency and Exactly-Once Semantics
Idempotent Producer Configuration:
// Enables automatic deduplication
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Idempotency requires:
// acks = all (automatically set)
// max.in.flight.requests.per.connection <= 5 (automatically set)
// retries > 0 (automatically set)
// Transactional producer (for exactly-once)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
Exactly-Once Producer Pattern:
public class ExactlyOnceProducer {
    private final KafkaProducer<String, String> producer;
    public ExactlyOnceProducer(KafkaProducer<String, String> producer) {
        this.producer = producer; // must be created with a transactional.id
    }
    public void initTransactions() {
        producer.initTransactions();
    }
    public void sendTransactionally(List<ProducerRecord<String, String>> records) {
        producer.beginTransaction();
        try {
            for (ProducerRecord<String, String> record : records) {
                producer.send(record);
            }
            producer.commitTransaction();
        } catch (KafkaException e) {
            // On ProducerFencedException the producer must be closed rather than reused
            producer.abortTransaction();
            throw e;
        }
    }
}
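Wiring up the wrapper above might look like this (a sketch; the transactional.id is illustrative and must be stable and unique per producer instance):
Properties txProps = new Properties();
txProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx-1"); // illustrative id
txProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
txProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
ExactlyOnceProducer exactlyOnce = new ExactlyOnceProducer(new KafkaProducer<>(txProps));
exactlyOnce.initTransactions(); // one-time registration with the transaction coordinator
exactlyOnce.sendTransactionally(List.of(
        new ProducerRecord<>("orders", "order-1", "created"),
        new ProducerRecord<>("orders", "order-1", "paid"))); // both commit or neither does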
Error Handling and Retry Logic
Comprehensive Error Handling:
public class RobustProducer {
private KafkaProducer<String, String> producer;
private final int maxRetries = 3;
public void sendWithRetry(ProducerRecord<String, String> record) {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
handleSendError(record, exception, 0);
} else {
log.info("Message sent to partition {} at offset {}",
metadata.partition(), metadata.offset());
}
});
}
private void handleSendError(ProducerRecord<String, String> record,
Exception exception, int attempt) {
if (exception instanceof RetriableException && attempt < maxRetries) {
// Automatic retry for retriable exceptions
scheduleRetry(record, attempt + 1);
} else if (exception instanceof SerializationException) {
// Non-retriable: log and possibly send to DLQ
sendToDeadLetterQueue(record, exception);
} else {
// Network timeout, authentication, etc.
handleNonRetriableError(record, exception);
}
}
}
Producer Configuration for Reliability:
// Retry configuration
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); // 5 minutes total
// Request timeout (per request)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // 30 seconds
// Connection and metadata
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);
props.put(ProducerConfig.METADATA_MAX_AGE_MS_CONFIG, 300000); // Refresh topology
Message Ordering Guarantees
Ordering Configuration:
// For strict ordering within a partition:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// With idempotence enabled, up to 5 in-flight requests still preserve order;
// drop max.in.flight.requests.per.connection to 1 only if idempotence must stay off
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Key-based routing ensures related messages go to same partition
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", customerId, orderData);
Order Preservation Scenarios:
Scenario 1: Single in-flight request
├── msg1 sent → success → msg2 sent → success
└── Order preserved ✓
Scenario 2: Multiple in-flight requests (max.in.flight > 1)
├── msg1 sent ─┐
├── msg2 sent ─┼─ without idempotency
└── msg3 sent ─┘ may reorder on retry ✗
Scenario 3: Multiple in-flight with idempotency
├── msg1 sent ─┐
├── msg2 sent ─┼─ producer sequence numbers
└── msg3 sent ─┘ preserve order ✓
Interview Questions and Answers
“How do you ensure exactly-once semantics?”
- Enable idempotent producer (enable.idempotence=true)
- Use transactions with transactional.id
- Coordinate with exactly-once consumers
- Handle duplicates at application level if needed
“What’s the tradeoff between throughput and durability?”
- acks=0: Highest throughput, risk of data loss
- acks=1: Balanced, risk if leader fails before replication
- acks=all: Highest durability, lower throughput
“How do you handle producer failures?”
- Configure appropriate retries and timeouts
- Implement error callbacks for handling failures
- Use circuit breakers for downstream failures
- Monitor producer metrics for health
See Also: Topics & Partitions, Replication & Consistency
Consumer Groups & Offsets
Consumer Group Coordination
Consumer groups enable parallel processing and fault tolerance by coordinating multiple consumer instances to share the work of consuming from a topic’s partitions.
CONSUMER GROUP: "order-processors"
Topic: orders (4 partitions)
┌─────────┬─────────┬─────────┬─────────┐
│Part 0 │Part 1 │Part 2 │Part 3 │
└─────────┴─────────┴─────────┴─────────┘
│ │ │ │
│ │ │ │
▼ ▼ ▼ ▼
┌─────────┬─────────┬─────────┬─────────┐
│Consumer │Consumer │Consumer │Consumer │
│ A │ B │ C │ D │
└─────────┴─────────┴─────────┴─────────┘
Key Guarantee: Each partition assigned to exactly ONE consumer in group
Partition Assignment Strategies
Range Assignor (Default):
Topic: user-events (7 partitions), Consumer Group: analytics (3 consumers)
Consumer-1: partitions [0, 1, 2] (3 partitions)
Consumer-2: partitions [3, 4] (2 partitions)
Consumer-3: partitions [5, 6] (2 partitions)
Pros: Simple, stable assignments
Cons: Uneven distribution possible
Round-Robin Assignor:
Same setup as above:
Consumer-1: partitions [0, 3, 6] (3 partitions)
Consumer-2: partitions [1, 4] (2 partitions)
Consumer-3: partitions [2, 5] (2 partitions)
Pros: More even distribution
Cons: Less locality, more rebalancing
Sticky Assignor (Recommended):
Minimizes partition movement during rebalancing
Maintains assignments when possible
Reduces rebalancing overhead
Configuration:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.StickyAssignor");
Offset Management Strategies
Automatic Offset Management:
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 5 seconds
// Offset commit timing options:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // Process message
// Offsets are committed automatically during poll(), roughly every 5 seconds
}
}
Manual Offset Management (Recommended for Production):
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto-commit
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record); // Process message
// Option 1: Commit this record's offset immediately (lower throughput)
consumer.commitSync(Collections.singletonMap(
    new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1)));
} catch (Exception e) {
handleProcessingError(record, e);
// Don't commit offset on error - message will be retried
}
}
// Option 2: Commit batch of messages (higher throughput)
try {
consumer.commitSync(); // Commit all processed messages
} catch (CommitFailedException e) {
handleCommitError(e);
}
}
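A common middle ground (a sketch reusing the consumer and processRecord from the loop above; running is an assumed shutdown flag) is asynchronous commits during normal operation with one synchronous commit on shutdown:
// Sketch: async commits for throughput, one sync commit on shutdown for safety
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitAsync(); // non-blocking; occasional failures are covered by later commits
    }
} finally {
    try {
        consumer.commitSync(); // last chance to persist progress before closing
    } finally {
        consumer.close();
    }
}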
Rebalancing Protocol and Handling
Rebalancing Triggers:
1. Consumer joins group (new consumer started)
2. Consumer leaves group (graceful shutdown)
3. Consumer fails (heartbeat timeout)
4. Topic metadata changes (partition added)
5. Group coordinator failover
Rebalancing Flow:
REBALANCING PROCESS:
1. GROUP COORDINATOR DETECTION
├── Hash(group.id) % num_partitions(__consumer_offsets)
└── Identifies which broker manages this group
2. JOIN GROUP PHASE
├── All consumers send JoinGroup request
├── Coordinator selects group leader
└── Leader receives member list and metadata
3. SYNC GROUP PHASE
├── Leader computes partition assignments
├── Leader sends assignments to coordinator
└── Coordinator distributes assignments to members
4. PARTITION ASSIGNMENT
├── Consumers start consuming assigned partitions
└── Previous assignments released
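Step 1 above can be made concrete: the coordinator is the leader of the __consumer_offsets partition that the group id hashes to (50 partitions by default). A rough sketch of that lookup, assuming the default offsets.topic.num.partitions:
public class CoordinatorLookup {
    public static void main(String[] args) {
        String groupId = "order-processors";
        int offsetsTopicPartitions = 50; // offsets.topic.num.partitions default
        // Approximation of the broker-side lookup: hash(group.id) % partition count
        int partition = Math.abs(groupId.hashCode() % offsetsTopicPartitions);
        System.out.println("__consumer_offsets partition " + partition
                + " -> its leader broker acts as the group coordinator");
    }
}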
Rebalancing Listener Implementation:
public class RebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    public RebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer; // the same consumer instance that subscribes below
    }
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.info("Partitions revoked: {}", partitions);
// 1. Finish processing current messages
finishCurrentWork();
// 2. Commit current offsets
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("Failed to commit offsets during rebalance", e);
}
// 3. Clean up resources for revoked partitions
partitions.forEach(this::cleanupPartition);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("Partitions assigned: {}", partitions);
// 1. Initialize resources for new partitions
partitions.forEach(this::initializePartition);
// 2. Seek to specific offsets if needed
for (TopicPartition partition : partitions) {
long savedOffset = getOffsetFromDatabase(partition);
if (savedOffset >= 0) {
consumer.seek(partition, savedOffset);
}
}
}
}
// Usage:
consumer.subscribe(Arrays.asList("user-events"), new RebalanceListener(consumer));
Consumer Configuration Best Practices
Essential Consumer Settings:
Properties props = new Properties();
// Group and client identification
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processors");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-processor-" + instanceId);
// Offset management
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Session and heartbeat configuration
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 10 seconds
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 minutes
// Fetch configuration for performance
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB minimum
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 500ms max wait
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB max per partition
Consumer Performance Tuning:
// Large batch processing
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // Process 1000 records per poll
// Network optimization
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 131072); // 128KB
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072); // 128KB
// For high-throughput scenarios
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000); // Wait for 50KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100); // But no more than 100ms
Error Handling and Recovery Patterns
Retry with Dead Letter Queue:
public class RobustConsumer {
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> dlqProducer;
private final String dlqTopic;
private final int maxRetries = 3;
public void processRecords() {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
boolean processed = false;
int attempts = 0;
while (!processed && attempts < maxRetries) {
try {
processMessage(record);
processed = true;
} catch (RetriableException e) {
attempts++;
if (attempts >= maxRetries) {
sendToDeadLetterQueue(record, e);
processed = true; // Don't retry further
} else {
waitBeforeRetry(attempts);
}
} catch (NonRetriableException e) {
sendToDeadLetterQueue(record, e);
processed = true;
}
}
}
// Commit offsets only after all messages processed
consumer.commitSync();
}
private void sendToDeadLetterQueue(ConsumerRecord<String, String> record, Exception e) {
Headers headers = record.headers();
headers.add("error.message", e.getMessage().getBytes());
headers.add("error.timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
ProducerRecord<String, String> dlqRecord =
new ProducerRecord<>(dlqTopic, record.key(), record.value(), headers);
dlqProducer.send(dlqRecord);
}
}
Consumer Lag Monitoring
Key Metrics to Track:
# Consumer lag per partition
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group payment-processors
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
orders 0 1500 1750 250 consumer-1-abc123
orders 1 2300 2310 10 consumer-2-def456
orders 2 3100 3100 0 consumer-3-ghi789
Programmatic Lag Monitoring:
public class ConsumerLagMonitor {
    private final AdminClient adminClient;
    private final String groupId;
    public ConsumerLagMonitor(AdminClient adminClient, String groupId) {
        this.adminClient = adminClient;
        this.groupId = groupId;
    }
    public Map<TopicPartition, Long> getConsumerLag() throws Exception {
        Map<TopicPartition, Long> lagMap = new HashMap<>();
        // Get committed offsets for the consumer group
        Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
            adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        // Get log-end offsets for the same partitions via AdminClient.listOffsets
        Map<TopicPartition, OffsetSpec> latestSpec = new HashMap<>();
        consumerOffsets.keySet().forEach(tp -> latestSpec.put(tp, OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEndOffsets =
            adminClient.listOffsets(latestSpec).all().get();
        // Lag = log-end offset minus committed offset, per partition
        for (TopicPartition tp : consumerOffsets.keySet()) {
            long consumerOffset = consumerOffsets.get(tp).offset();
            long logEndOffset = logEndOffsets.get(tp).offset();
            lagMap.put(tp, logEndOffset - consumerOffset);
        }
        return lagMap;
    }
}
Interview Scenarios
“What happens when a consumer in a group fails?”
- Consumer stops sending heartbeats
- Group coordinator triggers rebalancing after session timeout
- Failed consumer’s partitions redistributed to remaining consumers
- Remaining consumers may experience brief pause during rebalancing
“How do you handle slow consumers?”
- Increase max.poll.interval.ms for longer processing times
- Reduce max.poll.records to process smaller batches
- Scale horizontally by adding more consumer instances
- Optimize message processing logic
“Explain exactly-once consumption.”
- Disable auto-commit (enable.auto.commit=false)
- Process message and commit offset in a single transaction
- Handle idempotency at application level for duplicates
- Use external storage for offset management if needed
See Also: Brokers & Clustering, Operational Considerations
Replication and Consistency
Replication Architecture
Kafka provides fault tolerance through partition replication across multiple brokers. Each partition has one leader and multiple followers, ensuring data survives broker failures.
TOPIC: transactions, PARTITION: 0, REPLICATION-FACTOR: 3
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ BROKER 1 │ │ BROKER 2 │ │ BROKER 3 │
│ │ │ │ │ │
│ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │
│ │LEADER │ │ │ │FOLLOWER │ │ │ │FOLLOWER │ │
│ │Partition 0│◄─┼──┼──┤Partition 0│ │ │ │Partition 0│ │
│ │[msg1][msg2]│ │ │ │[msg1][msg2]│◄─┼──┼──┤[msg1][msg2]│ │
│ │ │ │ │ │ │ │ │ │ │ │
│ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
REPLICATION FLOW:
1. Producer sends to Leader
2. Leader writes to local log
3. Followers fetch from Leader
4. Leader tracks follower progress
5. Acknowledgment sent based on acks setting
In-Sync Replicas (ISR) Management
ISR Definition and Mechanics:
In-Sync Replicas (ISR) = Replicas that are "caught up" with the leader
ISR Criteria:
├── Follower has caught up to the leader's log end within replica.lag.time.max.ms (default: 30s)
└── Follower is alive and actively fetching (the older replica.lag.max.messages setting was removed in Kafka 0.9)
ISR Changes:
├── Follower falls behind → Removed from ISR
├── Follower catches up → Added back to ISR
├── Leader fails → New leader elected from ISR only
ISR Configuration:
# Broker configuration
replica.lag.time.max.ms=30000 # 30 seconds to stay in ISR
min.insync.replicas=2 # Minimum ISR size for writes
unclean.leader.election.enable=false # Only ISR can become leader
# Topic-level override
kafka-configs --alter --entity-type topics --entity-name critical-topic \
--add-config min.insync.replicas=3
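ISR health can also be checked programmatically; a sketch with the AdminClient, assuming a broker at localhost:9092 and a 3.1+ client (for allTopicNames()):
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
public class IsrChecker {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, TopicDescription> topics =
                    admin.describeTopics(List.of("critical-topic")).allTopicNames().get();
            for (TopicPartitionInfo p : topics.get("critical-topic").partitions()) {
                // Under-replicated when the ISR is smaller than the full replica set
                boolean underReplicated = p.isr().size() < p.replicas().size();
                System.out.printf("partition=%d leader=%s isr=%d/%d underReplicated=%s%n",
                        p.partition(), p.leader(), p.isr().size(), p.replicas().size(), underReplicated);
            }
        }
    }
}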
Consistency Guarantees and Trade-offs
Consistency Model:
KAFKA CONSISTENCY LEVELS:
acks=0 (No Durability Guarantee):
├── Producer: Fire and forget
├── Guarantee: None (fastest)
└── Risk: Message loss if leader fails
acks=1 (Leader Confirmation):
├── Producer: Wait for leader write
├── Guarantee: Durable if leader survives
└── Risk: Loss if leader fails before replication
acks=all (ISR Confirmation):
├── Producer: Wait for ISR write confirmation
├── Guarantee: Durable as long as one ISR replica survives
└── Risk: Minimal (configurable via min.insync.replicas)
CAP Theorem in Kafka Context:
Kafka's CAP Theorem Positioning:
PARTITION TOLERANCE: Always maintained
├── Cluster continues operating despite network splits
├── Partitions can be served from different brokers
└── Automatic failover and recovery
CONSISTENCY vs AVAILABILITY Trade-off:
├── High Consistency: acks=all + min.insync.replicas > 1
│ • Ensures strong durability
│ • May sacrifice availability during broker failures
│
└── High Availability: acks=1 or unclean.leader.election=true
• Prioritizes uptime over consistency
• Risk of message loss or ordering issues
Replication Configuration Examples
High Durability Configuration:
# Topic creation for financial transactions
kafka-topics --create \
--topic financial-transactions \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=3 \
--config unclean.leader.election.enable=false
# Producer configuration
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
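The same high-durability topic could also be created programmatically; a sketch with the AdminClient that mirrors the CLI settings above (broker address assumed):
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
public class CreateDurableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("financial-transactions", 12, (short) 3)
                    .configs(Map.of(
                            "min.insync.replicas", "3",
                            "unclean.leader.election.enable", "false"));
            admin.createTopics(List.of(topic)).all().get(); // blocks until the controller confirms creation
        }
    }
}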
High Performance Configuration:
# Topic for metrics/logging
kafka-topics --create \
--topic application-metrics \
--partitions 24 \
--replication-factor 2 \
--config min.insync.replicas=1
# Producer configuration
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
Failure Scenarios and Recovery
Leader Failure Recovery:
SCENARIO: Leader Broker Fails
Before Failure:
├── Broker-1: Leader (Partition 0)
├── Broker-2: Follower (Partition 0, ISR)
└── Broker-3: Follower (Partition 0, ISR)
Failure Sequence:
1. Broker-1 becomes unreachable
2. Controller detects failure (missed heartbeats)
3. Controller triggers leader election
4. New leader selected from ISR (Broker-2 or Broker-3)
5. Metadata updated across cluster
6. Clients discover new leader
7. Normal operation resumes
Recovery Time:
├── Detection: ~broker session timeout (zookeeper.session.timeout.ms, or broker.session.timeout.ms in KRaft; a few seconds by default)
├── Election: ~few hundred milliseconds
└── Client Discovery: ~metadata.max.age.ms (5min default)
Split-Brain Prevention:
Kafka prevents split-brain through:
1. SINGLE CONTROLLER MODEL
├── Only one controller per cluster
├── Controller election via ZooKeeper/KRaft
└── Controller manages all leadership decisions
2. ISR-BASED ELECTIONS
├── Only ISR replicas eligible for leadership
├── Prevents stale replicas from becoming leaders
└── Maintains data consistency
3. FENCING MECHANISMS
├── Epoch numbers for leaders
├── Stale leaders reject writes
└── Clients automatically discover current leader
Cross-Datacenter Replication
MirrorMaker 2.0 Architecture:
PRIMARY DATACENTER (US-EAST) SECONDARY DATACENTER (US-WEST)
┌─────────────────────────┐ ┌─────────────────────────┐
│ KAFKA CLUSTER A │ │ KAFKA CLUSTER B │
│ │ │ │
│ Topic: user-events │ │ Topic: us-east.user- │
│ ├── Partition 0 │────► │ events │
│ ├── Partition 1 │ │ ├── Partition 0 │
│ └── Partition 2 │ │ ├── Partition 1 │
│ │ │ └── Partition 2 │
│ │ │ │
│ │ │ MirrorMaker 2.0 │
│ │ │ Connectors │
└─────────────────────────┘ └─────────────────────────┘
MirrorMaker Configuration:
# Source and target clusters
clusters=primary,secondary
primary.bootstrap.servers=kafka1-east:9092,kafka2-east:9092
secondary.bootstrap.servers=kafka1-west:9092,kafka2-west:9092
# Replication flows
primary->secondary.enabled=true
primary->secondary.topics=user-events,transactions,.*-logs
# Topic naming in target cluster
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# Results in: primary.user-events, primary.transactions
# Sync settings
sync.topic.acls.enabled=true
sync.topic.configs.enabled=true
emit.checkpoints.enabled=true # For exactly-once
Monitoring Replication Health
Key Replication Metrics:
# ISR size monitoring
kafka-topics --describe --topic critical-topic
Topic: critical-topic
Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 ← Healthy
Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3 ← Degraded
# Under-replicated partitions
kafka-topics --describe --under-replicated-partitions
# Preferred replica election (rebalancing)
kafka-leader-election --election-type preferred --all-topic-partitions
JMX Metrics for Replication:
// Key metrics to monitor
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions // Should be 0
kafka.server:type=ReplicaManager,name=PartitionCount // Partitions per broker
kafka.controller:type=KafkaController,name=OfflinePartitionsCount // Should be 0
kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs // Election frequency
Interview Questions
“How does Kafka ensure no data loss?”
- Configure acks=all for producers
- Set min.insync.replicas > 1 at topic/broker level
- Disable unclean.leader.election
- Monitor ISR health and handle shrinking ISRs
“What happens if all replicas fail?”
- Partition becomes unavailable for reads/writes
- If unclean.leader.election=false: Wait for an ISR replica to recover
- If unclean.leader.election=true: Allow a non-ISR replica to become leader (data loss possible)
- Monitor and alert on ISR size to prevent this scenario
“Explain Kafka’s consistency model.”
- Read-after-write consistency: Consumers see writes immediately after producer acknowledgment
- Monotonic read consistency: Consumers see messages in order within partitions
- No cross-partition consistency: No guarantees across different partitions
- Configurable durability: Trade-off between consistency and performance via acks
See Also: Brokers & Clustering, Producer Architecture
Performance Characteristics
Throughput and Latency Metrics
Typical Performance Benchmarks:
KAFKA PERFORMANCE CHARACTERISTICS:
Throughput (Single Broker):
├── Sequential writes: ~750 MB/s (millions of messages/sec)
├── Random writes: ~150 MB/s
├── Sequential reads: ~900 MB/s
└── Network-limited: ~125 MB/s per 1Gbps link
Latency (99th percentile):
├── End-to-end: 2-5ms (optimized configuration)
├── Producer acks=1: 1-2ms
├── Producer acks=all: 3-5ms
└── Consumer poll: <1ms
Performance Factors:
THROUGHPUT DRIVERS:
├── Batch size (larger = higher throughput)
├── Compression (reduces network/disk I/O)
├── Replication factor (higher = lower throughput)
├── Number of partitions (more = higher parallelism)
└── Hardware (SSD, network bandwidth, CPU cores)
LATENCY DRIVERS:
├── acks configuration (all > 1 > 0)
├── linger.ms (batching delay)
├── Network RTT between producers/brokers/consumers
├── Disk fsync behavior (OS and hardware)
└── JVM garbage collection pauses
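End-to-end produce latency can be sampled directly from the send callback; a sketch that reuses a configured producer and an assumed latency-test topic:
// Sketch: measure acknowledgment latency for one record via the async callback
final long start = System.nanoTime();
producer.send(new ProducerRecord<>("latency-test", "key", "value"), (metadata, exception) -> {
    long micros = (System.nanoTime() - start) / 1_000;
    if (exception == null) {
        System.out.printf("acked partition=%d offset=%d latency=%dus%n",
                metadata.partition(), metadata.offset(), micros);
    } else {
        exception.printStackTrace(); // timeouts, retriable errors, etc. surface here
    }
});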
Scaling Patterns and Bottlenecks
Horizontal Scaling Strategy:
SCALING DIMENSIONS:
Partition Scaling:
├── More partitions → More consumer parallelism
├── Guideline: Start with 2-3x expected consumers
├── Max recommended: 4000 partitions per broker
└── Cost: Higher metadata overhead, longer elections
Broker Scaling:
├── More brokers → Distribute partition leadership
├── Pattern: Add brokers then rebalance partitions
├── Benefit: Higher aggregate throughput
└── Cost: Increased operational complexity
Consumer Scaling:
├── Scale consumers up to partition count
├── Beyond partition count = idle consumers
├── Pattern: Auto-scaling based on consumer lag
└── Monitor: Consumer lag per partition
Common Bottlenecks and Solutions:
| Bottleneck | Symptoms | Solutions |
|---|---|---|
| Producer Batching | Low throughput, high CPU | Increase batch.size, tune linger.ms |
| Network I/O | High latency, bandwidth limits | Enable compression, increase buffers |
| Disk I/O | Slow writes, high latency | Use SSD, tune OS page cache |
| Consumer Lag | Processing slower than ingestion | Scale consumers, optimize processing |
| Replication | High latency with acks=all | Optimize network, tune ISR settings |
| GC Pauses | Periodic latency spikes | Tune JVM heap, use G1 collector |
Configuration Tuning Guide
Producer Performance Tuning:
Properties producerProps = new Properties();
// Throughput optimization
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB batches
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 20); // 20ms batching window
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // Fast compression
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728); // 128MB buffer
// Network optimization
producerProps.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072); // 128KB
producerProps.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768); // 32KB
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Reliability vs performance balance
producerProps.put(ProducerConfig.ACKS_CONFIG, "1"); // Leader ack only
producerProps.put(ProducerConfig.RETRIES_CONFIG, 3);
Consumer Performance Tuning:
Properties consumerProps = new Properties();
// Fetch optimization
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000); // 50KB minimum
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100); // 100ms max wait
consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB max
// Processing optimization
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000); // Larger batches
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commits
// Network buffers
consumerProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 262144); // 256KB
consumerProps.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072); // 128KB
Broker Performance Tuning:
# server.properties
# Network threads (typically 8-16)
num.network.threads=16
num.io.threads=16
# Socket buffers
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log segment configuration
log.segment.bytes=1073741824 # 1GB segments
log.index.size.max.bytes=10485760 # 10MB indexes
# Replication performance
num.replica.fetchers=8 # Parallel fetcher threads
replica.fetch.max.bytes=1048576 # 1MB per fetch
# Compression and cleanup
compression.type=lz4 # Broker-side compression
log.cleanup.policy=delete
log.retention.check.interval.ms=300000
Hardware Recommendations
CPU Requirements:
BROKER CPU SIZING:
├── Baseline: 8-16 cores per broker
├── Network threads: 1 core per 4 threads
├── I/O threads: 1 core per 2-4 threads
├── Compression: Additional 2-4 cores
└── Scaling: Monitor CPU utilization <70%
Memory Allocation:
MEMORY DISTRIBUTION:
├── JVM Heap: 6-10GB (avoid large heaps)
├── OS Page Cache: 60-70% of total RAM
├── Network buffers: ~1-2GB
└── Example: 64GB machine → 8GB heap, 50GB page cache
Storage Configuration:
DISK RECOMMENDATIONS:
├── Type: SSD strongly recommended for low latency
├── RAID: RAID-10 for performance + redundancy
├── File system: XFS or ext4 with noatime
├── Separate disks: OS, logs, ZooKeeper data
└── Monitoring: Disk utilization, IOPS, latency
Network Specifications:
NETWORK SIZING:
├── Bandwidth: 10Gbps minimum for production
├── Calculation: (ingress + egress × replication_factor) × safety_margin
├── Example: 1 GB/s ingress × replication factor 3 × safety margin 2 = 6 GB/s for ingest and replication alone, before consumer egress
└── Monitoring: Network utilization, packet loss
Performance Monitoring and Alerting
Key Performance Metrics:
# Throughput monitoring
kafka-consumer-perf-test --topic test-topic --bootstrap-server localhost:9092
# Producer performance test
kafka-producer-perf-test --topic test-topic --num-records 1000000 \
--record-size 1024 --throughput 100000
# JMX metrics to monitor
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
kafka.server:type=ReplicaManager,name=LeaderCount
Production Monitoring Dashboard:
Key Metrics to Dashboard:
- Throughput: Messages/sec, Bytes/sec per topic
- Latency: 99th percentile produce/consume latency
- Consumer Lag: Per partition and total
- Broker Health: CPU, memory, disk usage
- Replication: ISR shrinking, under-replicated partitions
- Network: Bandwidth utilization, request rate
Interview Performance Questions
“How do you optimize Kafka for high throughput?”
- Increase batch sizes and enable compression
- Tune linger.ms for better batching
- Scale partitions and consumers horizontally
- Use appropriate hardware (SSD, high bandwidth)
- Monitor and eliminate bottlenecks systematically
“What causes latency in Kafka?”
- Producer batching (linger.ms setting)
- Network RTT between components
- Disk sync behavior and storage performance
- Replication overhead with acks=all
- JVM garbage collection pauses
“How do you handle traffic spikes?”
- Over-provision partitions for scaling headroom
- Implement producer-side backpressure
- Use consumer auto-scaling based on lag metrics
- Pre-warm page caches during off-peak hours
- Monitor leading indicators (queue depth, latency)
See Also: Producer Architecture, Consumer Groups, Operational Considerations
Operational Considerations
Essential Monitoring Metrics
Cluster Health Metrics:
CRITICAL ALERTS (Page immediately):
├── Under-replicated partitions > 0
├── Offline partitions > 0
├── ISR shrinking rate > normal baseline
├── Controller election frequency > normal
└── Any broker down
PERFORMANCE ALERTS (Monitor closely):
├── Consumer lag > SLA threshold
├── Producer error rate > 1%
├── Request latency P99 > SLA
├── Disk usage > 80%
└── JVM heap usage > 70%
Monitoring Implementation:
// JMX metric collection example
public class KafkaMetricsCollector {
private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
public Map<String, Object> getClusterHealth() throws Exception { // JMX lookups throw checked exceptions
Map<String, Object> metrics = new HashMap<>();
// Under-replicated partitions (critical)
ObjectName underReplicatedName = new ObjectName(
"kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
metrics.put("underReplicatedPartitions",
server.getAttribute(underReplicatedName, "Value"));
// Offline partitions (critical)
ObjectName offlinePartitionsName = new ObjectName(
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount");
metrics.put("offlinePartitions",
server.getAttribute(offlinePartitionsName, "Value"));
// Producer request rate
ObjectName produceRequestName = new ObjectName(
"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce");
metrics.put("produceRequestRate",
server.getAttribute(produceRequestName, "OneMinuteRate"));
return metrics;
}
}
Troubleshooting Common Issues
Issue: Consumer Lag Building Up
DIAGNOSIS STEPS:
1. Identify affected consumers:
kafka-consumer-groups --describe --group my-group
2. Check consumer health:
├── Consumer instances running?
├── Processing errors in logs?
├── Rebalancing frequently?
└── Network connectivity issues?
3. Check partition distribution:
├── Even partition assignment?
├── Hot partitions with high traffic?
└── Consumer instance capacity?
SOLUTIONS:
├── Scale consumers (if < partition count)
├── Optimize message processing logic
├── Increase max.poll.records for batching
├── Add more partitions (requires rebalancing)
└── Monitor GC pauses in consumer JVMs
Issue: Broker Disk Space Exhaustion
IMMEDIATE ACTIONS:
1. Emergency cleanup:
kafka-log-dirs --describe --bootstrap-server localhost:9092
2. Temporary retention reduction:
kafka-configs --alter --entity-type topics --entity-name large-topic \
--add-config retention.ms=3600000 # 1 hour
3. Manual log cleanup:
# Stop broker gracefully
# Last resort: remove whole old segments for a partition (each .log together with its matching .index/.timeindex)
# Restart broker
LONG-TERM SOLUTIONS:
├── Implement proper retention policies
├── Set up disk usage monitoring/alerting
├── Use log compaction for key-based topics
├── Archive old data to cold storage
└── Capacity planning for growth
Issue: Frequent Leader Elections
DIAGNOSIS:
├── Network instability between brokers
├── JVM garbage collection pauses
├── Disk I/O latency spikes
├── Incorrect timeout configurations
└── ZooKeeper/KRaft cluster issues
SOLUTIONS:
1. Network optimization:
├── Check network latency/packet loss
├── Tune socket buffer sizes
└── Verify DNS resolution speed
2. JVM tuning:
├── Use G1 garbage collector
├── Tune heap size (6-10GB typical)
├── Monitor GC logs for long pauses
└── Set appropriate GC tuning flags
3. Timeout adjustments:
replica.lag.time.max.ms=30000 # Increase if needed
controller.socket.timeout.ms=30000
request.timeout.ms=30000
Deployment and Configuration Management
Zero-Downtime Deployment Process:
#!/bin/bash
# Rolling update script
BROKERS=("broker1" "broker2" "broker3")
NEW_VERSION="2.8.0"
for broker in "${BROKERS[@]}"; do
echo "Updating $broker..."
# 1. Gracefully shut down broker
ssh $broker "kafka-server-stop.sh"
# 2. Wait for partition leadership migration
sleep 30
# 3. Update Kafka binaries
ssh $broker "tar -xzf kafka-${NEW_VERSION}.tgz -C /opt/"
# 4. Start broker with new version
ssh $broker "kafka-server-start.sh -daemon server.properties"
# 5. Verify broker rejoined cluster
kafka-broker-api-versions.sh --bootstrap-server $broker:9092
# 6. Wait before proceeding to next broker
sleep 60
done
Configuration Management Best Practices:
# server.properties template with environment variables
broker.id=${BROKER_ID}
listeners=${LISTENERS}
log.dirs=${LOG_DIRS}
# Environment-specific settings
default.replication.factor=${REPLICATION_FACTOR:-3}
min.insync.replicas=${MIN_ISR:-2}
# Security configuration
security.protocol=${SECURITY_PROTOCOL:-PLAINTEXT}
ssl.keystore.location=${SSL_KEYSTORE_PATH}
ssl.truststore.location=${SSL_TRUSTSTORE_PATH}
# Monitoring integration
jmx.port=${JMX_PORT:-9999}
Backup and Disaster Recovery
Backup Strategies:
# 1. Topic configuration backup
kafka-topics --list --bootstrap-server localhost:9092 > topics.txt
while read topic; do
kafka-configs --describe --entity-type topics --entity-name $topic \
--bootstrap-server localhost:9092 >> topic-configs.txt
done < topics.txt
# 2. Consumer group offset backup
kafka-consumer-groups --list --bootstrap-server localhost:9092 > groups.txt
while read group; do
kafka-consumer-groups --describe --group $group \
--bootstrap-server localhost:9092 >> group-offsets.txt
done < groups.txt
# 3. Metadata backup (ZooKeeper)
zkCli.sh -server zk1:2181 <<EOF
ls /brokers
ls /config
ls /controller
EOF
Cross-Region Disaster Recovery:
DR Setup with MirrorMaker 2.0:
PRIMARY_REGION (us-east):
├── Kafka Cluster (3 brokers)
├── Application producers/consumers
└── MirrorMaker 2.0 → DR_REGION
DR_REGION (us-west):
├── Kafka Cluster (3 brokers)
├── Standby applications (read-only)
├── Mirrored topics: primary.topic-name
└── Offset sync for consumer failover
FAILOVER_PROCESS:
1. Stop applications in primary region
2. Switch DNS/load balancers to DR region
3. Start applications in DR region
4. Begin reverse replication (DR → PRIMARY)
Security Operations
Authentication and Authorization Setup:
# Enable SASL/SCRAM authentication
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# SSL configuration
ssl.client.auth=required
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
# ACL authorization (AclAuthorizer for ZooKeeper mode; StandardAuthorizer for KRaft)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:kafka;User:admin
Common Security Operations:
# Create SCRAM user
kafka-configs --alter --add-config 'SCRAM-SHA-256=[password=secret]' \
--entity-type users --entity-name alice
# Grant ACLs
kafka-acls --add --allow-principal User:alice \
--operation Read --operation Write --topic payments
# List current ACLs
kafka-acls --list --principal User:alice
# Rotate SSL certificates
# 1. Generate new certificates
# 2. Add to truststore
# 3. Rolling restart with new keystore
# 4. Remove old certificates
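On the client side, Java producers and consumers need matching security settings; a sketch with placeholder host, paths, and credentials:
Properties secProps = new Properties();
secProps.put("bootstrap.servers", "kafka1.company.com:9093"); // TLS listener, port assumed
secProps.put("security.protocol", "SASL_SSL");
secProps.put("sasl.mechanism", "SCRAM-SHA-256");
secProps.put("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        + "username=\"alice\" password=\"secret\";"); // matches the SCRAM user created above
secProps.put("ssl.truststore.location", "/etc/kafka/ssl/client.truststore.jks"); // placeholder path
secProps.put("ssl.truststore.password", "changeit"); // placeholder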
Capacity Planning and Growth Management
Capacity Planning Framework:
# Capacity calculation example
import math

def calculate_kafka_capacity(requirements):
    daily_messages = requirements['daily_messages']
    message_size = requirements['avg_message_size_kb']
    retention_days = requirements['retention_days']
    replication_factor = requirements['replication_factor']
    # Storage calculation (KB -> GB)
    daily_storage_gb = (daily_messages * message_size) / (1024 * 1024)
    total_storage_gb = daily_storage_gb * retention_days * replication_factor
    # Add safety margin
    recommended_storage = total_storage_gb * 1.5
    # Partition calculation (GB/day -> MB/s)
    target_throughput_mb = daily_storage_gb * 1024 / (24 * 3600)
    partition_throughput_mb = 25  # Conservative estimate, MB/s per partition
    min_partitions = max(math.ceil(target_throughput_mb / partition_throughput_mb),
                         requirements.get('min_consumers', 1))
    return {
        'storage_gb': recommended_storage,
        'partitions': min_partitions,
        'brokers': max(3, min_partitions // 100)  # ~100 partitions per broker as a rough guide
    }
Growth Management:
# Adding partitions (irreversible! also changes key-to-partition mapping for keyed topics)
kafka-topics --alter --topic user-events --partitions 20
# Adding brokers and rebalancing
# 1. Start new brokers
# 2. Generate reassignment plan
kafka-reassign-partitions --generate \
--topics-to-move-json-file all-topics.json \
--broker-list "1,2,3,4,5" # Include new brokers
# 3. Execute rebalancing (gradual)
kafka-reassign-partitions --execute \
--reassignment-json-file expand-cluster-reassignment.json \
--throttle 50000000 # 50MB/s throttle
Interview Operational Questions
“How do you monitor Kafka in production?”
- JMX metrics collection (broker, producer, consumer metrics)
- Consumer lag monitoring and alerting
- Cluster health checks (ISR, offline partitions)
- Resource monitoring (CPU, memory, disk, network)
- End-to-end latency and throughput monitoring
“Describe your disaster recovery plan for Kafka.”
- Cross-region replication with MirrorMaker 2.0
- Regular metadata and configuration backups
- Automated failover procedures with DNS switching
- Consumer offset synchronization for seamless failover
- Regular DR testing and runbook maintenance
“How do you handle a failed broker?”
- Immediate: Check if partitions are still available via ISR
- Short-term: Allow automatic leader election and recovery
- Investigation: Identify root cause (hardware, network, configuration)
- Recovery: Fix underlying issue and restart broker
- Verification: Confirm broker rejoined cluster and replicated data
See Also: Performance Characteristics, Replication & Consistency
Summary and Integration
Key Takeaways
Kafka’s Core Strengths:
- High Throughput: Millions of messages per second through batching and sequential I/O
- Fault Tolerance: Replication and ISR management ensure data durability
- Scalability: Horizontal scaling via partitions and brokers
- Durability: Configurable persistence with acks and min.insync.replicas
- Ordering: Guarantees within partition boundaries
Critical Design Decisions:
- Partitioning Strategy: Impacts parallelism, ordering, and scaling
- Replication Factor: Balances durability with performance and storage costs
- Producer ACKs: Trades durability guarantees for latency and throughput
- Consumer Group Design: Affects processing parallelism and failure handling
System Design Integration Points
When to Choose Kafka:
- High-throughput event streaming (>100k messages/sec)
- Event-driven microservices architecture
- Real-time analytics and stream processing
- Log aggregation and centralized logging
- Change data capture from databases
Kafka vs Alternatives:
- RabbitMQ: Better for complex routing, lower throughput requirements
- AWS SQS: Managed service, simpler operations, vendor lock-in
- Apache Pulsar: Multi-tenancy, geo-replication, newer ecosystem
- Redis Streams: Lower latency, simpler deployment, less durability
Production Readiness Checklist
✓ Infrastructure:
- Multi-broker cluster (minimum 3 brokers)
- Separate ZooKeeper/KRaft cluster
- SSD storage with adequate IOPS
- Network bandwidth planning (10Gbps+)
- Security setup (SSL, SASL, ACLs)
✓ Configuration:
- Appropriate replication factors (3+ for critical data)
- min.insync.replicas configured
- Retention policies aligned with business needs
- JVM tuning (heap size, GC collector)
- OS-level optimizations (file descriptors, swappiness)
✓ Monitoring:
- JMX metrics collection and dashboards
- Consumer lag monitoring and alerting
- Cluster health checks automation
- End-to-end latency monitoring
- Capacity and growth trend analysis
✓ Operations:
- Deployment automation and rolling updates
- Backup and disaster recovery procedures
- Incident response runbooks
- Performance baseline and SLAs defined
- Team training and knowledge transfer
This knowledge chunk provides the foundation for implementing, operating, and scaling Kafka in production environments while being prepared for senior-level technical discussions and system design interviews.
Total Study Time: 45 minutes. Next Steps: Practice system design scenarios, implement hands-on labs, explore advanced topics like exactly-once semantics and stream processing frameworks.