Offset Management

How distributed messaging systems track consumer progress through partitions using offsets, enabling fault tolerance, exactly-once processing, and replay capabilities

TL;DR

Offsets are monotonically increasing integers that uniquely identify each message’s position within a partition. Consumers track their progress by storing offsets, enabling fault tolerance (resume from last position), replay (restart from any point), and exactly-once semantics (commit offsets transactionally with processing).

Visual Overview

[Figure: Offset Management Overview]

Core Explanation

What is an Offset?

An offset is a unique, sequential 64-bit integer assigned to each message within a partition:

  • Unique per partition: Offset 5 in Partition 0 ≠ Offset 5 in Partition 1
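  • Monotonically increasing: Later messages always get higher offsets; values never decrease and are never reused (gaps can appear, for example after log compaction)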
  • Permanent: Once assigned, never changes (immutable)
  • Zero-indexed: First message is offset 0

[Figure: Partition Lifecycle]
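
To make these properties concrete, here is a minimal in-memory sketch of a single partition. `PartitionLog` is a hypothetical class invented for illustration and is not how Kafka stores data; it only shows that each partition hands out its own zero-based, ever-increasing offsets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of one partition: an append-only list whose
// index doubles as the offset. Not Kafka's actual storage code.
class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    // Appends a message and returns the offset assigned to it.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1L; // the first message gets offset 0
    }

    // Reads the message stored at the given offset.
    String read(long offset) {
        return messages.get((int) offset);
    }

    // The offset the next appended message will receive.
    long endOffset() {
        return messages.size();
    }
}
```

Two separate `PartitionLog` instances each start at offset 0, which is why an offset is only meaningful together with its partition.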

Offset Semantics: At-Most-Once vs At-Least-Once vs Exactly-Once

At-Most-Once (Commit Before Processing):

[Figure: At-Most-Once Delivery]

At-Least-Once (Commit After Processing):

[Figure: At-Least-Once Delivery]

Exactly-Once (Transactional Commit):

[Figure: Exactly-Once Delivery]
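
The first two semantics differ only in the order of "commit" and "process". The following is a small simulation under stated assumptions, not Kafka client code: `DeliverySemanticsDemo` is a hypothetical class, the partition holds offsets 0 to 2, and the consumer crashes exactly once, between the two steps, while handling offset 1. Exactly-once is not modeled here because it needs the commit and the processing result to be atomic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a consumer that crashes once at offset 1
// and then restarts from its committed offset.
class DeliverySemanticsDemo {

    // Returns the offsets that were actually processed, in order.
    static List<Long> run(boolean commitBeforeProcessing) {
        List<Long> processed = new ArrayList<>();
        long committed = 0;          // next offset to read after a restart
        boolean crashPending = true; // crash only once

        while (committed < 3) {
            long offset = committed; // (re)start from the committed position

            // Step 1: commit first (at-most-once) or process first (at-least-once)
            if (commitBeforeProcessing) {
                committed = offset + 1;
            } else {
                processed.add(offset);
            }

            // Simulated crash between the two steps
            if (offset == 1 && crashPending) {
                crashPending = false;
                continue; // restart: the loop resumes from `committed`
            }

            // Step 2: whichever action did not happen in step 1
            if (commitBeforeProcessing) {
                processed.add(offset);
            } else {
                committed = offset + 1;
            }
        }
        return processed;
    }
}
```

With `run(true)` offset 1 is never processed (lost), while `run(false)` processes offset 1 twice (duplicated).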

Offset Storage: __consumer_offsets Topic

Internal Kafka Topic:

[Figure: Consumer Offsets Topic]
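
Conceptually, the offsets topic behaves like a key-value log: each commit is keyed by group, topic, and partition, and compaction eventually keeps only the newest value per key. The sketch below models just that end state. `OffsetsTopicModel` and its string key are assumptions made for illustration; the real records use a binary, versioned format.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the compacted view of committed offsets:
// only the latest commit per (group, topic, partition) survives.
class OffsetsTopicModel {
    private final Map<String, Long> latest = new HashMap<>();

    // Records a commit; a newer commit for the same key replaces the older one.
    void commit(String group, String topic, int partition, long offset) {
        latest.put(key(group, topic, partition), offset);
    }

    // Returns the last committed offset, or null if the group never committed.
    Long fetch(String group, String topic, int partition) {
        return latest.get(key(group, topic, partition));
    }

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }
}
```

Because the key includes the group, two consumer groups reading the same partition never overwrite each other's progress.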

Coordinator Lookup:

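// How a consumer finds where to commit offsets (pseudocode; findLeader is illustrative)
// Guard against Math.abs(Integer.MIN_VALUE), which stays negative
int hash = groupId.hashCode();
int partition = (hash == Integer.MIN_VALUE ? 0 : Math.abs(hash)) % numOffsetsTopicPartitions;
Broker coordinator = findLeader("__consumer_offsets", partition);
// All offset commits for the "analytics" group go to the same coordinator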
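
The hash-to-partition step can be tried in isolation. This is a sketch, assuming the mapping is "non-negative hash modulo partition count"; `CoordinatorLookup` is a hypothetical helper, and the broker's exact implementation may differ in detail. It also shows why a plain `Math.abs` is not enough on its own.

```java
// Hypothetical helper that maps a group id hash to an offsets-topic partition.
class CoordinatorLookup {

    // Returns a partition index in [0, numPartitions).
    static int partitionFor(int hash, int numPartitions) {
        // Math.abs(Integer.MIN_VALUE) is still negative, so handle it explicitly
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    // Convenience overload for a group id string.
    static int partitionFor(String groupId, int numPartitions) {
        return partitionFor(groupId.hashCode(), numPartitions);
    }
}
```

The same group id always maps to the same partition, so every member of a group talks to the same coordinator broker.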

Manual vs Automatic Offset Commit

Automatic Commit (Default):

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Every 5 seconds

// The consumer commits offsets automatically from within poll(), at most once per interval
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // If crash here, might reprocess up to 5 seconds of data
    }
}

⚠️ At-least-once semantics (potential duplicates on crash)

Manual Commit (Precise Control):

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);

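        // Commit after EACH message (safest, slowest).
        // Pass the offset explicitly: a bare commitSync() would commit the
        // position of the whole polled batch, not just this record.
        consumer.commitSync(Collections.singletonMap(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)));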
    }
}

// OR batch commit (faster, but a crash mid-batch reprocesses the whole batch)
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    consumer.commitSync(); // Commit entire batch
}
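
A detail that is easy to get wrong with manual commits: the committed value is the offset of the next message to read, so it is the last processed offset plus one, tracked per partition. The sketch below keeps that bookkeeping in plain Java. `CommitPositions` is a hypothetical helper; with the real client the map would be keyed by `TopicPartition`, hold `OffsetAndMetadata` values, and be passed to `commitSync(Map)`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping for manual commits: remembers, per partition,
// the offset a restarted consumer should read next.
class CommitPositions {
    private final Map<Integer, Long> nextToRead = new HashMap<>();

    // Call after a record has been fully processed.
    void markProcessed(int partition, long offset) {
        // Commit position = processed offset + 1; never move backwards
        nextToRead.merge(partition, offset + 1, Long::max);
    }

    // Snapshot of the positions that are safe to commit right now.
    Map<Integer, Long> offsetsToCommit() {
        return new HashMap<>(nextToRead);
    }
}
```

Committing this snapshot every N records or every few seconds is a middle ground between per-message and per-batch commits.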

Async Commit (Performance):

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed for offsets: " + offsets, exception);
        // Handle commit failure (retry, alert, etc.)
    }
});
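
Retrying a failed async commit needs care: by the time the failure callback runs, a newer commit may already have succeeded, and a blind retry would move the committed offset backwards. One commonly described guard is a sequence number per commit. The sketch below shows only that guard; `AsyncCommitRetryGuard` is a hypothetical helper, not part of the Kafka client.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical guard that allows a retry only for the most recent commit attempt.
class AsyncCommitRetryGuard {
    private final AtomicLong latestSequence = new AtomicLong();

    // Call right before each commitAsync; returns that attempt's sequence number.
    long nextCommit() {
        return latestSequence.incrementAndGet();
    }

    // Call from the failure callback with the attempt's sequence number.
    // Returns false when a newer commit has been issued since, so retrying
    // would risk overwriting newer offsets with older ones.
    boolean shouldRetry(long sequence) {
        return sequence == latestSequence.get();
    }
}
```

A common complement is a final `commitSync()` when the consumer shuts down, so the last position is stored reliably.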

Offset Reset Strategies

What happens when no committed offset exists, or the committed offset is no longer valid (for example, the data was deleted by retention)?

// EARLIEST: Start from beginning of partition
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use case: New consumer group wants all historical data

// LATEST (default): Start from end (only new messages)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// Use case: Only care about new events going forward

// NONE: Throw exception if no offset found
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
// Use case: Require explicit offset initialization

Example Scenarios:

[Figure: Offset Reset Scenarios]
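
The decision the reset policy drives can be written as one function. This is a simplified sketch: `StartPositionResolver` is a hypothetical helper, `logStart` and `logEnd` stand for the earliest and latest available positions in the partition, and the exception types are placeholders for the ones the real client throws.

```java
import java.util.OptionalLong;

// Hypothetical helper that picks a consumer's starting offset.
class StartPositionResolver {

    static long resolve(OptionalLong committed, long logStart, long logEnd, String policy) {
        // A committed offset inside the available range always wins
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        // Otherwise fall back to the configured reset policy
        switch (policy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            case "none":
                throw new IllegalStateException("no valid committed offset and reset policy is none");
            default:
                throw new IllegalArgumentException("unknown reset policy: " + policy);
        }
    }
}
```

Note that the policy is only consulted when there is no usable committed offset; an existing group with valid commits is unaffected by changing it.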

Seeking to Specific Offsets

Manual Offset Control:

// Seek to specific offset (the partition must already be assigned to this consumer)
consumer.seek(new TopicPartition("user-events", 0), 12345);

// Seek to beginning
consumer.seekToBeginning(Collections.singletonList(
    new TopicPartition("user-events", 0)
));

// Seek to end
consumer.seekToEnd(Collections.singletonList(
    new TopicPartition("user-events", 0)
));

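// Seek by timestamp (find the earliest offset at or after a specific time)
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(
    new TopicPartition("user-events", 0),
    System.currentTimeMillis() - 86400000L // 24 hours ago
);
Map<TopicPartition, OffsetAndTimestamp> offsets =
    consumer.offsetsForTimes(timestampsToSearch);
offsets.forEach((partition, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) { // null when no message is that recent
        consumer.seek(partition, offsetAndTimestamp.offset());
    }
});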
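
The timestamp lookup returns, per partition, the earliest offset whose timestamp is at or after the requested time, and no result when every message is older. The sketch below reproduces that rule on an in-memory array. `TimestampLookup` is a hypothetical helper; it uses a linear scan for clarity, whereas a broker would consult an index.

```java
import java.util.OptionalLong;

// Hypothetical model of a timestamp-to-offset lookup for one partition.
class TimestampLookup {

    // timestamps[i] is the timestamp of the message at offset baseOffset + i.
    static OptionalLong offsetForTime(long[] timestamps, long baseOffset, long target) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return OptionalLong.of(baseOffset + i); // earliest match
            }
        }
        return OptionalLong.empty(); // every message is older than target
    }
}
```

The empty result corresponds to the `null` map value the real client returns, which is why a null check is needed before seeking.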

Use Cases:

  • Debugging: Replay specific time range to reproduce issue
  • Reprocessing: Reprocess data after bug fix
  • Data Recovery: Restore state after data corruption
  • Testing: Start from known offset for integration tests

Tradeoffs

Advantages:

  • ✓ Fault tolerance (resume from last position)
  • ✓ Replay capability (reprocess historical data)
  • ✓ Consumer independence (each group tracks own offsets)
  • ✓ Flexible semantics (at-least-once, at-most-once, exactly-once)

Disadvantages:

  • ✕ Offset commits add latency overhead
  • ✕ Managing manual commits is complex
  • ✕ Duplicate processing with at-least-once semantics
  • ✕ Offset storage can become bottleneck at extreme scale

Real Systems Using This

Kafka (Apache)

  • Implementation: __consumer_offsets compacted topic (50 partitions by default)
  • Scale: Billions of offset commits per day at LinkedIn
  • Typical Setup: Auto-commit for simple pipelines, manual for critical data

Amazon Kinesis

  • Implementation: DynamoDB table for checkpoint storage (similar to offsets)
  • Scale: Auto-scaling checkpoint storage
  • Typical Setup: Kinesis Client Library (KCL) manages checkpoints automatically

Apache Pulsar

  • Implementation: Managed cursors stored in ledger metadata
  • Scale: Automatic cursor management with acknowledgment tracking
  • Typical Setup: Subscription cursors per consumer group

When to Use Different Commit Strategies

✓ Auto-Commit (Simplicity)

[Figure: Auto-Commit Strategy]

✓ Manual Sync Commit (Safety)

[Figure: Manual Sync Commit Strategy]

✓ Manual Async Commit (Performance)

[Figure: Manual Async Commit Strategy]

✓ Transactional Commit (Exactly-Once)

[Figure: Transactional Commit Strategy]

Interview Application

Common Interview Question 1

Q: “Your consumer processes a message and writes to a database, then crashes before committing the offset. What happens on restart? How do you handle this?”

Strong Answer:

“On restart, the consumer will reprocess the message since the offset wasn’t committed - this is at-least-once semantics. This can cause duplicate database writes. Solutions:

  1. Idempotent processing: Use upsert instead of insert, or add message IDs to detect duplicates
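  2. Transactional processing: Store the offset in the database in the same transaction as the write, and on restart seek to the stored offset instead of relying on Kafka’s committed offset
  3. Exactly-once with Kafka transactions: When the result goes to another Kafka topic, use a transactional producer that commits the output and the consumed offsets together (sendOffsetsToTransaction), with downstream consumers set to isolation.level=read_committed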

For critical systems like payments, I’d use option 2 or 3. For analytics where occasional duplicates are acceptable, option 1 is simpler.”
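
The idea of storing the offset together with the result can be sketched with an in-memory stand-in for the database. Everything here is hypothetical: `OffsetTrackingStore` is not a real library class, the account balance is an invented example payload, and `synchronized` stands in for a database transaction that updates the data and the offset atomically.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store that applies each message at most once by keeping
// the next expected offset alongside the data it protects.
class OffsetTrackingStore {
    private final Map<String, Long> balances = new HashMap<>();
    private final Map<Integer, Long> nextOffset = new HashMap<>();

    // Applies the update unless this offset was already applied.
    // Returns true if applied, false if it was a duplicate delivery.
    synchronized boolean apply(int partition, long offset, String account, long delta) {
        long expected = nextOffset.getOrDefault(partition, 0L);
        if (offset < expected) {
            return false; // already processed before the crash
        }
        balances.merge(account, delta, Long::sum);
        nextOffset.put(partition, offset + 1); // same "transaction" as the write
        return true;
    }

    synchronized long balance(String account) {
        return balances.getOrDefault(account, 0L);
    }

    // Where a restarted consumer should seek for this partition.
    synchronized long resumeOffset(int partition) {
        return nextOffset.getOrDefault(partition, 0L);
    }
}
```

After a crash, redelivering the same message leaves the balance unchanged, and `resumeOffset` tells the restarted consumer where to seek.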

Why this is good:

  • Identifies the problem (duplicate processing)
  • Provides multiple solutions with tradeoffs
  • Matches solution to use case severity

Common Interview Question 2

Q: “How would you reprocess the last 7 days of data from a Kafka topic?”

Strong Answer:

“I’d use consumer.offsetsForTimes() to find the offset from 7 days ago, then seek to that offset:

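// 7L forces long arithmetic, so longer windows cannot overflow int
long sevenDaysAgo = System.currentTimeMillis() - (7L * 24 * 60 * 60 * 1000);
// Partition 0 shown; add an entry for every partition of the topic
Map<TopicPartition, Long> timestamps = Map.of(
    new TopicPartition("events", 0), sevenDaysAgo
);
Map<TopicPartition, OffsetAndTimestamp> offsets =
    consumer.offsetsForTimes(timestamps);

offsets.forEach((partition, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) { // null if the partition has no message that recent
        consumer.seek(partition, offsetAndTimestamp.offset());
    }
});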

Important considerations:

  • Ensure retention period is > 7 days, or data may be deleted
  • Create a NEW consumer group to avoid affecting production consumers
  • Consider data volume (7 days might be terabytes, need parallel consumers)
  • Implement idempotent processing to handle potential duplicates”

Why this is good:

  • Provides working code
  • Addresses retention concerns
  • Thinks about production impact
  • Considers scale and parallelism

Red Flags to Avoid

  • ✕ Confusing offset with message ID or timestamp
  • ✕ Not understanding at-least-once vs exactly-once semantics
  • ✕ Assuming offsets reset to 0 after deletion
  • ✕ Forgetting that offsets are per-partition, not per-topic

Quick Self-Check

Before moving on, can you:

  • Explain what an offset is in 30 seconds?
  • Draw the relationship between offsets, partitions, and consumers?
  • Explain at-least-once vs exactly-once semantics?
  • Describe how to replay data from a specific timestamp?
  • Choose appropriate commit strategy for different use cases?
  • Understand where offsets are stored?

Used In Systems

  • Real-Time Analytics Pipelines - Offset management for fault tolerance
  • CDC Pipelines - Exactly-once offset commits


Next Recommended: Log-Based Storage - Learn how Kafka stores messages using append-only logs

Interview Notes

  • Interview Relevance: 80% of streaming interviews
  • Production Impact: Prevents data loss at scale
  • Performance: Fault tolerance
  • Scalability: Historical replay