Consumer Groups and Rebalancing
Deep dive into consumer group coordination, partition assignment strategies, rebalancing protocols, and offset management
Consumer Group Fundamentals
Group Coordination Architecture
Consumer groups enable fault-tolerant parallel processing by coordinating multiple consumer instances to share partition consumption:
Coordinator Selection and Responsibilities
Group Coordinator Selection:
// Coordinator determination algorithm
int coordinatorPartition = Math.abs(groupId.hashCode()) % numOffsetsTopicPartitions;
Broker groupCoordinator = findBrokerForPartition("__consumer_offsets", coordinatorPartition);
// Example (hash value illustrative):
// groupId="payment-processors" → hash=1823456
// __consumer_offsets has 50 partitions
// coordinator partition = 1823456 % 50 = 6
// Group coordinator = broker hosting __consumer_offsets partition 6
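The same arithmetic is easy to verify standalone. A minimal sketch, assuming the default of 50 __consumer_offsets partitions; note that Kafka's own implementation strips the sign bit (Utils.abs, i.e. hash & 0x7fffffff) rather than calling Math.abs, which sidesteps the Integer.MIN_VALUE edge case:
public class CoordinatorPartitionCheck {
    public static void main(String[] args) {
        String groupId = "payment-processors";
        int numOffsetsTopicPartitions = 50; // offsets.topic.num.partitions default
        // Sign-bit mask, matching Kafka's Utils.abs behavior
        int coordinatorPartition = (groupId.hashCode() & 0x7fffffff) % numOffsetsTopicPartitions;
        System.out.println("Group coordinator partition: " + coordinatorPartition);
    }
}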
Coordinator Responsibilities: tracking group membership via JoinGroup/SyncGroup requests, monitoring member liveness through heartbeats, triggering and mediating rebalances, and persisting committed offsets to __consumer_offsets.
Consumer Lifecycle and State Management
public class ConsumerLifecycleExample {
    private KafkaConsumer<String, String> consumer; // assigned in startConsumer(), so not final
    private final String groupId = "order-processors";
    public void startConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30s
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 10s (keep <= 1/3 of session timeout)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5min
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders", "payments"));
        // Client-side member state transitions:
        // 1. UNJOINED → joining the group
        // 2. PREPARING_REBALANCE → rebalance triggered
        // 3. COMPLETING_REBALANCE → receiving assignment
        // 4. STABLE → normal processing
    }
}
Partition Assignment Strategies
Assignor Algorithm Deep Dive
Range Assignor (Default):
// Range assignment distributes consecutive partitions
public class RangeAssignorExample {
void demonstrateRangeAssignment() {
// Topic: orders (7 partitions)
// Consumers: 3 members
// Result:
// Consumer-1: [0, 1, 2] (3 partitions)
// Consumer-2: [3, 4] (2 partitions)
// Consumer-3: [5, 6] (2 partitions)
// Pros: Simple, predictable
// Cons: Uneven distribution with multiple topics
}
}
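The split above follows directly from integer division: the first (numPartitions % numConsumers) consumers each take one extra partition. A minimal standalone sketch of the per-topic range calculation, assuming consumer ids sorted as RangeAssignor sorts them:
import java.util.*;

public class RangeAssignmentMath {
    public static void main(String[] args) {
        int numPartitions = 7;
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2", "consumer-3");
        int perConsumer = numPartitions / consumers.size(); // 2
        int remainder = numPartitions % consumers.size();   // first consumer takes 1 extra
        int start = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < remainder ? 1 : 0);
            List<Integer> assigned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                assigned.add(p);
            }
            System.out.println(consumers.get(i) + " -> " + assigned); // [0,1,2], [3,4], [5,6]
            start += count;
        }
    }
}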
Sticky Assignor (Recommended):
public class StickyAssignorBenefits {
void demonstrateStickiness() {
// Before rebalancing:
// Consumer-A: [0, 1, 2]
// Consumer-B: [3, 4, 5]
// Consumer-C: [6, 7, 8]
// Consumer-B fails:
// Sticky Assignor result:
// Consumer-A: [0, 1, 2, 4] ← keeps original + gets 1 from failed
// Consumer-C: [6, 7, 8, 3, 5] ← keeps original + gets remaining
// Benefits:
// ├── Minimizes partition movement
// ├── Preserves consumer-local state/caches
// ├── Faster rebalancing completion
// └── Better performance during member changes
}
}
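Switching to it is a single configuration line; note that StickyAssignor still uses the eager stop-the-world protocol, while CooperativeStickyAssignor (covered below) combines the same sticky placement with incremental rebalancing:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          "org.apache.kafka.clients.consumer.StickyAssignor");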
Custom Assignor Implementation
public class GeographicAssignor extends AbstractPartitionAssignor {
    // Extending AbstractPartitionAssignor (org.apache.kafka.clients.consumer.internals)
    // provides the simplified assign() signature used below; implementing
    // ConsumerPartitionAssignor directly would require the GroupAssignment-based API.
    @Override
    public String name() {
        return "geographic";
    }
    @Override
    public Map<String, List<TopicPartition>> assign(
            Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> assignments = new HashMap<>();
        for (String memberId : subscriptions.keySet()) {
            // Extract geographic region from consumer-supplied metadata
            Subscription subscription = subscriptions.get(memberId);
            ByteBuffer userData = subscription.userData();
            String region = extractRegion(userData); // us-east, us-west, eu-west
            // Assign partitions based on geographic affinity
            List<TopicPartition> assignedPartitions = new ArrayList<>();
            for (Map.Entry<String, Integer> topicEntry : partitionsPerTopic.entrySet()) {
                String topic = topicEntry.getKey();
                int numPartitions = topicEntry.getValue();
                // Custom logic: assign partitions based on region
                for (Integer partition : getPartitionsForRegion(numPartitions, region)) {
                    assignedPartitions.add(new TopicPartition(topic, partition));
                }
            }
            assignments.put(memberId, assignedPartitions);
        }
        return assignments;
    }
    private List<Integer> getPartitionsForRegion(int numPartitions, String region) {
        // Example: with 9 partitions, us-east gets 0-2, us-west 3-5, eu-west 6-8
        switch (region) {
            case "us-east": return IntStream.range(0, numPartitions / 3).boxed().collect(Collectors.toList());
            case "us-west": return IntStream.range(numPartitions / 3, 2 * numPartitions / 3).boxed().collect(Collectors.toList());
            case "eu-west": return IntStream.range(2 * numPartitions / 3, numPartitions).boxed().collect(Collectors.toList());
            default: return Collections.emptyList();
        }
    }
    private String extractRegion(ByteBuffer userData) {
        // Deserialize the region string published via subscriptionUserData() (see below)
        return StandardCharsets.UTF_8.decode(userData).toString();
    }
}
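The coordinator-side half above needs the client side to publish its region: the assignor can override ConsumerPartitionAssignor.subscriptionUserData(Set<String> topics), and every member of the group must register the assignor. A minimal sketch, assuming a localRegion field supplied at construction time:
// Client-side half: publish this member's region in the subscription
@Override
public ByteBuffer subscriptionUserData(Set<String> topics) {
    return ByteBuffer.wrap(localRegion.getBytes(StandardCharsets.UTF_8)); // e.g. "us-east"
}

// Registration: identical configuration on every member of the group
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          GeographicAssignor.class.getName());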
Rebalancing Protocol Mechanics
Rebalancing Trigger Scenarios
Common Rebalancing Triggers:
public class RebalancingTriggers {
    private KafkaConsumer<String, String> consumer; // referenced in the scenarios below
// 1. Consumer joins group (scale up)
void consumerJoins() {
// New KafkaConsumer calls subscribe() and starts polling
// Coordinator detects new member via JoinGroup request
// Triggers rebalancing for entire group
}
// 2. Consumer leaves gracefully
void consumerLeaves() {
consumer.close(); // Sends LeaveGroup request
// Immediate rebalancing without waiting for session timeout
}
// 3. Consumer failure (heartbeat timeout)
void consumerFails() {
// Consumer stops sending heartbeats
// Coordinator waits for session.timeout.ms (default: 10s before Kafka 3.0, 45s since)
// Removes failed member and triggers rebalancing
}
// 4. Partition count change
void partitionAdded() {
// Admin adds partitions: kafka-topics --alter --partitions 12
// All consumers detect metadata change
// Rebalancing redistributes new partitions
}
// 5. Subscription change
void subscriptionChange() {
// Consumer calls subscribe() with different topic list
// Forces rebalancing to handle new subscription
}
}
Rebalancing Protocol Flow
The classic (eager) protocol runs in two phases. Every member sends a JoinGroup request to the group coordinator; the coordinator elects one member as group leader and hands it all members' subscriptions; the leader computes the assignment with the configured assignor; and each member then fetches its share via a SyncGroup request. Under the eager protocol every member revokes all of its partitions before rejoining, which is the stop-the-world pause that cooperative rebalancing eliminates.
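On the client, both phases are observable through a ConsumerRebalanceListener, which is also the natural place to commit progress before partitions move; a minimal sketch, assuming consumer is the surrounding KafkaConsumer instance:
consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before reassignment: last safe point to commit processed offsets
        consumer.commitSync();
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called once the new assignment is installed
        System.out.println("Assigned: " + partitions);
    }
});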
Cooperative Rebalancing (Incremental)
public class CooperativeRebalancingExample {
void configureCooperativeRebalancing() {
Properties props = new Properties();
// Enable cooperative (incremental) rebalancing
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Cooperative rebalancing benefits:
// ├── Only affected partitions pause processing
// ├── Unaffected partitions continue processing
// ├── Significantly reduced downtime
// └── Better for real-time processing systems
}
void cooperativeFlow() {
// Example scenario: Consumer-B fails in 3-consumer group
// Traditional eager rebalancing:
// ├── ALL consumers stop processing
// ├── ALL partitions reassigned
// ├── Downtime: 5-30 seconds
// Cooperative rebalancing:
// ├── Consumer-A continues processing partitions [0,1,2]
// ├── Consumer-C continues processing partitions [6,7,8]
// ├── Only partitions [3,4,5] from failed Consumer-B reassigned
// ├── Minimal downtime: < 1 second
}
}
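One operational caveat: migrating a live group from an eager assignor to the cooperative one requires two rolling bounces (the KIP-429 upgrade path), keeping both strategies listed during the transition. A sketch of the intermediate configuration:
// First rolling bounce: list both assignors, cooperative first.
// Second rolling bounce: remove the eager assignor from the list.
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          Arrays.asList(CooperativeStickyAssignor.class.getName(),
                        RangeAssignor.class.getName()));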
Rebalancing Optimization Strategies
Static Membership Configuration
public class StaticMembershipOptimization {
void configureStaticMembership() {
Properties props = new Properties();
// Assign unique static member ID
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "processor-pod-1");
// Increase session timeout for static members
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 180000); // 3 minutes
// Static membership benefits:
// ├── Rolling restarts don't trigger rebalancing
// ├── Temporary network issues tolerated better
// ├── Preserves partition assignments across restarts
// └── Reduces rebalancing frequency significantly
}
void handleRollingDeployment() {
// Traditional membership (dynamic):
// Pod restart → consumer leave → rebalance → consumer rejoin → rebalance
// Total rebalances: 2 per pod restart
// Static membership:
// Pod restart → session timeout wait → same consumer rejoins → no rebalance
// Total rebalances: 0 for quick restarts (< session timeout)
// Best practice: Set session timeout > deployment time
}
}
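On Kubernetes, a stable instance id can simply be the pod hostname, assuming the consumers run in a StatefulSet (whose pod names survive restarts); a sketch:
// HOSTNAME is injected by Kubernetes; StatefulSet pod names are stable across restarts
String instanceId = System.getenv().getOrDefault("HOSTNAME", "processor-local");
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId);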
Graceful Shutdown Implementation
public class GracefulShutdownPattern {
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    public GracefulShutdownPattern(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }
    public void startGracefulConsumer() {
        // Register shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Starting graceful shutdown...");
            shutdown.set(true);
            consumer.wakeup(); // Interrupt poll() if blocking
        }));
        try {
            while (!shutdown.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // Finish the in-flight batch even on shutdown: commitSync() commits
                // the full polled position, so abandoning records mid-batch would
                // mark them consumed without being processed.
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                }
                // Commit offsets after each processed batch
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } catch (WakeupException e) {
            // Expected during shutdown; rethrow if it arrives unexpectedly
            if (!shutdown.get()) {
                throw e;
            }
        } finally {
            try {
                consumer.commitSync(); // Final offset commit
            } finally {
                consumer.close(); // Clean shutdown - sends LeaveGroup request
                System.out.println("Consumer closed gracefully");
            }
        }
    }
}
Offset Management Patterns
Advanced Offset Commit Strategies
Manual Offset Management with Error Handling:
public class RobustOffsetManagement {
    private KafkaConsumer<String, String> consumer; // created in processWithManualOffsets()
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    public void processWithManualOffsets() {
        Properties props = new Properties();
        // plus bootstrap.servers, group.id, and deserializers as in earlier examples
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("orders"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
// Process message
processMessage(record);
// Track successful processing
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
} catch (ProcessingException e) {
// Handle processing failure - don't advance offset
handleProcessingFailure(record, e);
// Option 1: Skip message and continue
// Option 2: Send to dead letter queue
// Option 3: Retry with backoff
}
}
// Batch commit successful offsets
if (!currentOffsets.isEmpty()) {
try {
consumer.commitSync(currentOffsets);
currentOffsets.clear();
} catch (CommitFailedException e) {
// Commit failed - likely due to rebalancing
handleCommitFailure(e);
}
}
}
} finally {
consumer.close();
}
}
}
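A common lower-latency variant commits asynchronously on the hot path and reserves the blocking commit for shutdown, accepting occasional duplicate redelivery after a failed async commit. A sketch, assuming a running flag analogous to the shutdown signal above:
while (running.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        processMessage(record);
    }
    // Non-blocking commit; a failed commit is superseded by later ones
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Async commit failed: " + exception.getMessage());
        }
    });
}
consumer.commitSync(); // one final blocking commit before close()
consumer.close();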
External Offset Storage Pattern
public class ExternalOffsetStorage {
    private KafkaConsumer<String, String> consumer;   // created below
    private OffsetRepository offsetRepository;        // injected DAO (database/Redis)
    private BusinessRepository businessRepository;    // injected DAO for business writes
    public void processWithExternalOffsets() {
        // Disable Kafka's offset management
        Properties props = new Properties();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumer = new KafkaConsumer<>(props);
// Manual partition assignment (no group coordination)
List<TopicPartition> partitions = Arrays.asList(
new TopicPartition("orders", 0),
new TopicPartition("orders", 1)
);
consumer.assign(partitions);
// Restore offsets from external storage
for (TopicPartition partition : partitions) {
Long storedOffset = offsetRepository.getOffset(partition);
if (storedOffset != null) {
consumer.seek(partition, storedOffset);
} else {
consumer.seekToBeginning(Collections.singletonList(partition));
}
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// Process in transaction with offset storage
processInTransaction(record);
}
}
}
// Note: Spring's proxy-based @Transactional is bypassed for private methods and
// self-invocation; in production this method belongs on a separate bean invoked
// through the proxy (or use a TransactionTemplate, sketched after this class).
@Transactional
public void processInTransaction(ConsumerRecord<String, String> record) {
// Process business logic
BusinessResult result = processBusinessLogic(record.value());
// Store result in database
businessRepository.save(result);
// Store offset in same transaction
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
offsetRepository.saveOffset(tp, record.offset() + 1);
// Both operations succeed or fail together (exactly-once processing)
}
}
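Because Spring's proxy-based @Transactional is bypassed on self-invocation, an explicit TransactionTemplate avoids moving the method to a separate bean; a sketch, assuming a transactionTemplate field wired to the same PlatformTransactionManager:
private void processInTransaction(ConsumerRecord<String, String> record) {
    transactionTemplate.executeWithoutResult(status -> {
        BusinessResult result = processBusinessLogic(record.value());
        businessRepository.save(result);           // business write
        offsetRepository.saveOffset(               // offset write, same transaction
            new TopicPartition(record.topic(), record.partition()),
            record.offset() + 1);
    });
}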
Consumer Group Scaling Patterns
Auto-Scaling Based on Lag
public class ConsumerAutoScaler {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerAutoScaler.class); // SLF4J
    private final AdminClient adminClient;
    private final String groupId;
public void monitorAndScale() {
while (true) {
try {
ConsumerGroupLagMetrics lagMetrics = measureConsumerLag();
if (shouldScaleUp(lagMetrics)) {
scaleUp();
} else if (shouldScaleDown(lagMetrics)) {
scaleDown();
}
Thread.sleep(30000); // Check every 30 seconds
} catch (Exception e) {
logger.error("Error in auto-scaling loop", e);
}
}
}
private ConsumerGroupLagMetrics measureConsumerLag() throws Exception { // KafkaFuture.get() throws checked exceptions
// Get consumer group offset information
Map<TopicPartition, OffsetAndMetadata> consumerOffsets =
adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
// Get latest offsets for all partitions
Set<TopicPartition> partitions = consumerOffsets.keySet();
Map<TopicPartition, Long> latestOffsets = getLatestOffsets(partitions);
// Calculate lag per partition
long totalLag = 0;
long maxLag = 0;
for (TopicPartition tp : partitions) {
long consumerOffset = consumerOffsets.get(tp).offset();
long latestOffset = latestOffsets.get(tp);
long lag = latestOffset - consumerOffset;
totalLag += lag;
maxLag = Math.max(maxLag, lag);
}
return new ConsumerGroupLagMetrics(totalLag, maxLag, partitions.size());
}
private boolean shouldScaleUp(ConsumerGroupLagMetrics metrics) {
// Scale up conditions:
return metrics.getTotalLag() > 100000 || // Total lag > 100k messages
metrics.getMaxLag() > 50000 || // Any partition > 50k lag
metrics.getAvgLag() > 10000; // Average lag > 10k
}
private boolean shouldScaleDown(ConsumerGroupLagMetrics metrics) {
// Scale down conditions (conservative):
return metrics.getTotalLag() < 1000 && // Very low lag
metrics.getMaxLag() < 500 && // All partitions caught up
getCurrentConsumerCount() > getPartitionCount(); // More consumers than partitions
}
}
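The getLatestOffsets helper is left abstract above; one way to implement it is the AdminClient listOffsets API with the latest offset spec (a sketch under that assumption):
private Map<TopicPartition, Long> getLatestOffsets(Set<TopicPartition> partitions) throws Exception {
    Map<TopicPartition, OffsetSpec> request = new HashMap<>();
    for (TopicPartition tp : partitions) {
        request.put(tp, OffsetSpec.latest()); // log-end offset per partition
    }
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> response =
            adminClient.listOffsets(request).all().get();
    Map<TopicPartition, Long> latest = new HashMap<>();
    response.forEach((tp, info) -> latest.put(tp, info.offset()));
    return latest;
}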
Partition-to-Consumer Ratio Planning
public class CapacityPlanning {
public ConsumerGroupConfig planConsumerGroup(
long messagesPerSecond,
int processingTimeMs,
int targetLagSeconds) {
// Calculate processing capacity per consumer
double messagesPerConsumerPerSecond = 1000.0 / processingTimeMs;
// Calculate required consumers for throughput
int consumersForThroughput = (int) Math.ceil(messagesPerSecond / messagesPerConsumerPerSecond);
// Lag budget implied by the target (useful as an alerting threshold)
long maxAcceptableLag = messagesPerSecond * targetLagSeconds;
// Steady-state check: messagesPerSecond * processingTimeMs / 1000 is
// algebraically the same bound as consumersForThroughput; the lag target
// holds as long as capacity keeps up with the incoming rate, so headroom
// is added via partition count below rather than here
int consumersForLag = (int) Math.ceil(messagesPerSecond * processingTimeMs / 1000.0);
// Take maximum of throughput and lag requirements
int recommendedConsumers = Math.max(consumersForThroughput, consumersForLag);
// Partitions should be multiple of consumer count for even distribution
int recommendedPartitions = Math.max(recommendedConsumers,
roundUpToNearestMultiple(recommendedConsumers, 12)); // 12 for future scaling
return new ConsumerGroupConfig(recommendedConsumers, recommendedPartitions);
}
// Example calculations:
void examplePlanning() {
// Scenario: Order processing system
// 1000 messages/sec, 50ms processing time, 30s lag target
ConsumerGroupConfig config = planConsumerGroup(1000, 50, 30);
// Results:
// messagesPerConsumerPerSecond = 20 (1000ms / 50ms)
// consumersForThroughput = 50 (1000 / 20)
// consumersForLag = 50 (1000 * 50 / 1000)
// recommendedConsumers = 50
// recommendedPartitions = 60 (for future scaling headroom)
}
}
Monitoring and Observability
Consumer Group Health Metrics
public class ConsumerGroupMonitoring {
public ConsumerGroupHealth assessGroupHealth() {
return ConsumerGroupHealth.builder()
.lagMetrics(measureLag())
.rebalanceFrequency(measureRebalanceFrequency())
.memberStability(measureMemberStability())
.processingRate(measureProcessingRate())
.build();
}
private ConsumerLagMetrics measureLag() {
// Key lag metrics to monitor:
return ConsumerLagMetrics.builder()
.totalLag(getTotalLag()) // Sum across all partitions
.maxLag(getMaxLag()) // Highest lag partition
.avgLag(getAverageLag()) // Average across partitions
.lagTrend(getLagTrend()) // Increasing/decreasing
.lagBeyondThreshold(getLagBeyondThreshold()) // Partitions > SLA
.build();
}
private RebalanceMetrics measureRebalanceFrequency() {
// Rebalancing health indicators:
return RebalanceMetrics.builder()
.rebalancesPerHour(getRebalancesPerHour())
.avgRebalanceDuration(getAvgRebalanceDuration())
.rebalanceSuccessRate(getRebalanceSuccessRate())
.timeSpentRebalancing(getTimeSpentRebalancing()) // % of time
.build();
}
void setupAlerting() {
    // alertIf(description, condition) stands in for your alerting hook
    // (e.g., Prometheus alert rules or a metrics library)
    // Critical alerts:
alertIf("Consumer lag > SLA", () -> getMaxLag() > 50000);
alertIf("Frequent rebalancing", () -> getRebalancesPerHour() > 4);
alertIf("Consumer group empty", () -> getActiveConsumers() == 0);
alertIf("Processing stopped", () -> getProcessingRate() == 0);
// Warning alerts:
alertIf("High rebalance duration", () -> getAvgRebalanceDuration() > 30000);
alertIf("Member instability", () -> getMemberChurnRate() > 0.1);
alertIf("Uneven partition distribution", () -> getPartitionDistributionSkew() > 0.3);
}
}
Together, these patterns cover the coordination, assignment, rebalancing, and offset-management concerns that dominate consumer-group design, from protocol mechanics through production operation.
See Also: [[Producer_Mechanics]], [[Kafka_Transactions]], [[Stream_Processing_Frameworks]], [[Multi_Region_Patterns]]