Consumer Groups and Rebalancing
Deep dive into consumer group coordination, partition assignment strategies, rebalancing protocols, and offset management
Concepts Covered in This Article
Consumer Groups
Intermediate: How multiple consumers coordinate to process partitions in parallel with fault tolerance, automatic rebalancing, and (when combined with transactions) exactly-once guarantees
Offset Management
Intermediate: How distributed messaging systems track consumer progress through partitions using offsets, enabling fault tolerance, replay, and (with transactional offset commits) exactly-once processing
Consumer Group Fundamentals
Group Coordination Architecture
Consumer groups enable fault-tolerant parallel processing by coordinating multiple consumer instances to share partition consumption:
CONSUMER GROUP ARCHITECTURE:
Topic: user-events (4 partitions)
┌─────────┬─────────┬─────────┬─────────┐
│ Part 0  │ Part 1  │ Part 2  │ Part 3  │
└─────────┴─────────┴─────────┴─────────┘
     │         │         │         │
     ▼         ▼         ▼         ▼
┌─────────┬─────────┬─────────┬─────────┐
│Consumer │Consumer │Consumer │Consumer │
│    A    │    B    │    C    │    D    │
└─────────┴─────────┴─────────┴─────────┘
Group: "analytics-processors"
Coordinator: Broker-2 (based on hash(group.id))
KEY GUARANTEES:
├── Each partition assigned to exactly ONE consumer in group
├── Each consumer can handle multiple partitions
├── Automatic rebalancing on member changes
└── Fault tolerance through coordinator failover
Coordinator Selection and Responsibilities
Group Coordinator Selection:
// Coordinator determination algorithm (illustrative; Kafka's own implementation
// uses a non-negative abs to avoid the Integer.MIN_VALUE edge case)
int coordinatorPartition = Math.abs(groupId.hashCode()) % numOffsetsTopicPartitions;
Broker groupCoordinator = findBrokerForPartition("__consumer_offsets", coordinatorPartition);
// Example (illustrative hash value):
// groupId="payment-processors" → hash=1823456
// __consumer_offsets has 50 partitions (the default)
// coordinator partition = 1823456 % 50 = 6
// Group coordinator = broker hosting __consumer_offsets partition 6
Coordinator Responsibilities:
GROUP COORDINATOR DUTIES:
Member Management:
├── Track group membership
├── Handle join/leave requests
├── Detect failed members via heartbeat
└── Trigger rebalancing when needed
Offset Management:
├── Store committed offsets in __consumer_offsets
├── Handle offset fetch/commit requests
├── Provide offset reset capabilities
└── Manage offset retention
Rebalancing Coordination:
├── Lead rebalancing protocol
├── Collect member metadata
├── Coordinate assignment distribution
└── Ensure consistent state across members
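The coordinator for a group can also be observed from the AdminClient, which is handy for verifying the hash-based selection above. A minimal sketch (the group ID and bootstrap address are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.Node;

public class CoordinatorLookup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConsumerGroupDescription group = admin
                .describeConsumerGroups(Collections.singletonList("analytics-processors"))
                .describedGroups()
                .get("analytics-processors")
                .get(); // blocks on the KafkaFuture
            Node coordinator = group.coordinator();
            System.out.printf("Coordinator: %s (broker %d)%n",
                coordinator.host(), coordinator.id());
        }
    }
}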
Consumer Lifecycle and State Management
public class ConsumerLifecycleExample {
    private KafkaConsumer<String, String> consumer; // assigned in startConsumer()
    private final String groupId = "order-processors";

    public void startConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);    // 30s
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 10s (≤ 1/3 of session timeout)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5min

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders", "payments"));

        // Consumer state transitions:
        // 1. UNASSIGNED            → Joining group
        // 2. PREPARING_REBALANCE   → Rebalancing triggered
        // 3. COMPLETING_REBALANCE  → Receiving assignments
        // 4. STABLE                → Normal processing
    }
}
Partition Assignment Strategies
Assignor Algorithm Deep Dive
Range Assignor (Default):
// Range assignment distributes consecutive partitions
public class RangeAssignorExample {
void demonstrateRangeAssignment() {
// Topic: orders (7 partitions)
// Consumers: 3 members
// Result:
// Consumer-1: [0, 1, 2] (3 partitions)
// Consumer-2: [3, 4] (2 partitions)
// Consumer-3: [5, 6] (2 partitions)
// Pros: Simple, predictable
// Cons: Uneven distribution with multiple topics
}
}
Sticky Assignor (Recommended):
public class StickyAssignorBenefits {
void demonstrateStickiness() {
// Before rebalancing:
// Consumer-A: [0, 1, 2]
// Consumer-B: [3, 4, 5]
// Consumer-C: [6, 7, 8]
// Consumer-B fails:
// Sticky Assignor result:
// Consumer-A: [0, 1, 2, 4] ← keeps original + gets 1 from failed
// Consumer-C: [6, 7, 8, 3, 5] ← keeps original + gets remaining
// Benefits:
// ├── Minimizes partition movement
// ├── Preserves consumer-local state/caches
// ├── Faster rebalancing completion
// └── Better performance during member changes
}
}
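Assignors are selected per consumer via partition.assignment.strategy. Because every member of a group must share at least one common strategy, rolling migrations typically list both the new and the old assignor:

Properties props = new Properties();
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    Arrays.asList(
        StickyAssignor.class.getName(),  // preferred strategy
        RangeAssignor.class.getName())); // fallback kept during a rolling migration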
Custom Assignor Implementation
// Sketch: extending AbstractPartitionAssignor (rather than implementing
// ConsumerPartitionAssignor directly) provides this simplified assign()
// signature while the base class handles the group protocol plumbing.
public class GeographicAssignor extends AbstractPartitionAssignor {
@Override
public String name() {
return "geographic";
}
@Override
public Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> assignments = new HashMap<>();
for (String memberId : subscriptions.keySet()) {
// Extract geographic region from consumer-supplied metadata
Subscription subscription = subscriptions.get(memberId);
ByteBuffer userData = subscription.userData();
String region = extractRegion(userData); // us-east, us-west, eu-west
// Assign partitions based on geographic affinity
List<TopicPartition> assignedPartitions = new ArrayList<>();
for (Map.Entry<String, Integer> topicEntry : partitionsPerTopic.entrySet()) {
String topic = topicEntry.getKey();
int numPartitions = topicEntry.getValue();
// Custom logic: assign partitions based on region
List<Integer> regionPartitions = getPartitionsForRegion(topic, numPartitions, region);
for (Integer partition : regionPartitions) {
assignedPartitions.add(new TopicPartition(topic, partition));
}
}
assignments.put(memberId, assignedPartitions);
}
return assignments;
}
private List<Integer> getPartitionsForRegion(String topic, int numPartitions, String region) {
// Example: with 9 partitions, us-east gets 0-2, us-west gets 3-5, eu-west gets 6-8.
// (If numPartitions is not divisible by 3, the remainder is left unassigned here.)
switch (region) {
case "us-east": return IntStream.range(0, numPartitions/3).boxed().collect(toList());
case "us-west": return IntStream.range(numPartitions/3, 2*numPartitions/3).boxed().collect(toList());
case "eu-west": return IntStream.range(2*numPartitions/3, numPartitions).boxed().collect(toList());
default: return Collections.emptyList();
}
}
}
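Using the custom assignor is a consumer-side config change, and the region metadata read by assign() would be advertised by overriding subscriptionUserData() on the same class. A sketch (the CONSUMER_REGION variable and UTF-8 encoding are assumptions):

// Consumer configuration: select the custom assignor (must be on the classpath)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    GeographicAssignor.class.getName());

// Inside GeographicAssignor: publish this member's region to the group leader
@Override
public ByteBuffer subscriptionUserData(Set<String> topics) {
    String region = System.getenv().getOrDefault("CONSUMER_REGION", "us-east");
    return ByteBuffer.wrap(region.getBytes(StandardCharsets.UTF_8));
}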
Rebalancing Protocol Mechanics
Rebalancing Trigger Scenarios
Common Rebalancing Triggers:
public class RebalancingTriggers {
// 1. Consumer joins group (scale up)
void consumerJoins() {
// New KafkaConsumer calls subscribe() and starts polling
// Coordinator detects new member via JoinGroup request
// Triggers rebalancing for entire group
}
// 2. Consumer leaves gracefully
void consumerLeaves() {
consumer.close(); // Sends LeaveGroup request
// Immediate rebalancing without waiting for session timeout
}
// 3. Consumer failure (heartbeat timeout)
void consumerFails() {
// Consumer stops sending heartbeats
// Coordinator waits for session.timeout.ms (10s default in older clients, 45s since Kafka 3.0)
// Removes failed member and triggers rebalancing
}
// 4. Partition count change
void partitionAdded() {
// Admin adds partitions: kafka-topics --alter --partitions 12
// All consumers detect metadata change
// Rebalancing redistributes new partitions
}
// 5. Subscription change
void subscriptionChange() {
// Consumer calls subscribe() with different topic list
// Forces rebalancing to handle new subscription
}
}
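Applications typically hook these triggers with a ConsumerRebalanceListener to commit progress before losing partitions and to rebuild state after gaining them. A sketch (consumer and currentOffsets are assumed to exist in scope):

consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Eager protocol: invoked with ALL owned partitions before every rebalance.
        // Cooperative protocol: invoked only with the partitions actually migrating.
        consumer.commitSync(currentOffsets); // flush progress before losing ownership
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Rebuild any per-partition state (caches, windows) for the new assignments
        partitions.forEach(tp -> System.out.println("Assigned: " + tp));
    }
});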
Rebalancing Protocol Flow
EAGER REBALANCING PROTOCOL:
Phase 1: PREPARING_REBALANCE
┌─────────────────────────────────────────────────────────────┐
│ Coordinator broadcasts rebalance to all members │
│ ├── All consumers stop processing │
│ ├── Consumers commit current offsets │
│ ├── Consumers revoke assigned partitions │
│ └── Consumers send JoinGroup requests │
└─────────────────────────────────────────────────────────────┘
│
▼
Phase 2: COMPLETING_REBALANCE
┌─────────────────────────────────────────────────────────────┐
│ Coordinator selects group leader from members │
│ ├── Leader receives member list and metadata │
│ ├── Leader computes new partition assignments │
│ ├── Leader sends assignments back to coordinator │
│ ├── Coordinator distributes assignments to all members │
│ └── All members start consuming assigned partitions │
└─────────────────────────────────────────────────────────────┘
DOWNTIME: Complete stop during rebalancing (seconds to minutes)
Cooperative Rebalancing (Incremental)
public class CooperativeRebalancingExample {
void configureCooperativeRebalancing() {
Properties props = new Properties();
// Enable cooperative (incremental) rebalancing
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Cooperative rebalancing benefits:
// ├── Only affected partitions pause processing
// ├── Unaffected partitions continue processing
// ├── Significantly reduced downtime
// └── Better for real-time processing systems
}
void cooperativeFlow() {
// Example scenario: Consumer-B fails in 3-consumer group
// Traditional eager rebalancing:
// ├── ALL consumers stop processing
// ├── ALL partitions reassigned
// ├── Downtime: 5-30 seconds
// Cooperative rebalancing:
// ├── Consumer-A continues processing partitions [0,1,2]
// ├── Consumer-C continues processing partitions [6,7,8]
// ├── Only partitions [3,4,5] from failed Consumer-B reassigned
// ├── Minimal downtime: < 1 second
}
}
Rebalancing Optimization Strategies
Static Membership Configuration
public class StaticMembershipOptimization {
void configureStaticMembership() {
Properties props = new Properties();
// Assign unique static member ID
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "processor-pod-1");
// Increase session timeout for static members
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 180000); // 3 minutes
// Static membership benefits:
// ├── Rolling restarts don't trigger rebalancing
// ├── Temporary network issues tolerated better
// ├── Preserves partition assignments across restarts
// └── Reduces rebalancing frequency significantly
}
void handleRollingDeployment() {
// Traditional membership (dynamic):
// Pod restart → consumer leave → rebalance → consumer rejoin → rebalance
// Total rebalances: 2 per pod restart
// Static membership:
// Pod restart → session timeout wait → same consumer rejoins → no rebalance
// Total rebalances: 0 for quick restarts (< session timeout)
// Best practice: Set session timeout > deployment time
}
}
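In container deployments, a stable pod identity makes a natural group.instance.id. A sketch, assuming a StatefulSet-style HOSTNAME environment variable (the variable name and fallback are assumptions):

Properties props = new Properties();
// Hypothetical: reuse the pod's stable hostname (e.g. "order-processor-0")
// so the same member identity survives restarts
String instanceId = System.getenv().getOrDefault("HOSTNAME", "processor-local");
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 180000); // must outlive a restart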
Graceful Shutdown Implementation
public class GracefulShutdownPattern {
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final KafkaConsumer<String, String> consumer;
public GracefulShutdownPattern(KafkaConsumer<String, String> consumer) {
this.consumer = consumer; // injected so the final field is initialized
}
public void startGracefulConsumer() {
// Register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Starting graceful shutdown...");
shutdown.set(true);
consumer.wakeup(); // Interrupt poll() if blocking
}));
try {
while (!shutdown.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
if (shutdown.get()) {
break; // Stop processing on shutdown signal
}
processRecord(record);
}
// Commit offsets periodically.
// Caution: parameterless commitSync() commits the position after the whole
// polled batch, so breaking out mid-batch can skip unprocessed records;
// track per-record offsets (see the next section) when that matters.
if (!records.isEmpty()) {
consumer.commitSync();
}
}
} catch (WakeupException e) {
// Expected during shutdown
if (!shutdown.get()) {
throw e;
}
} finally {
try {
// Final offset commit
consumer.commitSync();
} finally {
// Clean shutdown - sends LeaveGroup request
consumer.close();
System.out.println("Consumer closed gracefully");
}
}
}
}
Offset Management Patterns
Advanced Offset Commit Strategies
Manual Offset Management with Error Handling:
public class RobustOffsetManagement {
private KafkaConsumer<String, String> consumer; // assigned in processWithManualOffsets()
private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
public void processWithManualOffsets() {
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
// Process message
processMessage(record);
// Track successful processing
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
} catch (ProcessingException e) {
// Handle processing failure - don't advance the offset for this record.
// Caution: if a later record from the same partition succeeds, it
// overwrites this partition's map entry and the failed record is
// effectively skipped; pair this with a dead letter queue if that matters.
handleProcessingFailure(record, e);
// Option 1: Skip message and continue
// Option 2: Send to dead letter queue
// Option 3: Retry with backoff
}
}
// Batch commit successful offsets
if (!currentOffsets.isEmpty()) {
try {
consumer.commitSync(currentOffsets);
currentOffsets.clear();
} catch (CommitFailedException e) {
// Commit failed - likely due to rebalancing
handleCommitFailure(e);
}
}
}
} finally {
consumer.close();
}
}
}
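A lower-latency variant commits asynchronously on the hot path and falls back to a blocking commit only on shutdown; out-of-order async commits are tolerable because a later commit with higher offsets supersedes an earlier one. A sketch using the same fields as above (running and logger are assumed):

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            processMessage(record);
            currentOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        }
        // Non-blocking commit; failures are only logged because retrying could
        // reorder commits - the next successful commit covers this one anyway
        consumer.commitAsync(currentOffsets, (offsets, exception) -> {
            if (exception != null) {
                logger.warn("Async offset commit failed for {}", offsets, exception);
            }
        });
    }
} finally {
    try {
        consumer.commitSync(currentOffsets); // final blocking commit before close
    } finally {
        consumer.close();
    }
}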
External Offset Storage Pattern
public class ExternalOffsetStorage {
private KafkaConsumer<String, String> consumer; // assigned in processWithExternalOffsets()
private final OffsetRepository offsetRepository; // Database/Redis
private final BusinessRepository businessRepository; // stores processed results
public void processWithExternalOffsets() {
// Disable Kafka's offset management
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumer = new KafkaConsumer<>(props);
// Manual partition assignment (no group coordination)
List<TopicPartition> partitions = Arrays.asList(
new TopicPartition("orders", 0),
new TopicPartition("orders", 1)
);
consumer.assign(partitions);
// Restore offsets from external storage
for (TopicPartition partition : partitions) {
Long storedOffset = offsetRepository.getOffset(partition);
if (storedOffset != null) {
consumer.seek(partition, storedOffset);
} else {
consumer.seekToBeginning(Collections.singletonList(partition));
}
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// Process in transaction with offset storage
processInTransaction(record);
}
}
}
// Note: Spring's proxy-based @Transactional is ignored on private methods and
// self-invocations; in a real service this would be a public method on a
// separate bean. Shown inline here to keep the example compact.
@Transactional
public void processInTransaction(ConsumerRecord<String, String> record) {
// Process business logic
BusinessResult result = processBusinessLogic(record.value());
// Store result in database
businessRepository.save(result);
// Store offset in same transaction
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
offsetRepository.saveOffset(tp, record.offset() + 1);
// Both operations succeed or fail together (exactly-once processing)
}
}
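OffsetRepository here is not a Kafka API but an application-side contract over whatever transactional store is in use; a minimal hypothetical shape:

// Hypothetical contract; a JDBC or Redis implementation must be able to
// participate in the same transaction as the business writes
public interface OffsetRepository {
    // Returns the next offset to consume for the partition, or null if none stored
    Long getOffset(TopicPartition partition);

    // Stores record.offset() + 1, i.e. the next offset to consume
    void saveOffset(TopicPartition partition, long nextOffset);
}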
Consumer Group Scaling Patterns
Auto-Scaling Based on Lag
public class ConsumerAutoScaler {
private final AdminClient adminClient;
private final String groupId;
public void monitorAndScale() {
while (true) {
try {
ConsumerGroupLagMetrics lagMetrics = measureConsumerLag();
if (shouldScaleUp(lagMetrics)) {
scaleUp();
} else if (shouldScaleDown(lagMetrics)) {
scaleDown();
}
Thread.sleep(30000); // Check every 30 seconds
} catch (Exception e) {
logger.error("Error in auto-scaling loop", e);
}
}
}
private ConsumerGroupLagMetrics measureConsumerLag() throws Exception {
// Get consumer group offset information; get() blocks on the KafkaFuture
// and can throw checked exceptions, hence the throws clause
Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
// Get latest offsets for all partitions
Set<TopicPartition> partitions = consumerOffsets.keySet();
Map<TopicPartition, Long> latestOffsets = getLatestOffsets(partitions);
// Calculate lag per partition
long totalLag = 0;
long maxLag = 0;
for (TopicPartition tp : partitions) {
long consumerOffset = consumerOffsets.get(tp).offset();
long latestOffset = latestOffsets.get(tp);
long lag = latestOffset - consumerOffset;
totalLag += lag;
maxLag = Math.max(maxLag, lag);
}
return new ConsumerGroupLagMetrics(totalLag, maxLag, partitions.size());
}
private boolean shouldScaleUp(ConsumerGroupLagMetrics metrics) {
// Scale up conditions:
return metrics.getTotalLag() > 100000 || // Total lag > 100k messages
metrics.getMaxLag() > 50000 || // Any partition > 50k lag
metrics.getAvgLag() > 10000; // Average lag > 10k
}
private boolean shouldScaleDown(ConsumerGroupLagMetrics metrics) {
// Scale down conditions (conservative):
return metrics.getTotalLag() < 1000 && // Very low lag
metrics.getMaxLag() < 500 && // All partitions caught up
getCurrentConsumerCount() > getPartitionCount(); // More consumers than partitions
}
}
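The getLatestOffsets() helper referenced above can be implemented with AdminClient.listOffsets; a sketch (assumes the same adminClient field):

private Map<TopicPartition, Long> getLatestOffsets(Set<TopicPartition> partitions) throws Exception {
    // Request the log-end offset of every partition in one call
    Map<TopicPartition, OffsetSpec> request = partitions.stream()
        .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
    return adminClient.listOffsets(request).all().get().entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
}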
Partition-to-Consumer Ratio Planning
public class CapacityPlanning {
public ConsumerGroupConfig planConsumerGroup(
long messagesPerSecond,
int processingTimeMs,
int targetLagSeconds) {
// Calculate processing capacity per consumer
double messagesPerConsumerPerSecond = 1000.0 / processingTimeMs;
// Calculate required consumers for throughput
int consumersForThroughput = (int) Math.ceil(messagesPerSecond / messagesPerConsumerPerSecond);
// Lag budget: messages that may be outstanding at the target lag
long maxAcceptableLag = messagesPerSecond * targetLagSeconds;
// In this simplified steady-state model the lag target is met whenever
// throughput capacity matches the arrival rate, so the lag-based sizing
// reduces to the same formula as the throughput sizing
int consumersForLag = (int) Math.ceil(messagesPerSecond * processingTimeMs / 1000.0);
// Take maximum of throughput and lag requirements
int recommendedConsumers = Math.max(consumersForThroughput, consumersForLag);
// Provision more partitions than consumers so the group can scale out later;
// roundUpToNearestMultiple is an assumed helper, and 12 is a divisibility-friendly
// step (even splits across 2, 3, 4, 6, or 12 consumers)
int recommendedPartitions = Math.max(recommendedConsumers,
roundUpToNearestMultiple(recommendedConsumers, 12));
return new ConsumerGroupConfig(recommendedConsumers, recommendedPartitions);
}
// Example calculations:
void examplePlanning() {
// Scenario: Order processing system
// 1000 messages/sec, 50ms processing time, 30s lag target
ConsumerGroupConfig config = planConsumerGroup(1000, 50, 30);
// Results:
// messagesPerConsumerPerSecond = 20 (1000ms / 50ms)
// consumersForThroughput = 50 (1000 / 20)
// consumersForLag = 50 (1000 * 50 / 1000)
// recommendedConsumers = 50
// recommendedPartitions = 60 (for future scaling headroom)
}
}
Monitoring and Observability
Consumer Group Health Metrics
public class ConsumerGroupMonitoring {
public ConsumerGroupHealth assessGroupHealth() {
return ConsumerGroupHealth.builder()
.lagMetrics(measureLag())
.rebalanceFrequency(measureRebalanceFrequency())
.memberStability(measureMemberStability())
.processingRate(measureProcessingRate())
.build();
}
private ConsumerLagMetrics measureLag() {
// Key lag metrics to monitor:
return ConsumerLagMetrics.builder()
.totalLag(getTotalLag()) // Sum across all partitions
.maxLag(getMaxLag()) // Highest lag partition
.avgLag(getAverageLag()) // Average across partitions
.lagTrend(getLagTrend()) // Increasing/decreasing
.lagBeyondThreshold(getLagBeyondThreshold()) // Partitions > SLA
.build();
}
private RebalanceMetrics measureRebalanceFrequency() {
// Rebalancing health indicators:
return RebalanceMetrics.builder()
.rebalancesPerHour(getRebalancesPerHour())
.avgRebalanceDuration(getAvgRebalanceDuration())
.rebalanceSuccessRate(getRebalanceSuccessRate())
.timeSpentRebalancing(getTimeSpentRebalancing()) // % of time
.build();
}
void setupAlerting() {
// Critical alerts:
alertIf("Consumer lag > SLA", () -> getMaxLag() > 50000);
alertIf("Frequent rebalancing", () -> getRebalancesPerHour() > 4);
alertIf("Consumer group empty", () -> getActiveConsumers() == 0);
alertIf("Processing stopped", () -> getProcessingRate() == 0);
// Warning alerts:
alertIf("High rebalance duration", () -> getAvgRebalanceDuration() > 30000);
alertIf("Member instability", () -> getMemberChurnRate() > 0.1);
alertIf("Uneven partition distribution", () -> getPartitionDistributionSkew() > 0.3);
}
}
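Several of these numbers are also available directly from the consumer's own metrics registry; for instance, the fetch manager exposes records-lag-max, the worst per-partition lag seen by this client. A sketch:

// Reads this consumer's worst per-partition lag from its built-in metrics
static double maxRecordsLag(KafkaConsumer<?, ?> consumer) {
    return consumer.metrics().entrySet().stream()
        .filter(e -> "records-lag-max".equals(e.getKey().name())
                && "consumer-fetch-manager-metrics".equals(e.getKey().group()))
        .mapToDouble(e -> ((Number) e.getValue().metricValue()).doubleValue())
        .max()
        .orElse(Double.NaN);
}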
This comprehensive knowledge chunk provides both interview-ready concepts and production-ready implementation patterns for Kafka consumer groups and rebalancing. The content balances theoretical understanding with practical operational knowledge needed for senior-level discussions and implementations.
See Also: [[Producer_Mechanics]], [[Kafka_Transactions]], [[Stream_Processing_Frameworks]], [[Multi_Region_Patterns]]