Event Sourcing

An architectural pattern that stores all changes to application state as a sequence of events, enabling complete audit trails and time-travel capabilities

TL;DR

Event Sourcing is an architectural pattern where all changes to application state are stored as a sequence of immutable events rather than updating current state directly. Instead of storing “User email is new@example.com”, you store the sequence of events: UserCreated, EmailChanged(old, new), StatusUpgraded. Current state is derived by replaying these events. This enables complete audit trails, temporal queries, and debugging capabilities.

Visual Overview

CRUD vs Event Sourcing

Core Explanation

What is Event Sourcing?

Event Sourcing is a pattern where:

  1. All changes to application state are captured as events
  2. Events are stored in an append-only log (event store)
  3. Current state is derived by replaying events
  4. Events are immutable and ordered
State Reconstruction
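
In place of a diagram, here is the idea in code: current state is never stored directly; it is recomputed by folding over the ordered event history. A minimal sketch using the types introduced below (the aggregate id "user-42" is illustrative):

// State reconstruction as a left-fold over the event history
User user = new User();
for (DomainEvent event : eventStore.getEvents("user-42")) {
    user.apply(event);   // each event advances the in-memory state
}
// 'user' now reflects every change ever recorded for this aggregate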

Event Store Design

Event Structure:

// Base event class
public abstract class DomainEvent {
    private final String eventId;        // Unique event identifier
    private final String aggregateId;    // Entity this event applies to
    private long version;                // Per-aggregate sequence number, assigned by the event store on append
    private final Instant timestamp;     // When the event occurred
    private final String userId;         // Who triggered the event

    // Subclasses define their specific event data and report their type
    public abstract String getEventType();

    // Getters for the fields above (and a setVersion used only by the
    // event store) are omitted for brevity
}

// Specific event examples
public class UserCreatedEvent extends DomainEvent {
    private final String email;
    private final String name;
    private final String accountType;

    // Event is immutable (final fields, no setters)
}

public class EmailChangedEvent extends DomainEvent {
    private final String oldEmail;
    private final String newEmail;
    private final String changeReason;
}

public class StatusUpgradedEvent extends DomainEvent {
    private final UserStatus oldStatus;
    private final UserStatus newStatus;
    private final String upgradeReason;
}

Event Store Interface:

public interface EventStore {

    // Append new events for an aggregate
    void appendEvents(String aggregateId,
                     List<DomainEvent> events,
                     long expectedVersion);  // Optimistic concurrency

    // Read all events for an aggregate
    List<DomainEvent> getEvents(String aggregateId);

    // Read events since a version
    List<DomainEvent> getEventsSince(String aggregateId, long fromVersion);

    // Read all events since a timestamp (for projections)
    Stream<DomainEvent> getEventsSince(Instant timestamp);
}
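
A hedged usage sketch of this interface; the EmailChangedEvent constructor shown here is an assumption (the class above lists only its fields):

// Read the current version, append with an optimistic-concurrency
// check, and handle a conflicting writer
List<DomainEvent> history = eventStore.getEvents("user-42");
long currentVersion = history.get(history.size() - 1).getVersion();

try {
    eventStore.appendEvents(
        "user-42",
        List.of(new EmailChangedEvent("old@example.com",
                                      "new@example.com",
                                      "user request")),  // constructor assumed
        currentVersion);
} catch (ConcurrencyException e) {
    // Another writer appended first: reload the aggregate,
    // re-run validation, and retry the command
}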

Kafka as Event Store:

public class KafkaEventStore implements EventStore {
    // Assumes producer.initTransactions() was called once at startup
    private final KafkaProducer<String, DomainEvent> producer;
    private final KafkaConsumer<String, DomainEvent> consumer;

    @Override
    public void appendEvents(String aggregateId,
                            List<DomainEvent> events,
                            long expectedVersion) {

        producer.beginTransaction();

        try {
            long currentVersion = expectedVersion;

            for (DomainEvent event : events) {
                // Set version for optimistic concurrency
                event.setVersion(++currentVersion);

                // Partition by aggregateId ensures ordering
                ProducerRecord<String, DomainEvent> record =
                    new ProducerRecord<>("events", aggregateId, event);

                producer.send(record);
            }

            producer.commitTransaction();

        } catch (Exception e) {
            producer.abortTransaction();
            // Kafka alone does not verify expectedVersion; a real store
            // needs an explicit version check to detect write conflicts
            throw new ConcurrencyException("Append failed", e);
        }
    }

    @Override
    public List<DomainEvent> getEvents(String aggregateId) {
        List<DomainEvent> events = new ArrayList<>();

        // Illustrative approach: scan the topic from the beginning and
        // filter by key (a purpose-built store would index per aggregate)
        List<TopicPartition> partitions = consumer.partitionsFor("events").stream()
            .map(info -> new TopicPartition("events", info.partition()))
            .collect(Collectors.toList());
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        while (true) {
            ConsumerRecords<String, DomainEvent> records =
                consumer.poll(Duration.ofMillis(100));

            // Treat an empty poll as end-of-log; a robust reader would
            // compare consumer positions against endOffsets instead
            if (records.isEmpty()) break;

            for (ConsumerRecord<String, DomainEvent> record : records) {
                if (record.key().equals(aggregateId)) {
                    events.add(record.value());
                }
            }
        }

        return events.stream()
            .sorted(Comparator.comparing(DomainEvent::getVersion))
            .collect(Collectors.toList());
    }
}

# Topic configuration for the event store
# Caution: records are keyed by aggregateId, and compaction keeps only the
# latest record per key, so a pure event log would normally use
# cleanup.policy=delete with long retention. retention.ms=31536000000 is 365 days.
kafka-topics --create \
  --topic events \
  --partitions 12 \
  --replication-factor 3 \
  --config cleanup.policy=compact,delete \
  --config retention.ms=31536000000 \
  --config min.insync.replicas=2

Aggregate Reconstruction

Loading Aggregate State:

public class AggregateRepository<T extends Aggregate> {
    private final EventStore eventStore;

    public T load(String aggregateId, Class<T> aggregateType) {
        // Load all events for the aggregate
        List<DomainEvent> events = eventStore.getEvents(aggregateId);

        if (events.isEmpty()) {
            throw new AggregateNotFoundException(aggregateId);
        }

        // Create an empty aggregate instance
        T aggregate = createEmpty(aggregateType);

        // Replay events in order to rebuild current state
        for (DomainEvent event : events) {
            aggregate.apply(event);
        }

        return aggregate;
    }

    private T createEmpty(Class<T> aggregateType) {
        try {
            // Aggregates need a no-arg constructor for reconstruction
            return aggregateType.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + aggregateType, e);
        }
    }
}

// Example aggregate
public class User extends Aggregate {
    private String userId;
    private String email;
    private UserStatus status;
    private long version;

    // Apply event to update state
    public void apply(DomainEvent event) {
        switch (event.getEventType()) {
            case "UserCreated":
                applyUserCreated((UserCreatedEvent) event);
                break;
            case "EmailChanged":
                applyEmailChanged((EmailChangedEvent) event);
                break;
            case "StatusUpgraded":
                applyStatusUpgraded((StatusUpgradedEvent) event);
                break;
            default:
                break;  // Ignore unknown event types (forward compatibility)
        }
        this.version = event.getVersion();
    }

    private void applyUserCreated(UserCreatedEvent event) {
        this.userId = event.getAggregateId();
        this.email = event.getEmail();
        this.status = UserStatus.REGULAR;
    }

    private void applyEmailChanged(EmailChangedEvent event) {
        this.email = event.getNewEmail();
    }

    private void applyStatusUpgraded(StatusUpgradedEvent event) {
        this.status = event.getNewStatus();
    }
}
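
The write side is the mirror image. A minimal sketch (the save method itself is an assumption; getVersion matches the version field the aggregate already tracks):

// Persist new events produced by a command, passing the version the
// aggregate had when loaded as the optimistic-concurrency check
public void save(String aggregateId, User aggregate, List<DomainEvent> newEvents) {
    eventStore.appendEvents(aggregateId, newEvents, aggregate.getVersion());
}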

Snapshots for Performance

Problem: Replaying 1 million events is slow!

Solution: Periodic snapshots + events since snapshot

public class SnapshotOptimization {
    private final EventStore eventStore;
    private final KafkaProducer<String, Snapshot> snapshotProducer;
    // serialize, deserialize, and loadLatestSnapshot are helper
    // methods omitted for brevity

    // Snapshot = current state at a point in time
    public void createSnapshot(String aggregateId, Object state, long version) {
        Snapshot snapshot = Snapshot.builder()
            .aggregateId(aggregateId)
            .aggregateState(serialize(state))
            .version(version)
            .timestamp(Instant.now())
            .build();

        // Store in separate compacted topic
        snapshotProducer.send(new ProducerRecord<>(
            "snapshots",
            aggregateId,
            snapshot
        ));
    }

    // Load with snapshot optimization
    public User loadUser(String userId) {
        // Step 1: Try to load latest snapshot
        Optional<Snapshot> snapshot = loadLatestSnapshot(userId);

        User user;
        long fromVersion;

        if (snapshot.isPresent()) {
            user = deserialize(snapshot.get().getAggregateState());
            fromVersion = snapshot.get().getVersion();
        } else {
            user = new User();
            fromVersion = 0;
        }

        // Step 2: Load and apply events since snapshot
        List<DomainEvent> events =
            eventStore.getEventsSince(userId, fromVersion);

        for (DomainEvent event : events) {
            user.apply(event);
        }

        // Step 3: Maybe create new snapshot (every 100 events)
        if (user.getVersion() - fromVersion > 100) {
            createSnapshot(userId, user, user.getVersion());
        }

        return user;
    }
}

Performance comparison (illustrative): without snapshots, replaying 1M events takes about 5 seconds; with a snapshot every 1,000 events, loading the snapshot plus ~500 events takes about 50 ms, roughly 100x faster.

Temporal Queries (Time Travel)

Query Past State:

public class TemporalQueries {

    // "What was user's email on June 15, 2024?"
    public String getUserEmailAt(String userId, Instant pointInTime) {
        List<DomainEvent> events = eventStore.getEvents(userId);

        User user = new User();

        // Replay events up to point in time
        for (DomainEvent event : events) {
            if (event.getTimestamp().isBefore(pointInTime)) {
                user.apply(event);
            } else {
                break;  // Stop at point in time
            }
        }

        return user.getEmail();
    }

    // "Show me all changes to user between Jan 1 and Jan 31"
    public List<DomainEvent> getChangesInPeriod(
            String userId,
            Instant start,
            Instant end) {

        return eventStore.getEvents(userId).stream()
            .filter(e -> e.getTimestamp().isAfter(start))
            .filter(e -> e.getTimestamp().isBefore(end))
            .collect(Collectors.toList());
    }

    // "Who changed user email on Jan 15?"
    public String whoChangedEmail(String userId, LocalDate date) {
        List<DomainEvent> events = eventStore.getEvents(userId);

        return events.stream()
            .filter(e -> e.getEventType().equals("EmailChanged"))
            .filter(e -> e.getTimestamp().atZone(ZoneOffset.UTC)
                          .toLocalDate().equals(date))  // Instant needs a zone to yield a date
            .findFirst()
            .map(DomainEvent::getUserId)
            .orElse("Unknown");
    }
}
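
Beyond temporal queries, the same log can feed multiple read models: each projection consumes the event stream independently and maintains its own view. A minimal sketch, assuming a dedicated consumer, an in-memory map, and getters on the event classes (a real projection would persist its state and track offsets):

// Hypothetical projection: fold the event stream into a simple
// read model mapping userId -> current email
Map<String, String> emailByUser = new HashMap<>();

consumer.subscribe(List.of("events"));
while (true) {
    ConsumerRecords<String, DomainEvent> records =
        consumer.poll(Duration.ofMillis(500));

    for (ConsumerRecord<String, DomainEvent> record : records) {
        DomainEvent event = record.value();
        if (event instanceof UserCreatedEvent created) {
            emailByUser.put(event.getAggregateId(), created.getEmail());
        } else if (event instanceof EmailChangedEvent changed) {
            emailByUser.put(event.getAggregateId(), changed.getNewEmail());
        }
        // Other event types do not affect this particular view
    }
}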

Tradeoffs

Advantages:

  • Complete audit trail (who, what, when, why)
  • Temporal queries (state at any point in time)
  • Event replay for debugging
  • Multiple read models from same events
  • Natural event-driven architecture
  • Append-only storage (high performance)

Disadvantages:

  • Complexity (more code than CRUD)
  • Storage growth (all events forever)
  • Snapshot management overhead
  • Eventual consistency (read models lag)
  • Schema evolution challenges (see the upcaster sketch after this list)
  • Steep learning curve
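
A common mitigation for schema evolution (a general technique, not something this article prescribes) is to version stored payloads and "upcast" old shapes at read time; StoredEvent, getSchemaVersion, and EmailChangedEventV1 below are hypothetical:

// Upcaster sketch: translate an old payload into the current event
// shape during reads, so replay code only ever sees one version
public DomainEvent upcast(StoredEvent raw) {
    if ("EmailChanged".equals(raw.getEventType()) && raw.getSchemaVersion() == 1) {
        // V1 payloads lacked changeReason; supply an explicit default
        EmailChangedEventV1 v1 = deserialize(raw.getPayload(), EmailChangedEventV1.class);
        return new EmailChangedEvent(v1.getOldEmail(), v1.getNewEmail(), "unrecorded");
    }
    return deserialize(raw.getPayload(), DomainEvent.class);
}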

Real Systems Using This

Event Store DB

  • Implementation: Purpose-built database for event sourcing
  • Features: Built-in projections, stream processing, subscriptions
  • Scale: Handles millions of events per second
  • Use case: Financial trading, healthcare records

Apache Kafka

  • Implementation: Event store using log-based storage
  • Scale: Trillions of events at LinkedIn
  • Features: Partitioning, replication, compaction, retention
  • Use case: Uber trip events, Amazon order events

AWS EventBridge / DynamoDB Streams

  • Implementation: Managed event streaming services
  • Scale: Auto-scaling to millions of events/sec
  • Features: Event filtering, transformations, integrations
  • Use case: E-commerce order processing, IoT event streams

Banking Systems

  • Implementation: Custom event stores for compliance
  • Requirement: Complete audit trail for SOX, PCI-DSS
  • Retention: 7-10 years for regulatory compliance
  • Use case: Transaction history, account changes

When to Use Event Sourcing

Perfect Use Cases

  • Financial Systems
  • Healthcare Records
  • E-Commerce Orders
  • Collaborative Editing

When NOT to Use

  • Simple CRUD Applications
  • High-Volume Low-Value Events
  • External System Integration

Interview Application

Common Interview Question

Q: “Design a banking system that handles money transfers with complete audit trail and ability to investigate fraudulent transactions.”

Strong Answer:

“I’d use event sourcing for complete audit trail and temporal queries:

Architecture:

Events Topic (Kafka):
├── AccountCreated(accountId, owner, initialBalance)
├── MoneyDeposited(accountId, amount, source)
├── MoneyWithdrawn(accountId, amount, destination)
├── MoneyTransferred(fromAccount, toAccount, amount)
└── AccountFrozen(accountId, reason, investigator)

Event Store Configuration:

// Kafka topic for events
topic: bank-events
partitions: 24 (partition by accountId)
replication: 3
retention: 10 years (regulatory requirement)
cleanup.policy: delete (no compaction, keep all events)
min.insync.replicas: 3 (maximum durability; note that writes stall if any replica is down)

Money Transfer Flow:

public void transferMoney(String fromAccount, String toAccount, BigDecimal amount) {
    // Load both accounts
    Account from = aggregateRepo.load(fromAccount);
    Account to = aggregateRepo.load(toAccount);

    // Business logic validation
    if (from.getBalance().compareTo(amount) < 0) {
        throw new InsufficientFundsException();
    }

    // Generate events
    List<DomainEvent> events = List.of(
        new MoneyWithdrawnEvent(fromAccount, amount),
        new MoneyDepositedEvent(toAccount, amount),
        new TransferCompletedEvent(fromAccount, toAccount, amount)
    );

    // Atomic append: one event-store transaction covering both accounts
    // (signature simplified; making the cross-account transfer atomic is
    // the hard part, often solved with a dedicated Transfer aggregate or saga)
    eventStore.appendEvents(events);
}

Fraud Investigation:

// "Show all transactions for account in last 30 days"
List<DomainEvent> events = eventStore.getEvents(accountId)
    .filter(e -> e.getTimestamp().isAfter(now().minus(30, DAYS)))
    .collect(toList());

// "What was account balance on Jan 15?"
Account account = replayEventsUntil(accountId, jan15);
BigDecimal balance = account.getBalance();

// "Who authorized this transfer?"
MoneyTransferredEvent event = findEvent(transferId);
String investigator = event.getUserId();

Benefits for Banking:

  • Complete audit trail (SOX, PCI-DSS compliance)
  • Temporal queries for investigations
  • Event replay for testing fraud detection
  • Multiple projections (balance view, transaction history view)
  • Immutable records (cannot be tampered with)

Tradeoff: Higher complexity, but regulatory compliance is mandatory for banking.”

Why this is good:

  • Complete system design
  • Specific configuration values
  • Shows event structure
  • Demonstrates temporal queries
  • Explains business value
  • Addresses regulatory requirements

Red Flags to Avoid

  • Confusing event sourcing with event-driven architecture
  • Not understanding snapshot optimization
  • Thinking events can be deleted (they can’t)
  • Not considering storage growth over time
  • Not explaining how to handle schema evolution

Quick Self-Check

Before moving on, can you:

  • Explain event sourcing in 60 seconds?
  • Draw the difference between CRUD and event sourcing?
  • Describe how to reconstruct aggregate state?
  • Explain why snapshots are needed?
  • Identify when to use vs not use event sourcing?
  • Show how temporal queries work?

Used In Systems

  • Event-Driven Architectures - Broader architectural pattern
  • CQRS - Often used together with event sourcing

Next Recommended: CQRS - Pattern often used with event sourcing

Interview Notes
  • ⭐ Must-Know: appears in roughly 70% of senior interviews
  • 🏭 Production impact: powers systems at Amazon, Uber, and banks
  • Performance: complete audit trail with efficient temporal queries
  • 📈 Scalability: multiple projections from the same event log
Multiple projections