Kafka Series

Kafka Architecture - Core Concepts

Deep dive into Kafka's distributed architecture, topics, partitions, brokers, and core design decisions that enable high-throughput event streaming

Core Architecture Overview

What is Kafka?

Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, real-time data pipelines. It acts as a commit log service that stores and forwards messages between producers and consumers.

Key Value Proposition:

Core Components

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   PRODUCERS     │    │     KAFKA       │    │   CONSUMERS     │
│                 │    │    CLUSTER      │    │                 │
│  Applications   │───▶│                 │───▶│  Applications   │
│  Services       │    │   Topic-Based   │    │  Services       │
│  Systems        │    │   Pub-Sub       │    │  Analytics      │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Kafka Cluster Architecture

              KAFKA CLUSTER
    ┌─────────────────────────────────────┐
    │                                     │
    │  ┌──────────┐  ┌──────────┐        │
    │  │ Broker 1 │  │ Broker 2 │        │
    │  │  Leader  │  │ Follower │        │
    │  │          │  │          │        │
    │  └──────────┘  └──────────┘        │
    │       │              │             │
    │  ┌──────────┐  ┌──────────┐        │
    │  │ Broker 3 │  │ZooKeeper │        │
    │  │ Follower │  │or KRaft  │        │
    │  │          │  │ Metadata │        │
    │  └──────────┘  └──────────┘        │
    └─────────────────────────────────────┘

Basic Data Flow

  1. Producers send messages to specific topics
  2. Topics are divided into partitions for scalability
  3. Brokers store partition data across the cluster
  4. Consumers read messages from topics/partitions
  5. Consumer Groups coordinate parallel consumption

Interview Key Points

“Why Kafka over traditional message queues?”

“When would you use Kafka?”

See Also: Topics & Partitions, Brokers


Topics, Partitions & Segments

Topic Organization

A topic is a logical grouping of related messages (e.g., “user-events”, “payment-transactions”). Topics provide the primary abstraction for organizing data streams.

TOPIC: user-events
├── Partition 0: [msg1][msg2][msg3][msg4]...
├── Partition 1: [msg5][msg6][msg7][msg8]...
├── Partition 2: [msg9][msg10][msg11][msg12]...
└── Partition 3: [msg13][msg14][msg15][msg16]...

Partition Mechanics

Partitions enable:

Partition Assignment:

// Producer determines partition via:
// 1. Explicit partition specification
producer.send(new ProducerRecord<>("topic", partition, key, value));

// 2. Key-based hashing (most common)
producer.send(new ProducerRecord<>("topic", key, value));
// partition = hash(key) % num_partitions

// 3. No key: round-robin in older clients; sticky partitioning (one partition per batch) since Kafka 2.4
producer.send(new ProducerRecord<>("topic", value));
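
The key-based rule above can be sketched in plain Java. This is a minimal illustration, not Kafka's partitioner: `Arrays.hashCode` stands in for the murmur2 hash, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: Arrays.hashCode is a stand-in for Kafka's murmur2 hash.
final class KeyPartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Mask the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key maps to the same partition on every call.
        System.out.println(partitionFor("user-42", 4) == partitionFor("user-42", 4));
    }
}
```

Because the result depends on `numPartitions`, adding partitions to a topic later changes where existing keys land.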

Segment Storage Structure

Each partition is divided into segments for efficient storage and retrieval:

PARTITION 0 DIRECTORY:
├── 00000000000000000000.log    (older segment)
├── 00000000000000000000.index  (offset index)
├── 00000000000000000000.timeindex (time index)
├── 00000000000000100000.log    (active segment: highest base offset)
├── 00000000000000100000.index
└── 00000000000000100000.timeindex
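
Segment files are named after the first offset they contain, so locating a record reduces to a floor lookup on base offsets. A minimal sketch of that idea follows; the class is hypothetical and ignores the index files.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

final class SegmentLookupSketch {
    // Base offset -> segment file name, kept sorted by base offset.
    private final TreeMap<Long, String> segments = new TreeMap<>();

    void addSegment(long baseOffset) {
        // File names are the base offset, zero-padded to 20 digits.
        segments.put(baseOffset, String.format(Locale.ROOT, "%020d.log", baseOffset));
    }

    // The segment holding an offset has the greatest base offset <= that offset.
    String segmentFor(long offset) {
        Map.Entry<Long, String> entry = segments.floorEntry(offset);
        return entry == null ? null : entry.getValue();
    }

    // The active (writable) segment is the one with the highest base offset.
    String activeSegment() {
        return segments.isEmpty() ? null : segments.lastEntry().getValue();
    }

    public static void main(String[] args) {
        SegmentLookupSketch log = new SegmentLookupSketch();
        log.addSegment(0L);
        log.addSegment(100000L);
        System.out.println(log.segmentFor(99999L));
        System.out.println(log.activeSegment());
    }
}
```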

Segment Properties:

Storage Configuration Examples

Topic Creation with Partitioning Strategy:

# Create topic with strategic partition count
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 12 \
  --replication-factor 3 \
  --config segment.ms=86400000 \
  --config retention.ms=604800000

# Partition count considerations:
# - Start with (target_throughput_MB/s ÷ partition_throughput_MB/s)
# - Consider consumer parallelism (max consumers = partitions)
# - Account for hot partitions with uneven key distribution

Retention Policies:

# Time-based retention (delete after 7 days)
--config retention.ms=604800000

# Size-based retention (delete old segments when a partition exceeds 10GB; the limit is per partition)
--config retention.bytes=10737418240

# Log compaction (keep latest value per key)
--config cleanup.policy=compact

# Combined: compaction + time retention
--config cleanup.policy=compact,delete
--config retention.ms=604800000
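
The raw numbers in these settings are easier to audit when derived rather than typed. A small sketch with hypothetical helper names:

```java
import java.time.Duration;

final class RetentionValuesSketch {
    // Milliseconds for a retention period given in days (for retention.ms / segment.ms).
    static long retentionMs(int days) {
        return Duration.ofDays(days).toMillis();
    }

    // Bytes for a size given in GiB (for retention.bytes).
    static long gibToBytes(long gib) {
        return gib * 1024L * 1024L * 1024L;
    }

    public static void main(String[] args) {
        System.out.println("retention.ms=" + retentionMs(7));
        System.out.println("segment.ms=" + retentionMs(1));
        System.out.println("retention.bytes=" + gibToBytes(10));
    }
}
```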

Partition Strategy Decision Framework

Key-Based Partitioning:

Use when: Need ordering guarantees for related messages
Example: user_id → ensures all user events in same partition
Tradeoff: Risk of hot partitions with skewed data

Random/Round-Robin Partitioning:

Use when: Maximum throughput, no ordering requirements
Example: Logs, metrics without correlation needs
Tradeoff: No ordering guarantees across messages

Custom Partitioning:

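import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        // Custom logic: geographic routing, load balancing, etc.
        // Keyless records and VIP users go to partition 0; other keys may still hash there.
        if (keyBytes == null || key.toString().startsWith("VIP_")) {
            return 0;
        }
        // toPositive guards against a negative murmur2 hash producing a negative index
        return Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic);
    }
    @Override public void configure(Map<String, ?> configs) {}
    @Override public void close() {}
}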

Sizing and Capacity Planning

Partition Sizing Guidelines:

Target: 25GB max per partition (for reasonable recovery times)
Formula: daily_data_GB × retention_days ÷ partition_count < 25GB

Example Calculation:
- 1TB/day ingestion
- 7-day retention
- 7TB total storage needed
- 7000GB ÷ 25GB = 280 partitions minimum
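
The example calculation can be written as a small helper. This is a sketch with hypothetical names; like the example, it treats 1TB as 1000GB and ignores replication overhead.

```java
final class PartitionSizingSketch {
    // Minimum partition count that keeps each partition at or under maxPartitionGb.
    static long minPartitions(long dailyGb, int retentionDays, long maxPartitionGb) {
        long totalGb = dailyGb * retentionDays;
        // Ceiling division so a partial partition's worth of data still gets a partition.
        return (totalGb + maxPartitionGb - 1) / maxPartitionGb;
    }

    public static void main(String[] args) {
        // 1TB/day, 7-day retention, 25GB target per partition
        System.out.println(minPartitions(1000, 7, 25));
    }
}
```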

Performance Implications:

See Also: Producer Architecture, Consumer Groups


Brokers and Clustering

Broker Roles and Responsibilities

A broker is a Kafka server that stores data and serves client requests. In a cluster, brokers coordinate to provide distributed storage and fault tolerance.

BROKER RESPONSIBILITIES:
┌─────────────────────────────────────────┐
│ • Accept producer writes                │
│ • Serve consumer reads                  │
│ • Replicate data to other brokers       │
│ • Participate in leader election        │
│ • Store and serve cluster metadata      │
│ • Handle client connections             │
└─────────────────────────────────────────┘

Cluster Coordination: ZooKeeper vs KRaft

Traditional ZooKeeper Mode:

    KAFKA CLUSTER
┌─────────────────────┐    ┌──────────────┐
│  ┌──────┐  ┌──────┐ │    │  ZooKeeper   │
│  │Broker│  │Broker│ │◄──►│   Ensemble   │
│  │  1   │  │  2   │ │    │              │
│  └──────┘  └──────┘ │    │ • Metadata   │
│                     │    │ • Elections  │
│  ┌──────┐  ┌──────┐ │    │ • Config     │
│  │Broker│  │Broker│ │    │ • ACLs       │
│  │  3   │  │  4   │ │    └──────────────┘
│  └──────┘  └──────┘ │
└─────────────────────┘

Modern KRaft Mode (KIP-500):

    KAFKA CLUSTER (Self-Managing)
┌──────────────────────────────────────┐
│ ┌──────────┐  ┌──────────┐          │
│ │Controller│  │ Broker   │          │
│ │+Broker 1 │  │    2     │          │
│ └──────────┘  └──────────┘          │
│      │                │             │
│ ┌──────────┐  ┌──────────┐          │
│ │Controller│  │ Broker   │          │
│ │+Broker 3 │  │    4     │          │
│ └──────────┘  └──────────┘          │
│                                     │
│ Controllers = Metadata Management    │
│ Brokers = Data Storage + Serving     │
└──────────────────────────────────────┘

Leader Election and Partition Leadership

Partition Leadership Model:

TOPIC: orders, 3 partitions, replication-factor=3

Partition 0: Leader=Broker1, Followers=[Broker2, Broker3]
Partition 1: Leader=Broker2, Followers=[Broker1, Broker3]
Partition 2: Leader=Broker3, Followers=[Broker1, Broker2]

LEADER RESPONSIBILITIES:
├── Accept all writes for the partition
├── Serve reads (configurable)
├── Manage follower replication
└── Coordinate with controller for metadata changes

FOLLOWER RESPONSIBILITIES:
├── Replicate data from leader
├── Stay in-sync to be eligible for leadership
└── Take over as leader if current leader fails
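
The failover rule (only an in-sync replica may take over) can be sketched as follows. The method is hypothetical and assumes the controller prefers replicas in assignment order; it is not the controller's actual code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class LeaderElectionSketch {
    // Picks the first replica, in assignment order, that is both alive and in the ISR.
    static Optional<Integer> electLeader(List<Integer> replicas,
                                         Set<Integer> isr,
                                         Set<Integer> liveBrokers) {
        return replicas.stream()
                .filter(b -> isr.contains(b) && liveBrokers.contains(b))
                .findFirst();
    }

    public static void main(String[] args) {
        // Partition 0 from the example: Broker1 led, then failed.
        List<Integer> replicas = Arrays.asList(1, 2, 3);
        Set<Integer> survivors = new HashSet<>(Arrays.asList(2, 3));
        System.out.println(electLeader(replicas, survivors, survivors));
    }
}
```

An empty result corresponds to an offline partition when unclean leader election is disabled.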

Broker Configuration Examples

Essential Broker Settings:

# server.properties

# Unique broker identifier
broker.id=1

# Network and connectivity
listeners=PLAINTEXT://kafka1.company.com:9092
advertised.listeners=PLAINTEXT://kafka1.company.com:9092

# Storage configuration
log.dirs=/var/kafka-logs-1,/var/kafka-logs-2,/var/kafka-logs-3
num.network.threads=8
num.io.threads=16

# Replication settings
default.replication.factor=3
min.insync.replicas=2

# Log management
log.retention.hours=168
log.segment.bytes=1073741824
log.cleanup.policy=delete

# ZooKeeper connection (if not using KRaft)
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka

Production Hardening:

# JVM heap sizing (typically 6-10GB max)
export KAFKA_HEAP_OPTS="-Xmx8g -Xms8g"

# OS-level optimizations
# File descriptor limits: ulimit -n 100000
# Minimize swapping: vm.swappiness=1

# Network optimization
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.replica.fetchers=4

Failure Scenarios and Recovery

Broker Failure Handling:

1. BROKER FAILURE DETECTED
   ├── Controller notices missed heartbeat
   ├── Controller removes broker from cluster
   └── Triggers partition leader re-election

2. PARTITION LEADER RE-ELECTION
   ├── Controller selects new leader from ISR
   ├── Updates metadata in all brokers
   ├── Notifies clients of leadership change
   └── New leader starts accepting writes

3. FAILED BROKER RECOVERY
   ├── Restart broker process
   ├── Broker rejoins cluster
   ├── Catches up missing data from leaders
   └── Becomes eligible for leadership again

Controller Election (ZooKeeper mode; in KRaft the controller quorum elects its leader via Raft):

Controller Failure → New Controller Election
├── All brokers try to become controller
├── First to create /controller znode wins
├── New controller reads cluster state
├── Initiates leader election for affected partitions
└── Updates metadata across cluster

Cluster Expansion and Maintenance

Adding New Brokers:

# 1. Start new broker with unique broker.id
# 2. Create partition reassignment plan
kafka-reassign-partitions --generate \
  --bootstrap-server localhost:9092 \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "1,2,3,4" # including new broker
# Save the proposed assignment it prints as reassignment-plan.json

# 3. Execute reassignment
kafka-reassign-partitions --execute \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment-plan.json

# 4. Verify completion
kafka-reassign-partitions --verify \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment-plan.json

Rolling Restarts (Zero Downtime):

# For each broker:
# 1. Stop broker gracefully (controlled shutdown moves partition leadership to other brokers)
kafka-server-stop

# 2. Apply configuration changes
# 3. Start broker
kafka-server-start server.properties

# 4. Verify broker rejoined and caught up (no under-replicated partitions)
# 5. Restore preferred leaders (optional)
kafka-leader-election --bootstrap-server localhost:9092 \
  --election-type preferred \
  --all-topic-partitions

# 6. Proceed to next broker

Interview Scenarios

“How does Kafka handle a broker failure?”

  1. Controller detects failure via missed heartbeats
  2. Failed broker removed from ISR for its partitions
  3. New leaders elected from remaining ISR replicas
  4. Clients automatically discover new leaders
  5. Failed broker rejoins when recovered

“What happens if the controller fails?”

  1. New controller election among remaining brokers
  2. New controller reads full cluster state
  3. Continues managing leader elections and metadata
  4. Brief pause in metadata operations during election

See Also: Replication & Consistency, Operational Considerations


Producer Architecture

Producer Request Flow

The Kafka producer is responsible for publishing messages to topics with configurable reliability, performance, and ordering guarantees.

PRODUCER INTERNAL FLOW:

Application Code
          │
          ▼
┌─────────────────┐
│   send() API    │
│                 │
└─────────┬───────┘
          │
          ▼
┌─────────────────┐    ┌─────────────────┐
│   Serializer    │───▶│   Partitioner   │
│ (key & value)   │    │ (select target  │
│                 │    │  partition)     │
└─────────────────┘    └─────────┬───────┘
                                 │
                                 ▼
┌─────────────────┐    ┌─────────────────┐
│  Record Buffer  │◄───│  Record Batch   │
│ (per partition) │    │  (accumulator)  │
│                 │    │                 │
└─────────┬───────┘    └─────────────────┘
          │
          ▼
┌─────────────────┐    ┌─────────────────┐
│   Sender Thread │───▶│   Network I/O   │
│ (background)    │    │  (to brokers)   │
│                 │    │                 │
└─────────────────┘    └─────────────────┘

Batching and Performance Optimization

Batch Configuration Strategy:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");

// Batching controls
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);        // 32KB batches
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // Wait 10ms for batching
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);  // 64MB total buffer

// Compression (reduces network I/O)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // or snappy, gzip

// Parallel requests per connection
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

Batching Mechanics:

PRODUCER BATCHING TIMELINE:

T=0ms:    [msg1] ──┐
T=2ms:    [msg2] ──┤
T=5ms:    [msg3] ──┼─► Batch (3 messages)
T=8ms:    [msg4] ──┤
T=10ms:   [msg5] ──┘   │
          │            │
          │            ▼
          │    ┌─────────────────┐
          │    │ Send when:      │
          │    │ • batch.size OR │
          │    │ • linger.ms OR  │
          │    │ • buffer full   │
          │    └─────────────────┘

T=10ms:   ├─► NETWORK SEND (batch of 5)
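
The size-or-time trigger in the timeline can be sketched as a small accumulator. This is a simplified model with hypothetical names: it counts characters instead of serialized bytes and takes the clock as a parameter so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

final class BatcherSketch {
    private final int batchSizeBytes;   // plays the role of batch.size
    private final long lingerMs;        // plays the role of linger.ms
    private final List<String> buffer = new ArrayList<>();
    private int bufferedBytes = 0;
    private long firstAppendMs = -1;

    BatcherSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // Appends a record; returns the batch to send if it is now full, else null.
    List<String> append(String record, long nowMs) {
        if (buffer.isEmpty()) firstAppendMs = nowMs;
        buffer.add(record);
        bufferedBytes += record.length(); // character count approximates bytes here
        return bufferedBytes >= batchSizeBytes ? drain() : null;
    }

    // Called periodically; returns the batch once the linger time has elapsed, else null.
    List<String> poll(long nowMs) {
        boolean expired = !buffer.isEmpty() && nowMs - firstAppendMs >= lingerMs;
        return expired ? drain() : null;
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        bufferedBytes = 0;
        return batch;
    }

    public static void main(String[] args) {
        BatcherSketch batcher = new BatcherSketch(32768, 10);
        batcher.append("msg1", 0);
        batcher.append("msg2", 2);
        System.out.println(batcher.poll(10)); // linger expired, batch is released
    }
}
```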

Acknowledgment Strategies and Reliability

ACK Configuration Levels:

// acks=0: Fire and forget (highest throughput, no durability)
props.put(ProducerConfig.ACKS_CONFIG, "0");

// acks=1: Leader acknowledgment (balanced)
props.put(ProducerConfig.ACKS_CONFIG, "1");

// acks=all: Full ISR acknowledgment (highest durability)
props.put(ProducerConfig.ACKS_CONFIG, "all");
// min.insync.replicas=2 is a broker/topic setting, not a producer property;
// set it in server.properties or as a topic config

Reliability vs Performance Tradeoffs:

Configuration Matrix:

acks=0     │ High Throughput  │ Data Loss Risk    │ Use: Metrics, logs
acks=1     │ Good Throughput  │ Minimal Loss Risk │ Use: General apps
acks=all   │ Lower Throughput │ Lowest Loss Risk  │ Use: Financial, critical
Note: acks=all only protects against losing a broker when min.insync.replicas >= 2.

Idempotency and Exactly-Once Semantics

Idempotent Producer Configuration:

// Enables automatic deduplication
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Idempotency requires (the defaults satisfy these; conflicting explicit values raise a ConfigException):
// acks = all
// max.in.flight.requests.per.connection <= 5
// retries > 0

// Transactional producer (for exactly-once)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

Exactly-Once Producer Pattern:

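public class ExactlyOnceProducer {
    private KafkaProducer<String, String> producer;

    public void initTransactions() {
        producer.initTransactions();
    }

    public void sendTransactionally(List<ProducerRecord<String, String>> records) {
        producer.beginTransaction();
        try {
            for (ProducerRecord<String, String> record : records) {
                producer.send(record);
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: this producer instance cannot continue, so close it rather than abort
            producer.close();
            throw e;
        } catch (KafkaException e) {
            // Recoverable: abort so none of the records become visible to read_committed consumers
            producer.abortTransaction();
            throw e;
        }
    }
}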

Error Handling and Retry Logic

Comprehensive Error Handling:

public class RobustProducer {
    private KafkaProducer<String, String> producer;
    private final int maxRetries = 3;

    public void sendWithRetry(ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                handleSendError(record, exception, 0);
            } else {
                log.info("Message sent to partition {} at offset {}",
                    metadata.partition(), metadata.offset());
            }
        });
    }

    private void handleSendError(ProducerRecord<String, String> record,
                                Exception exception, int attempt) {
        if (exception instanceof RetriableException && attempt < maxRetries) {
            // The producer has already exhausted its own retries (retries / delivery.timeout.ms);
            // this is an application-level resend
            scheduleRetry(record, attempt + 1);
        } else if (exception instanceof SerializationException) {
            // Non-retriable: log and possibly send to DLQ
            sendToDeadLetterQueue(record, exception);
        } else {
            // Authorization failure, record too large, etc.
            handleNonRetriableError(record, exception);
        }
        }
    }
}

Producer Configuration for Reliability:

// Retry configuration
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); // 5 minutes total

// Request timeout (per request)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // 30 seconds

// Connection and metadata
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);
props.put(ProducerConfig.METADATA_MAX_AGE_MS_CONFIG, 300000); // Refresh topology

Message Ordering Guarantees

Ordering Configuration:

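// For strict ordering within partition:
// - with idempotence enabled, order is preserved with up to 5 in-flight requests
// - without idempotence, in-flight requests must be limited to 1
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);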

// Key-based routing ensures related messages go to same partition
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", customerId, orderData);

Order Preservation Scenarios:

Scenario 1: Single in-flight request
├── msg1 sent → success → msg2 sent → success
└── Order preserved ✓

Scenario 2: Multiple in-flight requests (max.in.flight > 1)
├── msg1 sent ─┐
├── msg2 sent ─┼─ without idempotency
└── msg3 sent ─┘   may reorder on retry ✗

Scenario 3: Multiple in-flight with idempotency
├── msg1 sent ─┐
├── msg2 sent ─┼─ producer sequence numbers
└── msg3 sent ─┘   preserve order ✓
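
Scenario 3 relies on the broker checking a per-producer sequence number before appending. A simplified sketch of that check follows; real brokers track sequences per producer ID and partition, work on batches, and also use producer epochs, all of which this hypothetical class omits.

```java
import java.util.HashMap;
import java.util.Map;

final class SequenceCheckSketch {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence number per producer ID (single partition assumed).
    private final Map<Long, Integer> lastSeqByProducer = new HashMap<>();

    Result tryAppend(long producerId, int sequence) {
        int last = lastSeqByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) return Result.DUPLICATE;        // retry of an already-written record
        if (sequence != last + 1) return Result.OUT_OF_ORDER; // a gap: an earlier record is missing
        lastSeqByProducer.put(producerId, sequence);
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        SequenceCheckSketch partition = new SequenceCheckSketch();
        System.out.println(partition.tryAppend(7L, 0));
        System.out.println(partition.tryAppend(7L, 0)); // resend of the same record
    }
}
```

Rejecting the gap case is what lets the producer resend in order instead of having a later record land first.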

Interview Questions and Answers

“How do you ensure exactly-once semantics?”

  1. Enable idempotent producer (enable.idempotence=true)
  2. Use transactions with transactional.id
  3. Coordinate with exactly-once consumers
  4. Handle duplicates at application level if needed

“What’s the tradeoff between throughput and durability?”

“How do you handle producer failures?”

  1. Configure appropriate retries and timeouts
  2. Implement error callbacks for handling failures
  3. Use circuit breakers for downstream failures
  4. Monitor producer metrics for health

See Also: Topics & Partitions, Replication & Consistency


Consumer Groups & Offsets

Consumer Group Coordination

Consumer groups enable parallel processing and fault tolerance by coordinating multiple consumer instances to share the work of consuming from a topic’s partitions.

CONSUMER GROUP: "order-processors"

Topic: orders (4 partitions)
┌─────────┬─────────┬─────────┬─────────┐
│Part 0   │Part 1   │Part 2   │Part 3   │
└─────────┴─────────┴─────────┴─────────┘
     │        │        │        │
     │        │        │        │
     ▼        ▼        ▼        ▼
┌─────────┬─────────┬─────────┬─────────┐
│Consumer │Consumer │Consumer │Consumer │
│   A     │   B     │   C     │   D     │
└─────────┴─────────┴─────────┴─────────┘

Key Guarantee: Each partition assigned to exactly ONE consumer in group

Partition Assignment Strategies

Range Assignor (Default):

Topic: user-events (7 partitions), Consumer Group: analytics (3 consumers)

Consumer-1: partitions [0, 1, 2]     (3 partitions)
Consumer-2: partitions [3, 4]        (2 partitions)
Consumer-3: partitions [5, 6]        (2 partitions)

Pros: Simple, stable assignments
Cons: Uneven distribution possible

Round-Robin Assignor:

Same setup as above:

Consumer-1: partitions [0, 3, 6]     (3 partitions)
Consumer-2: partitions [1, 4]        (2 partitions)
Consumer-3: partitions [2, 5]        (2 partitions)

Pros: More even distribution
Cons: Less locality, more rebalancing
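
Both assignments above can be reproduced with a few lines of arithmetic. This sketch covers a single topic only; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class AssignorSketch {
    // Range: contiguous blocks; the first (partitions % consumers) consumers get one extra.
    static List<List<Integer>> range(int partitions, int consumers) {
        List<List<Integer>> out = new ArrayList<>();
        int base = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) owned.add(next++);
            out.add(owned);
        }
        return out;
    }

    // Round-robin: partition p goes to consumer p % consumers.
    static List<List<Integer>> roundRobin(int partitions, int consumers) {
        List<List<Integer>> out = new ArrayList<>();
        for (int c = 0; c < consumers; c++) out.add(new ArrayList<>());
        for (int p = 0; p < partitions; p++) out.get(p % consumers).add(p);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(range(7, 3));
        System.out.println(roundRobin(7, 3));
    }
}
```

With one topic the two strategies are equally balanced; the range assignor's skew shows up across several topics, because the same first consumers receive the extra partition for every topic.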

Sticky Assignor (Recommended):

Minimizes partition movement during rebalancing
Maintains assignments when possible
Reduces rebalancing overhead

Configuration:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.StickyAssignor");

Offset Management Strategies

Automatic Offset Management:

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 5 seconds

// Where to start when the group has no committed offset:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user-events"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // Process message
        // Offsets are committed from within poll(), at most every 5 seconds
    }
}

Manual Offset Management (Recommended for Production):

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto-commit

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("user-events"));

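while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        try {
            processRecord(record); // Process message

            // Option 1: Commit after each message (lower throughput).
            // Commit this record's offset + 1; a bare commitSync() here would
            // commit the whole polled batch, including unprocessed records.
            consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));

        } catch (Exception e) {
            handleProcessingError(record, e);
            // Skipping the commit does not by itself retry the message: a later
            // commit moves past it, so seek back or send it to a DLQ here
        }
    }

    // Option 2 (instead of Option 1): Commit batch of messages (higher throughput)
    try {
        consumer.commitSync(); // Commit all records returned by the last poll
    } catch (CommitFailedException e) {
        handleCommitError(e);
    }
}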

Rebalancing Protocol and Handling

Rebalancing Triggers:

1. Consumer joins group (new consumer started)
2. Consumer leaves group (graceful shutdown)
3. Consumer fails (heartbeat timeout)
4. Topic metadata changes (partition added)
5. Group coordinator failover

Rebalancing Flow:

REBALANCING PROCESS:

1. GROUP COORDINATOR DETECTION
   ├── Hash(group.id) % num_partitions(__consumer_offsets)
   └── Identifies which broker manages this group

2. JOIN GROUP PHASE
   ├── All consumers send JoinGroup request
   ├── Coordinator selects group leader
   └── Leader receives member list and metadata

3. SYNC GROUP PHASE
   ├── Leader computes partition assignments
   ├── Leader sends assignments to coordinator
   └── Coordinator distributes assignments to members

4. PARTITION ASSIGNMENT
   ├── Consumers start consuming assigned partitions
   └── Previous assignments released
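
Step 1 can be sketched as below. The assumptions are stated plainly: the hash is taken to be Java's `String.hashCode`, and 50 is used as the partition count of `__consumer_offsets` (the `offsets.topic.num.partitions` setting; verify the value for your cluster). The class name is hypothetical.

```java
final class CoordinatorLookupSketch {
    // Maps a group ID to a partition of __consumer_offsets.
    // The broker leading that partition acts as the group's coordinator.
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
        int h = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so handle it explicitly.
        int nonNegative = (h == Integer.MIN_VALUE) ? 0 : Math.abs(h);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        System.out.println(offsetsPartitionFor("order-processors", 50));
    }
}
```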

Rebalancing Listener Implementation:

public class RebalanceListener implements ConsumerRebalanceListener {
    // The listener needs the consumer to commit and seek
    private final KafkaConsumer<String, String> consumer;

    public RebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Partitions revoked: {}", partitions);

        // 1. Finish processing current messages
        finishCurrentWork();

        // 2. Commit current offsets
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            log.error("Failed to commit offsets during rebalance", e);
        }

        // 3. Clean up resources for revoked partitions
        partitions.forEach(this::cleanupPartition);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("Partitions assigned: {}", partitions);

        // 1. Initialize resources for new partitions
        partitions.forEach(this::initializePartition);

        // 2. Seek to specific offsets if needed
        for (TopicPartition partition : partitions) {
            long savedOffset = getOffsetFromDatabase(partition);
            if (savedOffset >= 0) {
                consumer.seek(partition, savedOffset);
            }
        }
    }
}

// Usage:
consumer.subscribe(Arrays.asList("user-events"), new RebalanceListener(consumer));

Consumer Configuration Best Practices

Essential Consumer Settings:

Properties props = new Properties();

// Group and client identification
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processors");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-processor-" + instanceId);

// Offset management
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Session and heartbeat configuration
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);      // 30 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);   // 10 seconds
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);   // 5 minutes

// Fetch configuration for performance
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);          // 1KB minimum
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);         // 500ms max wait
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB max per partition

Consumer Performance Tuning:

// Large batch processing
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // Process 1000 records per poll

// Network optimization
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 131072);   // 128KB
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);      // 128KB

// For high-throughput scenarios
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);   // Wait for 50KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);   // But no more than 100ms

Error Handling and Recovery Patterns

Retry with Dead Letter Queue:

public class RobustConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> dlqProducer;
    private final String dlqTopic;
    private final int maxRetries = 3;

    public void processRecords() {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

        for (ConsumerRecord<String, String> record : records) {
            boolean processed = false;
            int attempts = 0;

            while (!processed && attempts < maxRetries) {
                try {
                    processMessage(record);
                    processed = true;
                } catch (RetriableException e) {
                    attempts++;
                    if (attempts >= maxRetries) {
                        sendToDeadLetterQueue(record, e);
                        processed = true; // Don't retry further
                    } else {
                        waitBeforeRetry(attempts);
                    }
                } catch (NonRetriableException e) {
                    sendToDeadLetterQueue(record, e);
                    processed = true;
                }
            }
        }

        // Commit offsets only after all messages processed
        consumer.commitSync();
    }

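    private void sendToDeadLetterQueue(ConsumerRecord<String, String> record, Exception e) {
        Headers headers = record.headers();
        // String.valueOf guards against a null exception message
        headers.add("error.message", String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8));
        headers.add("error.timestamp", String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));

        // ProducerRecord has no (topic, key, value, headers) constructor;
        // a null partition lets the partitioner choose one from the key
        ProducerRecord<String, String> dlqRecord =
            new ProducerRecord<>(dlqTopic, (Integer) null, record.key(), record.value(), headers);

        dlqProducer.send(dlqRecord);
    }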
}

Consumer Lag Monitoring

Key Metrics to Track:

# Consumer lag per partition
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group payment-processors

TOPIC     PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG    CONSUMER-ID
orders    0         1500           1750           250    consumer-1-abc123
orders    1         2300           2310           10     consumer-2-def456
orders    2         3100           3100           0      consumer-3-ghi789

Programmatic Lag Monitoring:

public class ConsumerLagMonitor {
    private final AdminClient adminClient;
    private final String groupId;

    public Map<TopicPartition, Long> getConsumerLag()
            throws InterruptedException, ExecutionException {
        Map<TopicPartition, Long> lagMap = new HashMap<>();

        // Get consumer group offsets
        Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
            adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

        // Get log end offsets through the admin API (AdminClient exposes no consumer)
        Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
        consumerOffsets.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEndOffsets =
            adminClient.listOffsets(latest).all().get();

        // Calculate lag
        for (TopicPartition tp : consumerOffsets.keySet()) {
            long consumerOffset = consumerOffsets.get(tp).offset();
            long logEndOffset = logEndOffsets.get(tp).offset();
            lagMap.put(tp, logEndOffset - consumerOffset);
        }

        return lagMap;
    }
}

Interview Scenarios

“What happens when a consumer in a group fails?”

  1. Consumer stops sending heartbeats
  2. Group coordinator triggers rebalancing after session timeout
  3. Failed consumer’s partitions redistributed to remaining consumers
  4. Remaining consumers may experience brief pause during rebalancing

“How do you handle slow consumers?”

  1. Increase max.poll.interval.ms for longer processing times
  2. Reduce max.poll.records to process smaller batches
  3. Scale horizontally by adding more consumer instances
  4. Optimize message processing logic

“Explain exactly-once consumption.”

  1. Disable auto-commit (enable.auto.commit=false)
  2. Process message and commit offset in single transaction
  3. Handle idempotency at application level for duplicates
  4. Use external storage for offset management if needed

See Also: Brokers & Clustering, Operational Considerations


Replication and Consistency

Replication Architecture

Kafka provides fault tolerance through partition replication across multiple brokers. Each partition has one leader and multiple followers, ensuring data survives broker failures.

TOPIC: transactions, PARTITION: 0, REPLICATION-FACTOR: 3

┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│    BROKER 1     │  │    BROKER 2     │  │    BROKER 3     │
│                 │  │                 │  │                 │
│  ┌───────────┐  │  │  ┌───────────┐  │  │  ┌───────────┐  │
│  │LEADER     │  │  │  │FOLLOWER   │  │  │  │FOLLOWER   │  │
│  │Partition 0│◄─┼──┼──┤Partition 0│  │  │  │Partition 0│  │
│  │[msg1][msg2]│  │  │  │[msg1][msg2]│◄─┼──┼──┤[msg1][msg2]│  │
│  │           │  │  │  │           │  │  │  │           │  │
│  └───────────┘  │  │  └───────────┘  │  │  └───────────┘  │
└─────────────────┘  └─────────────────┘  └─────────────────┘

REPLICATION FLOW:
1. Producer sends to Leader
2. Leader writes to local log
3. Followers fetch from Leader
4. Leader tracks follower progress
5. Acknowledgment sent based on acks setting

In-Sync Replicas (ISR) Management

ISR Definition and Mechanics:

In-Sync Replicas (ISR) = Replicas that are "caught up" with the leader

ISR Criteria:
├── Follower has caught up to the leader's log end within replica.lag.time.max.ms (default: 30s)
├── (The message-count threshold replica.lag.max.messages was removed in Kafka 0.9)
└── Follower is actively fetching (not failed)

ISR Changes:
├── Follower falls behind → Removed from ISR
├── Follower catches up → Added back to ISR
├── Leader fails → New leader elected from ISR only
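
The time-based ISR rule can be sketched as a filter over followers. This is a simplified model with hypothetical names: each follower is represented only by the timestamp at which it was last fully caught up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class IsrSketch {
    // Returns follower IDs (ascending) whose last caught-up time is within the allowed lag.
    static List<Integer> inSyncFollowers(Map<Integer, Long> lastCaughtUpMs,
                                         long nowMs, long replicaLagTimeMaxMs) {
        List<Integer> isr = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : new TreeMap<>(lastCaughtUpMs).entrySet()) {
            if (nowMs - e.getValue() <= replicaLagTimeMaxMs) {
                isr.add(e.getKey());
            }
        }
        return isr;
    }

    public static void main(String[] args) {
        Map<Integer, Long> followers = new HashMap<>();
        followers.put(2, 95_000L); // caught up 5s ago
        followers.put(3, 60_000L); // caught up 40s ago
        System.out.println(inSyncFollowers(followers, 100_000L, 30_000L));
    }
}
```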

ISR Configuration:

# Broker configuration
replica.lag.time.max.ms=30000           # 30 seconds to stay in ISR
min.insync.replicas=2                   # Minimum ISR size for writes
unclean.leader.election.enable=false    # Only ISR can become leader

# Topic-level override
kafka-configs --alter --entity-type topics --entity-name critical-topic \
  --add-config min.insync.replicas=3

Consistency Guarantees and Trade-offs

Consistency Model:

KAFKA CONSISTENCY LEVELS:

acks=0 (No Durability Guarantee):
├── Producer: Fire and forget
├── Guarantee: None (fastest)
└── Risk: Message loss if leader fails

acks=1 (Leader Confirmation):
├── Producer: Wait for leader write
├── Guarantee: Durable if leader survives
└── Risk: Loss if leader fails before replication

acks=all (ISR Confirmation):
├── Producer: Wait for ISR write confirmation
├── Guarantee: Durable as long as one ISR replica survives
└── Risk: Minimal (configurable via min.insync.replicas)
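
The interaction between acks and min.insync.replicas can be sketched as a small decision function. This is an illustration with a hypothetical function name and return strings; the key point it encodes is that min.insync.replicas is only enforced for acks=all.

```python
def produce_outcome(acks: str, isr_size: int, min_insync_replicas: int) -> str:
    """Describe what a producer sees for one write, given the current ISR size."""
    if acks not in ("0", "1", "all"):
        raise ValueError(f"unsupported acks value: {acks!r}")
    if acks == "all" and isr_size < min_insync_replicas:
        # The broker refuses the write instead of accepting it under-replicated.
        return "rejected: NotEnoughReplicas"
    if acks == "0":
        return "sent: no acknowledgment requested"
    if acks == "1":
        return "acked after leader write"
    return f"acked after all {isr_size} in-sync replicas have the record"
```

For example, with the ISR shrunk to one replica and min.insync.replicas=2, an acks=all producer is rejected while an acks=1 producer still succeeds, which is exactly the durability gap between the two settings.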

CAP Theorem in Kafka Context:

Kafka's CAP Theorem Positioning:

PARTITION TOLERANCE: Always maintained
├── Cluster continues operating despite network splits
├── Partitions can be served from different brokers
└── Automatic failover and recovery

CONSISTENCY vs AVAILABILITY Trade-off:
├── High Consistency: acks=all + min.insync.replicas > 1
│   • Ensures strong durability
│   • May sacrifice availability during broker failures

└── High Availability: acks=1 or unclean.leader.election.enable=true
    • Prioritizes uptime over consistency
    • Risk of message loss or ordering issues
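
The trade-off can be put into numbers. The sketch below assumes acks=all producers and a hypothetical helper name; it computes how many replicas may drop out before writes are refused, and how many replica losses an acknowledged record is guaranteed to survive.

```python
def tolerated_replica_failures(replication_factor: int, min_insync_replicas: int) -> dict:
    """Availability vs durability for acks=all producers."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min.insync.replicas must be between 1 and the replication factor")
    return {
        # Replicas that can leave the ISR while acks=all writes are still accepted.
        "writes_stay_available": replication_factor - min_insync_replicas,
        # An acked record is on at least min.insync.replicas replicas,
        # so it survives losing all but one of them.
        "acked_data_survives": min_insync_replicas - 1,
    }
```

With replication factor 3, min.insync.replicas=2 tolerates one failure on both axes, while min.insync.replicas=3 maximizes durability at the cost of refusing writes as soon as one replica falls behind.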

Replication Configuration Examples

High Durability Configuration:

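# Topic creation for financial transactions
# Note: min.insync.replicas=3 with replication factor 3 rejects acks=all writes while any
# replica is out of sync; use min.insync.replicas=2 to stay writable through one broker failure
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic financial-transactions \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=3 \
  --config unclean.leader.election.enable=false

// Producer configuration (Java)
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);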

High Performance Configuration:

# Topic for metrics/logging
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic application-metrics \
  --partitions 24 \
  --replication-factor 2 \
  --config min.insync.replicas=1

// Producer configuration (Java)
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);

Failure Scenarios and Recovery

Leader Failure Recovery:

SCENARIO: Leader Broker Fails

Before Failure:
├── Broker-1: Leader (Partition 0)
├── Broker-2: Follower (Partition 0, ISR)
└── Broker-3: Follower (Partition 0, ISR)

Failure Sequence:
1. Broker-1 becomes unreachable
2. Controller detects failure (missed heartbeats)
3. Controller triggers leader election
4. New leader selected from ISR (Broker-2 or Broker-3)
5. Metadata updated across cluster
6. Clients discover new leader
7. Normal operation resumes

Recovery Time:
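├── Detection: ~zookeeper.session.timeout.ms (18s default since Kafka 2.5) or broker.session.timeout.ms in KRaft mode (9s default)
├── Election: ~few hundred milliseconds
└── Client Discovery: usually one metadata refresh after a NOT_LEADER_OR_FOLLOWER error; metadata.max.age.ms (5min default) is only the periodic upper bound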
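
The three phases add up to the window during which a client cannot use the partition. A rough sketch follows; the function name and all input values are hypothetical examples, not measured figures.

```python
def failover_window_ms(detection_ms: int, election_ms: int, client_discovery_ms: int) -> int:
    """Rough estimate of partition unavailability as seen by one client."""
    phases = (detection_ms, election_ms, client_discovery_ms)
    if any(p < 0 for p in phases):
        raise ValueError("phase durations must be non-negative")
    return sum(phases)


# Example inputs (assumed): 10s detection, 300ms election.
# Best case: the client refreshes metadata right after a failed request.
best_case = failover_window_ms(10_000, 300, 100)
# Worst case: an idle client waits a full 5-minute periodic metadata refresh.
worst_case = failover_window_ms(10_000, 300, 300_000)
```

Detection dominates for active clients, which is why the session timeout is the main lever for faster failover.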

Split-Brain Prevention:

Kafka prevents split-brain through:

1. SINGLE CONTROLLER MODEL
   ├── Only one controller per cluster
   ├── Controller election via ZooKeeper/KRaft
   └── Controller manages all leadership decisions

2. ISR-BASED ELECTIONS
   ├── Only ISR replicas eligible for leadership
   ├── Prevents stale replicas from becoming leaders
   └── Maintains data consistency

3. FENCING MECHANISMS
   ├── Epoch numbers for leaders
   ├── Requests carrying a stale leader epoch are rejected, so a deposed leader cannot accept writes
   └── Clients automatically discover current leader
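
Epoch-based fencing can be sketched as a comparison of epoch numbers. The class and function below are hypothetical simplifications; FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH are the names of the corresponding Kafka protocol errors.

```python
class PartitionLeadership:
    """Tracks the current leader and its epoch, as decided by the controller."""

    def __init__(self):
        self.leader_id = None
        self.leader_epoch = 0

    def elect(self, broker_id: int) -> int:
        # Every leadership change bumps the epoch.
        self.leader_epoch += 1
        self.leader_id = broker_id
        return self.leader_epoch


def check_epoch(current_epoch: int, request_epoch: int) -> str:
    """Validate the leader epoch carried by a request."""
    if request_epoch < current_epoch:
        return "FENCED_LEADER_EPOCH"   # sender still believes in an old leader
    if request_epoch > current_epoch:
        return "UNKNOWN_LEADER_EPOCH"  # receiver has not yet seen the newer epoch
    return "OK"


state = PartitionLeadership()
old_epoch = state.elect(1)   # broker 1 becomes leader
new_epoch = state.elect(2)   # broker 1 fails, broker 2 takes over
```

A request that still carries old_epoch is fenced, which is what stops a deposed leader or a stale client from acting on outdated leadership.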

Cross-Datacenter Replication

MirrorMaker 2.0 Architecture:

PRIMARY DATACENTER (US-EAST)     SECONDARY DATACENTER (US-WEST)
┌─────────────────────────┐      ┌─────────────────────────┐
│   KAFKA CLUSTER A       │      │   KAFKA CLUSTER B       │
│                         │      │                         │
│  Topic: user-events     │      │  Topic: primary.user-   │
│  ├── Partition 0        │────► │  events                 │
│  ├── Partition 1        │      │  ├── Partition 0        │
│  └── Partition 2        │      │  ├── Partition 1        │
│                         │      │  └── Partition 2        │
│                         │      │                         │
│                         │      │  MirrorMaker 2.0        │
│                         │      │  Connectors             │
└─────────────────────────┘      └─────────────────────────┘

MirrorMaker Configuration:

# Source and target clusters
clusters=primary,secondary
primary.bootstrap.servers=kafka1-east:9092,kafka2-east:9092
secondary.bootstrap.servers=kafka1-west:9092,kafka2-west:9092

# Replication flows
primary->secondary.enabled=true
primary->secondary.topics=user-events,transactions,.*-logs

# Topic naming in target cluster
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# Results in: primary.user-events, primary.transactions

# Sync settings
sync.topic.acls.enabled=true
sync.topic.configs.enabled=true
# Checkpoints let consumer group offsets be translated to the target cluster on failover
emit.checkpoints.enabled=true
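
The topic selection and renaming in this configuration can be sketched as follows. The helper names are hypothetical, and the sketch assumes each entry in the topics list is matched against the whole topic name; it mirrors the "alias.topic" naming described above rather than MirrorMaker's actual code.

```python
import re


def remote_topic_name(source_alias: str, topic: str, separator: str = ".") -> str:
    """Name of the mirrored topic in the target cluster (source alias as prefix)."""
    return f"{source_alias}{separator}{topic}"


def topics_to_replicate(all_topics, topics_config: str):
    """Select topics matching a comma-separated list of names and regexes."""
    patterns = [re.compile(p.strip()) for p in topics_config.split(",")]
    return [t for t in all_topics if any(p.fullmatch(t) for p in patterns)]


source_topics = ["user-events", "transactions", "app-logs", "metrics"]
selected = topics_to_replicate(source_topics, "user-events,transactions,.*-logs")
mirrored = [remote_topic_name("primary", t) for t in selected]
```

The prefix keeps mirrored topics distinguishable from local ones, which matters once replication runs in both directions.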

Monitoring Replication Health

Key Replication Metrics:

# ISR size monitoring
kafka-topics --bootstrap-server localhost:9092 --describe --topic critical-topic

Topic: critical-topic
Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3    <- healthy (all replicas in sync)
Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3      <- degraded (replica 1 out of sync)

# Under-replicated partitions
kafka-topics --bootstrap-server localhost:9092 --describe --under-replicated-partitions

# Preferred replica election (rebalancing)
kafka-leader-election --bootstrap-server localhost:9092 \
  --election-type preferred --all-topic-partitions
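
For scripted checks, the describe output can be parsed to find partitions whose ISR is smaller than the replica list. This is a sketch with a hypothetical function name; it assumes each partition line contains "Partition:", "Leader:", "Replicas:" and "Isr:" fields in that order, as in the sample above.

```python
import re

_PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(\S+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


def under_replicated(describe_output: str):
    """Return [(partition, [missing replica ids])] for degraded partitions."""
    degraded = []
    for line in describe_output.splitlines():
        match = _PARTITION_LINE.search(line)
        if not match:
            continue  # header or unrelated line
        partition = int(match.group(1))
        replicas = {int(r) for r in match.group(3).split(",")}
        isr = {int(r) for r in match.group(4).split(",") if r}
        missing = sorted(replicas - isr)
        if missing:
            degraded.append((partition, missing))
    return degraded


sample = (
    "Topic: critical-topic\tPartition: 0\tLeader: 1\tReplicas: 1,2,3\tIsr: 1,2,3\n"
    "Topic: critical-topic\tPartition: 1\tLeader: 2\tReplicas: 2,3,1\tIsr: 2,3\n"
)
result = under_replicated(sample)
```

In the sample, partition 1 is reported with replica 1 missing from its ISR.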

JMX Metrics for Replication:

// Key metrics to monitor
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions     // Should be 0
kafka.server:type=ReplicaManager,name=PartitionCount               // Partitions per broker
kafka.controller:type=KafkaController,name=OfflinePartitionsCount  // Should be 0
kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs  // Election frequency

Interview Questions

“How does Kafka ensure no data loss?”

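  1. Configure acks=all for producers (with idempotence enabled so retries do not create duplicates)
  2. Use a replication factor of at least 3 and set min.insync.replicas > 1 at topic/broker level
  3. Set unclean.leader.election.enable=false
  4. Monitor ISR health and handle shrinking ISRs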

“What happens if all replicas fail?”

  1. Partition becomes unavailable for reads/writes
  2. If unclean.leader.election.enable=false: Wait for ISR replica to recover
  3. If unclean.leader.election.enable=true: Allow non-ISR replica to become leader (data loss possible)
  4. Monitor and alert on ISR size to prevent this scenario

“Explain Kafka’s consistency model.”

See Also: Brokers & Clustering, Producer Architecture


Performance Characteristics

Throughput and Latency Metrics

Typical Performance Benchmarks (illustrative; actual figures depend on hardware, message size, and configuration):

KAFKA PERFORMANCE CHARACTERISTICS:

Throughput (Single Broker):
├── Sequential writes: ~750 MB/s (millions of messages/sec)
├── Random writes: ~150 MB/s
├── Sequential reads: ~900 MB/s
└── Network-limited: ~125 MB/s per 1Gbps link

Latency (99th percentile):
├── End-to-end: 2-5ms (optimized configuration)
├── Producer acks=1: 1-2ms
├── Producer acks=all: 3-5ms
└── Consumer poll: <1ms

Performance Factors:

THROUGHPUT DRIVERS:
├── Batch size (larger = higher throughput)
├── Compression (reduces network/disk I/O)
├── Replication factor (higher = lower throughput)
├── Number of partitions (more = higher parallelism)
└── Hardware (SSD, network bandwidth, CPU cores)

LATENCY DRIVERS:
├── acks configuration (latency: all > 1 > 0)
├── linger.ms (batching delay)
├── Network RTT between producers/brokers/consumers
├── Disk fsync behavior (OS and hardware)
└── JVM garbage collection pauses

Scaling Patterns and Bottlenecks

Horizontal Scaling Strategy:

SCALING DIMENSIONS:

Partition Scaling:
├── More partitions → More consumer parallelism
├── Guideline: Start with 2-3x expected consumers
├── Max recommended: 4000 partitions per broker
└── Cost: Higher metadata overhead, longer elections

Broker Scaling:
├── More brokers → Distribute partition leadership
├── Pattern: Add brokers then rebalance partitions
├── Benefit: Higher aggregate throughput
└── Cost: Increased operational complexity

Consumer Scaling:
├── Scale consumers up to partition count
├── Beyond partition count = idle consumers
├── Pattern: Auto-scaling based on consumer lag
└── Monitor: Consumer lag per partition
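
The "idle consumers beyond partition count" rule follows directly from how partitions are handed out within a group. A simplified round-robin sketch, with hypothetical function names (the real assignors are more elaborate, but the upper bound is the same):

```python
def assign_partitions(num_partitions: int, consumers: list) -> dict:
    """Round-robin assignment: each partition goes to exactly one consumer."""
    assignment = {consumer: [] for consumer in consumers}
    if not consumers:
        return assignment
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


def idle_consumers(assignment: dict) -> list:
    """Consumers that received no partitions and therefore do no work."""
    return [consumer for consumer, parts in assignment.items() if not parts]


# 3 partitions shared by 4 consumers: one consumer has nothing to read.
over_scaled = assign_partitions(3, ["c1", "c2", "c3", "c4"])
```

Because a partition is never shared inside a group, adding a fourth consumer to a three-partition topic adds no throughput.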

Common Bottlenecks and Solutions:

Bottleneck        | Symptoms                         | Solutions
Producer Batching | Low throughput, high CPU         | Increase batch.size, tune linger.ms
Network I/O       | High latency, bandwidth limits   | Enable compression, increase buffers
Disk I/O          | Slow writes, high latency        | Use SSD, tune OS page cache
Consumer Lag      | Processing slower than ingestion | Scale consumers, optimize processing
Replication       | High latency with acks=all       | Optimize network, tune ISR settings
GC Pauses         | Periodic latency spikes          | Tune JVM heap, use G1 collector

Configuration Tuning Guide

Producer Performance Tuning:

Properties producerProps = new Properties();

// Throughput optimization
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);        // 64KB batches
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);            // 20ms batching window
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // Fast compression
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728);  // 128MB buffer

// Network optimization
producerProps.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);       // 128KB
producerProps.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768);     // 32KB
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

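// Reliability vs performance balance
// Note: idempotence requires acks=all, so with acks=1 a retried batch can be
// reordered relative to other in-flight batches on the same connection
producerProps.put(ProducerConfig.ACKS_CONFIG, "1");                 // Leader ack only
producerProps.put(ProducerConfig.RETRIES_CONFIG, 3);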

Consumer Performance Tuning:

Properties consumerProps = new Properties();

// Fetch optimization
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);     // 50KB minimum
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);     // 100ms max wait
consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB max

// Processing optimization
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);     // Larger batches
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // Manual commits

// Network buffers
consumerProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 262144);     // 256KB
consumerProps.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);        // 128KB

Broker Performance Tuning:

# server.properties

# Network threads (typically 8-16)
num.network.threads=16
num.io.threads=16

# Socket buffers
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log segment configuration
log.segment.bytes=1073741824          # 1GB segments
log.index.size.max.bytes=10485760     # 10MB indexes

# Replication performance
num.replica.fetchers=8                # Parallel fetcher threads
replica.fetch.max.bytes=1048576       # 1MB per fetch

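# Compression and cleanup
# lz4 makes the broker store data as lz4; the default value "producer" keeps the producer's codec and avoids broker-side recompression
compression.type=lz4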
log.cleanup.policy=delete
log.retention.check.interval.ms=300000

Hardware Recommendations

CPU Requirements:

BROKER CPU SIZING:
├── Baseline: 8-16 cores per broker
├── Network threads: 1 core per 4 threads
├── I/O threads: 1 core per 2-4 threads
├── Compression: Additional 2-4 cores
└── Scaling: Monitor CPU utilization <70%

Memory Allocation:

MEMORY DISTRIBUTION:
├── JVM Heap: 6-10GB (avoid large heaps)
├── OS Page Cache: 60-70% of total RAM
├── Network buffers: ~1-2GB
└── Example: 64GB machine → 8GB heap, 50GB page cache

Storage Configuration:

DISK RECOMMENDATIONS:
├── Type: SSD strongly recommended for low latency
├── RAID: RAID-10 for performance + redundancy
├── File system: XFS or ext4 with noatime
├── Separate disks: OS, logs, ZooKeeper data
└── Monitoring: Disk utilization, IOPS, latency

Network Specifications:

NETWORK SIZING:
├── Bandwidth: 10Gbps minimum for production
├── Calculation: (ingress × replication_factor + consumer egress) × safety_margin
├── Example: 1GB/s ingress × 3 replicas × 2 margin = 6GB/s cluster-wide (about 48Gbps), before consumer egress
└── Monitoring: Network utilization, packet loss
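
The sizing arithmetic can be written out as a small helper. This is one reading of the rule, stated as an assumption: every ingested byte crosses the network once per replica, consumer reads are added on top, and the total is multiplied by a safety margin. The function names are hypothetical.

```python
def required_bandwidth_gb_per_s(ingress_gb_per_s: float,
                                replication_factor: int,
                                consumer_egress_gb_per_s: float = 0.0,
                                safety_margin: float = 2.0) -> float:
    """Cluster-wide network bandwidth estimate in GB/s."""
    replicated_traffic = ingress_gb_per_s * replication_factor
    return (replicated_traffic + consumer_egress_gb_per_s) * safety_margin


def gb_per_s_to_gbps(gb_per_s: float) -> float:
    """Convert gigabytes per second to gigabits per second."""
    return gb_per_s * 8


needed = required_bandwidth_gb_per_s(1.0, 3)   # the 1GB/s, 3 replicas, 2x margin example
needed_gbps = gb_per_s_to_gbps(needed)
```

The result is a cluster-wide total, so it is spread across all brokers' links rather than required of a single network interface.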

Performance Monitoring and Alerting

Key Performance Metrics:

# Consumer throughput test
kafka-consumer-perf-test --topic test-topic --bootstrap-server localhost:9092 \
  --messages 1000000

# Producer performance test
kafka-producer-perf-test --topic test-topic --num-records 1000000 \
  --record-size 1024 --throughput 100000 \
  --producer-props bootstrap.servers=localhost:9092

# JMX metrics to monitor
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
kafka.server:type=ReplicaManager,name=LeaderCount

Production Monitoring Dashboard:

Key Metrics to Dashboard:
  - Throughput: Messages/sec, Bytes/sec per topic
  - Latency: 99th percentile produce/consume latency
  - Consumer Lag: Per partition and total
  - Broker Health: CPU, memory, disk usage
  - Replication: ISR shrinking, under-replicated partitions
  - Network: Bandwidth utilization, request rate

Interview Performance Questions

“How do you optimize Kafka for high throughput?”

  1. Increase batch sizes and enable compression
  2. Tune linger.ms for better batching
  3. Scale partitions and consumers horizontally
  4. Use appropriate hardware (SSD, high bandwidth)
  5. Monitor and eliminate bottlenecks systematically

“What causes latency in Kafka?”

  1. Producer batching (linger.ms setting)
  2. Network RTT between components
  3. Disk sync behavior and storage performance
  4. Replication overhead with acks=all
  5. JVM garbage collection pauses

“How do you handle traffic spikes?”

  1. Over-provision partitions for scaling headroom
  2. Implement producer-side backpressure
  3. Use consumer auto-scaling based on lag metrics
  4. Pre-warm page caches during off-peak hours
  5. Monitor leading indicators (queue depth, latency)

See Also: Producer Architecture, Consumer Groups, Operational Considerations


Operational Considerations

Essential Monitoring Metrics

Cluster Health Metrics:

CRITICAL ALERTS (Page immediately):
├── Under-replicated partitions > 0
├── Offline partitions > 0
├── ISR shrinking rate > normal baseline
├── Controller election frequency > normal
└── Any broker down

PERFORMANCE ALERTS (Monitor closely):
├── Consumer lag > SLA threshold
├── Producer error rate > 1%
├── Request latency P99 > SLA
├── Disk usage > 80%
└── JVM heap usage > 70%
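
The two alert tiers can be expressed as threshold rules over a metrics snapshot. A minimal sketch: the metric keys are hypothetical names chosen for this example, and only the fixed thresholds listed above are encoded (baseline- and SLA-relative alerts need site-specific values).

```python
CRITICAL_RULES = {
    "under_replicated_partitions": lambda v: v > 0,
    "offline_partitions": lambda v: v > 0,
    "brokers_down": lambda v: v > 0,
}

PERFORMANCE_RULES = {
    "producer_error_rate": lambda v: v > 0.01,  # more than 1%
    "disk_usage": lambda v: v > 0.80,           # more than 80%
    "jvm_heap_usage": lambda v: v > 0.70,       # more than 70%
}


def evaluate_alerts(metrics: dict) -> dict:
    """Return the names of breached rules, grouped by severity."""
    def fired(rules):
        return sorted(name for name, breached in rules.items()
                      if name in metrics and breached(metrics[name]))
    return {"critical": fired(CRITICAL_RULES), "performance": fired(PERFORMANCE_RULES)}


snapshot = {"under_replicated_partitions": 2, "offline_partitions": 0,
            "disk_usage": 0.85, "jvm_heap_usage": 0.50}
alerts = evaluate_alerts(snapshot)
```

Metrics missing from the snapshot are skipped rather than treated as healthy or unhealthy, so a collector outage should be alerted on separately.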

Monitoring Implementation:

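// JMX metric collection example
// Note: the platform MBean server only exposes these beans when this code runs
// inside the broker JVM; for remote collection, connect through a JMXConnector.
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class KafkaMetricsCollector {
    private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();

    // ObjectName construction and getAttribute throw checked JMX exceptions
    public Map<String, Object> getClusterHealth() throws JMException {
        Map<String, Object> metrics = new HashMap<>();

        // Under-replicated partitions (critical)
        ObjectName underReplicatedName = new ObjectName(
            "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
        metrics.put("underReplicatedPartitions",
            server.getAttribute(underReplicatedName, "Value"));

        // Offline partitions (critical; meaningful on the active controller)
        ObjectName offlinePartitionsName = new ObjectName(
            "kafka.controller:type=KafkaController,name=OfflinePartitionsCount");
        metrics.put("offlinePartitions",
            server.getAttribute(offlinePartitionsName, "Value"));

        // Producer request rate
        // Newer brokers add a version=... key to this MBean name, in which case
        // an ObjectName pattern query is needed instead of an exact lookup
        ObjectName produceRequestName = new ObjectName(
            "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce");
        metrics.put("produceRequestRate",
            server.getAttribute(produceRequestName, "OneMinuteRate"));

        return metrics;
    }
}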

Troubleshooting Common Issues

Issue: Consumer Lag Building Up

DIAGNOSIS STEPS:
1. Identify affected consumers:
   kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-group

2. Check consumer health:
   ├── Consumer instances running?
   ├── Processing errors in logs?
   ├── Rebalancing frequently?
   └── Network connectivity issues?

3. Check partition distribution:
   ├── Even partition assignment?
   ├── Hot partitions with high traffic?
   └── Consumer instance capacity?

SOLUTIONS:
├── Scale consumers (if < partition count)
├── Optimize message processing logic
├── Increase max.poll.records for batching
├── Add more partitions (requires rebalancing)
└── Monitor GC pauses in consumer JVMs
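
Lag and catch-up time follow from simple arithmetic on the offsets reported per partition. A sketch with hypothetical function names, assuming steady produce and consume rates:

```python
def total_lag(partitions) -> int:
    """Sum of (log end offset - committed offset) over all partitions.

    partitions is a list of (current_offset, log_end_offset) tuples.
    """
    return sum(max(end - current, 0) for current, end in partitions)


def seconds_to_drain(lag: int, consume_rate: float, produce_rate: float) -> float:
    """Time until lag reaches zero, given messages/sec in and out."""
    if lag == 0:
        return 0.0
    net_rate = consume_rate - produce_rate
    if net_rate <= 0:
        return float("inf")  # consumers never catch up at these rates
    return lag / net_rate


lag = total_lag([(100, 150), (200, 200), (50, 300)])
eta = seconds_to_drain(lag, consume_rate=150, produce_rate=100)
```

An infinite result is the signal that tuning alone will not help and that consumers or partitions have to be scaled.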

Issue: Broker Disk Space Exhaustion

IMMEDIATE ACTIONS:
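1. Identify the largest partitions:
   kafka-log-dirs --describe --bootstrap-server localhost:9092

2. Temporary retention reduction:
   kafka-configs --bootstrap-server localhost:9092 --alter \
     --entity-type topics --entity-name large-topic \
     --add-config retention.ms=3600000  # 1 hour

3. Manual log cleanup (last resort; prefer step 2 and let the broker delete segments):
   # Stop broker gracefully
   # Remove only old, closed segments under /var/kafka-logs/topic-partition/, deleting each
   # segment's .log, .index and .timeindex files together; a glob such as 00*.log matches
   # every segment, including the active one
   # Restart broker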

LONG-TERM SOLUTIONS:
├── Implement proper retention policies
├── Set up disk usage monitoring/alerting
├── Use log compaction for key-based topics
├── Archive old data to cold storage
└── Capacity planning for growth

Issue: Frequent Leader Elections

DIAGNOSIS:
├── Network instability between brokers
├── JVM garbage collection pauses
├── Disk I/O latency spikes
├── Incorrect timeout configurations
└── ZooKeeper/KRaft cluster issues

SOLUTIONS:
1. Network optimization:
   ├── Check network latency/packet loss
   ├── Tune socket buffer sizes
   └── Verify DNS resolution speed

2. JVM tuning:
   ├── Use G1 garbage collector
   ├── Tune heap size (6-10GB typical)
   ├── Monitor GC logs for long pauses
   └── Set appropriate GC tuning flags

3. Timeout adjustments:
   replica.lag.time.max.ms=30000      # Increase if needed
   controller.socket.timeout.ms=30000
   request.timeout.ms=30000

Deployment and Configuration Management

Zero-Downtime Deployment Process:

#!/bin/bash
# Rolling update script
set -euo pipefail

BROKERS=("broker1" "broker2" "broker3")
NEW_VERSION="2.8.0"

for broker in "${BROKERS[@]}"; do
    echo "Updating $broker..."

    # 1. Gracefully shut down broker (controlled shutdown moves leadership away)
    ssh "$broker" "kafka-server-stop.sh"

    # 2. Wait for partition leadership migration
    sleep 30

    # 3. Update Kafka binaries
    ssh "$broker" "tar -xzf kafka-${NEW_VERSION}.tgz -C /opt/"

    # 4. Start broker with new version
    ssh "$broker" "kafka-server-start.sh -daemon server.properties"

    # 5. Verify broker rejoined cluster (retry until it answers)
    until kafka-broker-api-versions.sh --bootstrap-server "$broker:9092" > /dev/null 2>&1; do
        sleep 5
    done

    # 6. Wait until no partitions are under-replicated before the next broker
    while [ -n "$(kafka-topics.sh --bootstrap-server "$broker:9092" \
            --describe --under-replicated-partitions)" ]; do
        sleep 10
    done
done
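
The control flow of a rolling update (restart one broker, wait for health, only then continue) can be sketched independently of the shell commands. In this sketch restart and is_healthy are hypothetical callables supplied by the caller, for example wrappers around ssh and an under-replicated-partitions check.

```python
import time


def wait_until(check, timeout_s: float, interval_s: float = 1.0, sleep=time.sleep) -> bool:
    """Poll check() until it returns True or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)


def rolling_restart(brokers, restart, is_healthy,
                    timeout_s: float = 300.0, interval_s: float = 5.0,
                    sleep=time.sleep) -> None:
    """Restart brokers one at a time, stopping at the first unhealthy one."""
    for broker in brokers:
        restart(broker)
        if not wait_until(lambda: is_healthy(broker), timeout_s, interval_s, sleep):
            # Abort so the remaining brokers keep serving traffic.
            raise RuntimeError(f"{broker} did not become healthy within {timeout_s}s")
```

Gating on a health check instead of a fixed sleep avoids taking down a second broker while the first is still catching up on replication.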

Configuration Management Best Practices:

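# server.properties template with environment variables
# Kafka does not apply shell-style ${VAR:-default} expansion itself; render this
# template with a separate templating step before starting the broker

broker.id=${BROKER_ID}
listeners=${LISTENERS}
log.dirs=${LOG_DIRS}

# Environment-specific settings
default.replication.factor=${REPLICATION_FACTOR:-3}
min.insync.replicas=${MIN_ISR:-2}

# Security configuration
security.inter.broker.protocol=${SECURITY_PROTOCOL:-PLAINTEXT}
ssl.keystore.location=${SSL_KEYSTORE_PATH}
ssl.truststore.location=${SSL_TRUSTSTORE_PATH}

# Monitoring integration
# JMX is enabled through the JMX_PORT environment variable (for example JMX_PORT=9999),
# not through a server.properties key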
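
Since placeholders like these have to be resolved before the broker reads the file, a small rendering step is needed. The sketch below is a hypothetical renderer that supports only the ${NAME} and ${NAME:-default} forms and fails loudly on missing required variables.

```python
import re

_PLACEHOLDER = re.compile(r"\$\{(\w+)(?::-([^}]*))?\}")


def render_template(template: str, env: dict) -> str:
    """Replace ${NAME} and ${NAME:-default} using values from env."""
    def substitute(match):
        name, default = match.group(1), match.group(2)
        value = env.get(name)
        if value:  # like the shell's :-, unset and empty are treated the same
            return value
        if default is not None:
            return default
        raise KeyError(f"missing required variable: {name}")
    return _PLACEHOLDER.sub(substitute, template)


template = "broker.id=${BROKER_ID}\nmin.insync.replicas=${MIN_ISR:-2}\n"
rendered = render_template(template, {"BROKER_ID": "1"})
```

In practice env would be os.environ; failing on a missing variable is deliberate, since a broker started with an empty broker.id or log.dirs is harder to diagnose than a failed render.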

Backup and Disaster Recovery

Backup Strategies:

# 1. Topic configuration backup
kafka-topics --list --bootstrap-server localhost:9092 > topics.txt
while IFS= read -r topic; do
    kafka-configs --describe --entity-type topics --entity-name "$topic" \
      --bootstrap-server localhost:9092 >> topic-configs.txt
done < topics.txt

# 2. Consumer group offset backup
kafka-consumer-groups --list --bootstrap-server localhost:9092 > groups.txt
while IFS= read -r group; do
    kafka-consumer-groups --describe --group "$group" \
      --bootstrap-server localhost:9092 >> group-offsets.txt
done < groups.txt

# 3. Metadata inspection (ZooKeeper mode only; ls shows child znodes, not their contents)
zkCli.sh -server zk1:2181 <<EOF
ls /brokers
ls /config
ls /controller
EOF

Cross-Region Disaster Recovery:

DR Setup with MirrorMaker 2.0:

PRIMARY_REGION (us-east):
├── Kafka Cluster (3 brokers)
├── Application producers/consumers
└── MirrorMaker 2.0 → DR_REGION

DR_REGION (us-west):
├── Kafka Cluster (3 brokers)
├── Standby applications (read-only)
├── Mirrored topics: primary.topic-name
└── Offset sync for consumer failover

FAILOVER_PROCESS:
1. Stop applications in primary region
2. Switch DNS/load balancers to DR region
3. Start applications in DR region
4. Begin reverse replication (DR → PRIMARY)

Security Operations

Authentication and Authorization Setup:

# Enable SASL/SCRAM authentication
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# SSL configuration
ssl.client.auth=required
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks

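# ACL authorization
# kafka.security.auth.SimpleAclAuthorizer was removed in Kafka 3.0; use AclAuthorizer (ZooKeeper mode)
# or org.apache.kafka.metadata.authorizer.StandardAuthorizer (KRaft mode)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer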
super.users=User:kafka;User:admin

Common Security Operations:

# Create SCRAM user
kafka-configs --bootstrap-server localhost:9092 --alter \
  --add-config 'SCRAM-SHA-256=[password=secret]' \
  --entity-type users --entity-name alice

# Grant ACLs
kafka-acls --bootstrap-server localhost:9092 --add --allow-principal User:alice \
  --operation Read --operation Write --topic payments

# List current ACLs
kafka-acls --bootstrap-server localhost:9092 --list --principal User:alice

# Rotate SSL certificates
# 1. Generate new certificates
# 2. Add to truststore
# 3. Rolling restart with new keystore
# 4. Remove old certificates

Capacity Planning and Growth Management

Capacity Planning Framework:

# Capacity calculation example
import math

def calculate_kafka_capacity(requirements):
    daily_messages = requirements['daily_messages']
    message_size = requirements['avg_message_size_kb']
    retention_days = requirements['retention_days']
    replication_factor = requirements['replication_factor']

    # Storage calculation (KB -> GB)
    daily_storage_gb = (daily_messages * message_size) / (1024 * 1024)
    total_storage_gb = daily_storage_gb * retention_days * replication_factor

    # Add safety margin
    recommended_storage = total_storage_gb * 1.5

    # Partition calculation (daily average; size for peak traffic separately)
    target_throughput_mb = daily_storage_gb * 1024 / (24 * 3600)  # GB/day -> MB/s
    partition_throughput_mb = 25  # Conservative estimate
    min_partitions = max(math.ceil(target_throughput_mb / partition_throughput_mb),
                         requirements.get('min_consumers', 1))

    return {
        'storage_gb': recommended_storage,
        'partitions': min_partitions,
        'brokers': max(3, math.ceil(min_partitions / 100))  # 100 partitions per broker
    }

Growth Management:

# Adding partitions (irreversible, and changes which partition existing keys map to)
kafka-topics --bootstrap-server localhost:9092 --alter --topic user-events --partitions 20

# Adding brokers and rebalancing
# 1. Start new brokers
# 2. Generate reassignment plan
kafka-reassign-partitions --bootstrap-server localhost:9092 --generate \
  --topics-to-move-json-file all-topics.json \
  --broker-list "1,2,3,4,5" # Include new brokers

# 3. Execute rebalancing (gradual)
kafka-reassign-partitions --bootstrap-server localhost:9092 --execute \
  --reassignment-json-file expand-cluster-reassignment.json \
  --throttle 50000000  # 50MB/s throttle
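
One reason adding partitions deserves caution is that keyed messages are placed by hash(key) mod partition count, so changing the count moves many keys to a different partition and breaks per-key ordering across the change. The sketch below illustrates the effect; it uses zlib.crc32 as a stand-in hash (an assumption made to stay stdlib-only, since Kafka's default partitioner uses murmur2), and the function names are hypothetical.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition. crc32 is a stand-in for Kafka's murmur2 hash."""
    return zlib.crc32(key) % num_partitions


def moved_keys(keys, old_count: int, new_count: int):
    """Keys whose partition changes when the partition count changes."""
    return [k for k in keys
            if partition_for(k, old_count) != partition_for(k, new_count)]


keys = [f"user-{i}".encode() for i in range(1000)]
moved = moved_keys(keys, old_count=12, new_count=20)
fraction_moved = len(moved) / len(keys)
```

For topics where per-key ordering matters, this is the argument for over-provisioning partitions up front instead of growing the count later.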

Interview Operational Questions

“How do you monitor Kafka in production?”

  1. JMX metrics collection (broker, producer, consumer metrics)
  2. Consumer lag monitoring and alerting
  3. Cluster health checks (ISR, offline partitions)
  4. Resource monitoring (CPU, memory, disk, network)
  5. End-to-end latency and throughput monitoring

“Describe your disaster recovery plan for Kafka.”

  1. Cross-region replication with MirrorMaker 2.0
  2. Regular metadata and configuration backups
  3. Automated failover procedures with DNS switching
  4. Consumer offset synchronization for seamless failover
  5. Regular DR testing and runbook maintenance

“How do you handle a failed broker?”

  1. Immediate: Check if partitions are still available via ISR
  2. Short-term: Allow automatic leader election and recovery
  3. Investigation: Identify root cause (hardware, network, configuration)
  4. Recovery: Fix underlying issue and restart broker
  5. Verification: Confirm broker rejoined cluster and replicated data

See Also: Performance Characteristics, Replication & Consistency


Summary and Integration

Key Takeaways

Kafka’s Core Strengths:

  1. High Throughput: Millions of messages per second through batching and sequential I/O
  2. Fault Tolerance: Replication and ISR management ensure data durability
  3. Scalability: Horizontal scaling via partitions and brokers
  4. Durability: Configurable persistence with acks and min.insync.replicas
  5. Ordering: Guarantees within partition boundaries

Critical Design Decisions:

System Design Integration Points

When to Choose Kafka:

Kafka vs Alternatives:

Production Readiness Checklist

✓ Infrastructure:

✓ Configuration:

✓ Monitoring:

✓ Operations:

This knowledge chunk provides the foundation for implementing, operating, and scaling Kafka in production environments while being prepared for senior-level technical discussions and system design interviews.

Total Study Time: 45 minutes

Next Steps: Practice system design scenarios, implement hands-on labs, explore advanced topics like exactly-once semantics and stream processing frameworks.