Event Sourcing and CQRS with Kafka
Deep dive into using Kafka for event sourcing and CQRS patterns: event stores, projections, state rebuilding, and architectural patterns
Concepts Covered in This Article
CQRS (Command Query Responsibility Segregation)
Advanced: An architectural pattern that separates read and write operations into distinct models, optimizing each for its specific use case
Event Sourcing
Advanced: An architectural pattern that stores all changes to application state as a sequence of events, enabling complete audit trails and time-travel capabilities
Event Sourcing Fundamentals and Kafka Integration
Event Sourcing Core Principles
Event Sourcing stores all changes to application state as a sequence of events rather than storing current state directly. Kafka serves as the durable, scalable event store with natural partitioning and replication:
TRADITIONAL vs EVENT SOURCING:

TRADITIONAL (CRUD):          EVENT SOURCING:
┌─────────────────┐          ┌─────────────────┐
│  Current        │          │  Event Store    │
│  State DB       │          │  (Kafka)        │
│                 │          │                 │
│  User: {        │          │  UserCreated    │
│    id: 123      │          │  EmailChanged   │
│    email: new@x │          │  ProfileUpdated │
│    status: VIP  │          │  StatusUpgraded │
│  }              │          │  ...            │
└─────────────────┘          └─────────────────┘
        │                            │
        ▼                            ▼
┌─────────────────┐          ┌─────────────────┐
│ UPDATE user     │          │ Rebuild state   │
│ SET status=VIP  │          │ by replaying    │
│ WHERE id=123    │          │ all events      │
└─────────────────┘          └─────────────────┘
BENEFITS OF EVENT SOURCING:
├── Complete audit trail (what, when, who, why)
├── Temporal queries (state at any point in time)
├── Natural event-driven architecture
├── Replay and debugging capabilities
├── Multiple projections from same events
└── Append-only storage (performance + reliability)
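The "rebuild state by replaying" box above is, in code, just a left fold over the event stream. Below is a minimal sketch of that fold; UserState and the three event records are hypothetical illustrations rather than types from any library, and the sketch assumes the stream starts with a UserCreated event.

import java.time.Instant;
import java.util.List;

public class ReplayExample {
    // Hypothetical event types, for illustration only
    sealed interface Event permits UserCreated, EmailChanged, StatusUpgraded {}
    record UserCreated(String id, String email, Instant at) implements Event {}
    record EmailChanged(String newEmail, Instant at) implements Event {}
    record StatusUpgraded(String newStatus, Instant at) implements Event {}

    record UserState(String id, String email, String status) {}

    // Current state = fold(apply, emptyState, events)
    static UserState replay(List<Event> events) {
        UserState state = null; // assumes the first event is UserCreated
        for (Event e : events) {
            state = switch (e) {
                case UserCreated c -> new UserState(c.id(), c.email(), "ACTIVE");
                case EmailChanged c -> new UserState(state.id(), c.newEmail(), state.status());
                case StatusUpgraded c -> new UserState(state.id(), state.email(), c.newStatus());
            };
        }
        return state;
    }

    public static void main(String[] args) {
        UserState current = replay(List.of(
            new UserCreated("123", "old@x", Instant.now()),
            new EmailChanged("new@x", Instant.now()),
            new StatusUpgraded("VIP", Instant.now())
        ));
        System.out.println(current); // UserState[id=123, email=new@x, status=VIP]
    }
}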
Kafka as Event Store Architecture
public class KafkaEventStore {

    private final KafkaProducer<String, DomainEvent> producer;

    public KafkaEventStore(KafkaProducer<String, DomainEvent> producer) {
        this.producer = producer;
        this.producer.initTransactions(); // Required once before transactional sends
    }

    // Topic design for event sourcing
    void configureEventStoreTopic() {
        Properties topicConfig = new Properties();

        // Key partitioning strategy - aggregate ID based
        topicConfig.put("partitions", "12");                   // Scale with aggregate count
        topicConfig.put("replication.factor", "3");

        // Event store specific settings
        topicConfig.put("cleanup.policy", "compact,delete");   // Hybrid cleanup
        topicConfig.put("retention.ms", "31536000000");        // 1 year retention
        topicConfig.put("min.compaction.lag.ms", "86400000");  // 24h before compaction

        // Ensure ordering and durability
        topicConfig.put("min.insync.replicas", "2");
        topicConfig.put("unclean.leader.election.enable", "false");
    }

    // Event schema design
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "eventType")
    @JsonSubTypes({
        @JsonSubTypes.Type(value = UserCreated.class, name = "UserCreated"),
        @JsonSubTypes.Type(value = EmailChanged.class, name = "EmailChanged"),
        @JsonSubTypes.Type(value = StatusUpgraded.class, name = "StatusUpgraded")
    })
    public abstract static class DomainEvent {
        private final String aggregateId;
        private final long aggregateVersion;
        private final Instant timestamp;
        private final String userId; // Who made the change

        // Events are immutable value objects; versioning returns a copy
        public abstract String getEventType();
        public abstract DomainEvent withAggregateVersion(long version);
    }

    // Append events to Kafka
    public CompletableFuture<Long> appendEvents(String aggregateId,
                                                List<DomainEvent> events,
                                                long expectedVersion) {
        return CompletableFuture.supplyAsync(() -> {
            producer.beginTransaction();
            try {
                // Note: Kafka itself does not enforce expectedVersion; the
                // version check against current state happens in the command
                // handler before events reach this point
                long currentVersion = expectedVersion;
                for (DomainEvent event : events) {
                    // Stamp each event with the next version (optimistic concurrency)
                    DomainEvent versioned = event.withAggregateVersion(++currentVersion);
                    ProducerRecord<String, DomainEvent> record =
                        new ProducerRecord<>("events", aggregateId, versioned);
                    producer.send(record);
                }
                producer.commitTransaction();
                return currentVersion;
            } catch (Exception e) {
                producer.abortTransaction();
                throw new ConcurrencyException("Failed to append events", e);
            }
        });
    }
}
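One practical note: the Properties block above expresses intent, but partitions and replication.factor are creation-time parameters rather than topic-level configs, so actual topic creation typically goes through Kafka's AdminClient. A sketch, assuming a broker at localhost:9092:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class EventTopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Partition count and replication factor are set at creation time;
            // everything else travels as topic-level configs
            NewTopic events = new NewTopic("events", 12, (short) 3)
                .configs(Map.of(
                    "cleanup.policy", "compact,delete",
                    "retention.ms", "31536000000",
                    "min.compaction.lag.ms", "86400000",
                    "min.insync.replicas", "2",
                    "unclean.leader.election.enable", "false"));

            admin.createTopics(List.of(events)).all().get();
        }
    }
}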
CQRS Architecture Patterns with Kafka
Command and Query Separation
CQRS separates write models (commands) from read models (queries), using Kafka as the event backbone between them:
CQRS WITH KAFKA ARCHITECTURE:

WRITE SIDE (Commands)          EVENT BACKBONE             READ SIDE (Queries)
┌─────────────────────┐        ┌─────────────────┐        ┌─────────────────┐
│     Command API     │        │      KAFKA      │        │    Query API    │
│ ┌─────────────────┐ │        │                 │        │ ┌─────────────┐ │
│ │ CreateUser      │ │        │  Topic: events  │        │ │ GetUser     │ │
│ │ UpdateEmail     │ │─events▶│                 │─events▶│ │ SearchUsers │ │
│ │ UpgradeStatus   │ │        │ ┌─────────────┐ │        │ │ GetUserStats│ │
│ └─────────────────┘ │        │ │ Partition 0 │ │        │ └─────────────┘ │
│                     │        │ │ Partition 1 │ │        │                 │
│ ┌─────────────────┐ │        │ │ Partition 2 │ │        │ ┌─────────────┐ │
│ │   Write Model   │ │        │ └─────────────┘ │        │ │  Read Model │ │
│ │    (Domain)     │ │        │                 │        │ │(Projections)│ │
│ │  - User         │ │        │   Replication   │        │ │ - UserView  │ │
│ │  - Account      │ │        │   Partitioning  │        │ │ - StatsView │ │
│ │  - Order        │ │        │   Durability    │        │ │ - SearchIdx │ │
│ └─────────────────┘ │        └─────────────────┘        │ └─────────────┘ │
└─────────────────────┘                                   └─────────────────┘
Command Handler Implementation
@Component
public class UserCommandHandler {

    private final KafkaEventStore eventStore;
    private final UserRepository userRepository; // Write-side repository

    @CommandHandler
    public CompletableFuture<Void> handle(CreateUserCommand command) {
        return CompletableFuture.runAsync(() -> {
            // Load current state (if exists)
            User user = userRepository.findById(command.getUserId())
                .orElse(null);
            if (user != null) {
                throw new UserAlreadyExistsException(command.getUserId());
            }

            // Create domain events
            UserCreatedEvent event = UserCreatedEvent.builder()
                .aggregateId(command.getUserId())
                .email(command.getEmail())
                .timestamp(Instant.now())
                .userId(command.getRequesterId())
                .build();

            // Persist events
            eventStore.appendEvents(
                command.getUserId(),
                List.of(event),
                0 // Expected version for a new aggregate
            ).join();

            // Update write-side state (optional - can be event-driven)
            User newUser = User.builder()
                .id(command.getUserId())
                .email(command.getEmail())
                .version(1)
                .build();
            userRepository.save(newUser);
        });
    }

    @CommandHandler
    public CompletableFuture<Void> handle(UpdateEmailCommand command) {
        return CompletableFuture.runAsync(() -> {
            // Load current state
            User user = userRepository.findById(command.getUserId())
                .orElseThrow(() -> new UserNotFoundException(command.getUserId()));

            // Business logic validation
            if (user.getEmail().equals(command.getNewEmail())) {
                return; // No change needed
            }

            // Generate domain event
            EmailChangedEvent event = EmailChangedEvent.builder()
                .aggregateId(command.getUserId())
                .oldEmail(user.getEmail())
                .newEmail(command.getNewEmail())
                .timestamp(Instant.now())
                .userId(command.getRequesterId())
                .build();

            // Optimistic concurrency control
            try {
                eventStore.appendEvents(
                    command.getUserId(),
                    List.of(event),
                    user.getVersion() // Expected version
                ).join();

                // Update write-side state
                user.setEmail(command.getNewEmail());
                user.setVersion(user.getVersion() + 1);
                userRepository.save(user);
            } catch (ConcurrencyException e) {
                throw new OptimisticLockingException("User was modified concurrently");
            }
        });
    }
}
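To round out the separation, the query side stays deliberately thin: it reads projections and never replays events or touches domain logic. A hedged sketch in Spring style, reusing the UserView and UserViewRepository names used elsewhere in this article (findByEmailContaining is an assumed derived-query method, not an API the article defines):

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/users")
public class UserQueryController {

    private final UserViewRepository userViewRepository;

    public UserQueryController(UserViewRepository userViewRepository) {
        this.userViewRepository = userViewRepository;
    }

    // Queries read the projection directly: no domain logic, no event replay
    @GetMapping("/{id}")
    public UserView getUser(@PathVariable String id) {
        return userViewRepository.findById(id)
            .orElseThrow(() -> new UserNotFoundException(id));
    }

    // Search hits a read-optimized view, never the write model
    @GetMapping
    public List<UserView> searchUsers(@RequestParam String emailFragment) {
        return userViewRepository.findByEmailContaining(emailFragment);
    }
}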
Event Store Design and Optimization
Partitioning Strategies for Event Stores
public class EventStorePartitioning {

    // Strategy 1: Aggregate-based partitioning (recommended)
    void configureAggregatePartitioning(DomainEvent event) {
        // Key = aggregateId (e.g., user_123, order_456)
        // - Ensures all events for an aggregate land in the same partition
        // - Guarantees ordering within an aggregate
        // - Enables efficient aggregate reconstruction
        String partitionKey = event.getAggregateId();
        ProducerRecord<String, DomainEvent> record =
            new ProducerRecord<>("events", partitionKey, event);
    }

    // Strategy 2: Tenant-based partitioning (multi-tenant systems)
    void configureTenantPartitioning(DomainEvent event) {
        // Key = tenantId:aggregateId
        // - Isolates tenant data
        // - Enables tenant-specific scaling
        // - Supports data locality requirements
        String partitionKey = event.getTenantId() + ":" + event.getAggregateId();
        ProducerRecord<String, DomainEvent> record =
            new ProducerRecord<>("events", partitionKey, event);
    }

    // Strategy 3: Time-based partitioning (for massive scale)
    void configureTimeBasedPartitioning() {
        // Key = aggregateId, but route by time
        // - Enables time-based compaction and archival
        // - Better for systems with billions of events
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
            TimeBasedPartitioner.class.getName());
    }
}
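Kafka ships no producer-side TimeBasedPartitioner, so Strategy 3 implies writing one against the Partitioner interface. A minimal sketch that buckets by wall-clock day (the bucketing rule is an assumption for illustration):

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.time.Duration;
import java.util.Map;

public class TimeBasedPartitioner implements Partitioner {

    private static final long BUCKET_MS = Duration.ofDays(1).toMillis();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int partitionCount = cluster.partitionsForTopic(topic).size();
        // Route by wall-clock day so a day's events share partitions,
        // making time-ranged archival a per-partition operation
        long bucket = System.currentTimeMillis() / BUCKET_MS;
        return (int) (bucket % partitionCount);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

The trade-off is explicit: routing by time sacrifices per-aggregate ordering across bucket boundaries, which is why Strategy 1 remains the default recommendation.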
Event Store Performance Optimization
public class EventStoreOptimization {

    // Batch event appending for performance
    public CompletableFuture<Void> appendEventsBatch(
            Map<String, List<DomainEvent>> eventsByAggregate) {
        return CompletableFuture.runAsync(() -> {
            producer.beginTransaction();
            try {
                // Send all events in a single transaction
                for (Map.Entry<String, List<DomainEvent>> entry :
                        eventsByAggregate.entrySet()) {
                    String aggregateId = entry.getKey();
                    List<DomainEvent> events = entry.getValue();
                    for (DomainEvent event : events) {
                        ProducerRecord<String, DomainEvent> record =
                            new ProducerRecord<>("events", aggregateId, event);
                        producer.send(record);
                    }
                }
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                throw e;
            }
        });
    }

    // Snapshot optimization for large aggregates
    @Component
    public class SnapshotManager {

        public void createSnapshot(String aggregateId, Object aggregateState, long version) {
            SnapshotEvent snapshot = SnapshotEvent.builder()
                .aggregateId(aggregateId)
                .aggregateType(aggregateState.getClass().getSimpleName())
                .aggregateState(serialize(aggregateState))
                .snapshotVersion(version)
                .timestamp(Instant.now())
                .build();

            // Store snapshots in a separate topic with compaction
            ProducerRecord<String, SnapshotEvent> record =
                new ProducerRecord<>("snapshots", aggregateId, snapshot);
            snapshotProducer.send(record);
        }

        public <T> Optional<AggregateSnapshot<T>> getLatestSnapshot(
                String aggregateId, Class<T> aggregateType) {
            // Read from the compacted snapshots topic; log compaction keeps
            // the latest snapshot per aggregate automatically available
            return snapshotRepository.findLatestSnapshot(aggregateId, aggregateType);
        }
    }

    // Optimized aggregate reconstruction
    public <T> T rebuildAggregate(String aggregateId, Class<T> aggregateType) {
        // Step 1: Try to load the latest snapshot
        Optional<AggregateSnapshot<T>> snapshot =
            snapshotManager.getLatestSnapshot(aggregateId, aggregateType);

        T aggregate;
        long fromVersion;
        if (snapshot.isPresent()) {
            aggregate = snapshot.get().getAggregateState();
            fromVersion = snapshot.get().getVersion();
        } else {
            aggregate = createEmptyAggregate(aggregateType);
            fromVersion = 0;
        }

        // Step 2: Load and apply events since the snapshot
        // (one plausible applyEvent implementation is sketched after this class)
        List<DomainEvent> eventsSinceSnapshot =
            eventStore.getEventsSince(aggregateId, fromVersion);
        for (DomainEvent event : eventsSinceSnapshot) {
            aggregate = applyEvent(aggregate, event);
        }
        return aggregate;
    }
}
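rebuildAggregate leans on an applyEvent function it never defines. One plausible shape, assuming the event types used throughout this article plus a hypothetical immutable UserAggregate record (getNewStatus is likewise an assumed accessor):

public class UserAggregateApplier {

    // Hypothetical immutable aggregate, used only in this sketch
    record UserAggregate(String id, String email, String status, long version) {}

    static UserAggregate applyEvent(UserAggregate current, DomainEvent event) {
        return switch (event.getEventType()) {
            case "UserCreated" -> {
                UserCreatedEvent e = (UserCreatedEvent) event;
                yield new UserAggregate(e.getAggregateId(), e.getEmail(),
                    "ACTIVE", e.getAggregateVersion());
            }
            case "EmailChanged" -> {
                EmailChangedEvent e = (EmailChangedEvent) event;
                yield new UserAggregate(current.id(), e.getNewEmail(),
                    current.status(), e.getAggregateVersion());
            }
            case "StatusUpgraded" -> {
                StatusUpgradedEvent e = (StatusUpgradedEvent) event;
                yield new UserAggregate(current.id(), current.email(),
                    e.getNewStatus(), e.getAggregateVersion());
            }
            default -> current; // Unknown events are ignored, not fatal
        };
    }
}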
Projection Patterns and View Materialization
Projection Builder Implementation
@Component
public class UserProjectionBuilder {

    @KafkaListener(topics = "events", groupId = "user-view-builder")
    public void handleUserEvent(DomainEvent event) {
        switch (event.getEventType()) {
            case "UserCreated" -> handleUserCreated((UserCreatedEvent) event);
            case "EmailChanged" -> handleEmailChanged((EmailChangedEvent) event);
            case "StatusUpgraded" -> handleStatusUpgraded((StatusUpgradedEvent) event);
            default -> log.warn("Unknown event type: {}", event.getEventType());
        }
    }

    private void handleUserCreated(UserCreatedEvent event) {
        UserView userView = UserView.builder()
            .id(event.getAggregateId())
            .email(event.getEmail())
            .status(UserStatus.ACTIVE)
            .createdAt(event.getTimestamp())
            .lastModified(event.getTimestamp())
            .version(event.getAggregateVersion())
            .build();
        userViewRepository.save(userView);

        // Update search index
        searchIndexer.indexUser(userView);

        // Update statistics
        statisticsService.incrementUserCount();
    }

    private void handleEmailChanged(EmailChangedEvent event) {
        UserView userView = userViewRepository.findById(event.getAggregateId())
            .orElseThrow(() -> new ProjectionInconsistencyException(
                "User view not found for email change: " + event.getAggregateId()));

        // Idempotency check
        if (userView.getVersion() >= event.getAggregateVersion()) {
            log.info("Event already processed: {}", event);
            return;
        }

        userView.setEmail(event.getNewEmail());
        userView.setLastModified(event.getTimestamp());
        userView.setVersion(event.getAggregateVersion());
        userViewRepository.save(userView);

        // Update search index
        searchIndexer.updateUser(userView);
    }
}
Multiple Projection Strategies
public class MultipleProjectionManager {

    // Strategy 1: Dedicated projection builders per view
    @Component
    public class UserSearchProjectionBuilder {
        @KafkaListener(topics = "events", groupId = "user-search-builder")
        public void buildSearchProjection(DomainEvent event) {
            // Build a search-optimized view:
            // - Denormalized for fast queries
            // - Indexed for full-text search
            // - Includes computed fields
        }
    }

    @Component
    public class UserAnalyticsProjectionBuilder {
        @KafkaListener(topics = "events", groupId = "user-analytics-builder")
        public void buildAnalyticsProjection(DomainEvent event) {
            // Build an analytics-optimized view:
            // - Aggregated statistics
            // - Time-series data
            // - Pre-computed metrics
        }
    }

    // Strategy 2: Single builder with multiple outputs
    @Component
    public class MultiViewProjectionBuilder {
        @KafkaListener(topics = "events", groupId = "multi-view-builder")
        @Transactional
        public void buildMultipleViews(DomainEvent event) {
            // Update all projections in a single transaction
            updateUserDetailsView(event);
            updateUserSearchView(event);
            updateUserAnalyticsView(event);
            updateUserAuditView(event);
        }
    }

    // Strategy 3: Event-driven projection chaining
    void configureProjectionChain() {
        // The primary projection publishes to a derived topic;
        // secondary projections consume from that derived topic.
        // This enables complex projection dependencies:
        //
        //   events → user-details-view → user-search-events  → search-index
        //                             └→ user-analytics-view → analytics-events → dashboards
        //
        // (a concrete chaining sketch follows this class)
    }
}
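Strategy 3 in code: the primary builder republishes a derived event after updating its view, so downstream projections consume the derived topic instead of raw domain events. A sketch assuming Spring Kafka's KafkaTemplate and the topic names from the comment above; the helper method body is elided because it mirrors the projection builder shown earlier:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ChainedProjectionBuilder {

    private final KafkaTemplate<String, UserView> kafkaTemplate;
    private final UserViewRepository userViewRepository;

    public ChainedProjectionBuilder(KafkaTemplate<String, UserView> kafkaTemplate,
                                    UserViewRepository userViewRepository) {
        this.kafkaTemplate = kafkaTemplate;
        this.userViewRepository = userViewRepository;
    }

    // Primary projection: raw domain events in, derived view-change events out
    @KafkaListener(topics = "events", groupId = "user-details-builder")
    public void onDomainEvent(DomainEvent event) {
        UserView view = updateUserDetailsView(event);
        // Downstream projections (search index, dashboards) consume this
        // derived topic instead of re-deriving state from raw events
        kafkaTemplate.send("user-search-events", view.getId(), view);
    }

    private UserView updateUserDetailsView(DomainEvent event) {
        // ... apply the event to the user-details view as shown earlier ...
        return userViewRepository.findById(event.getAggregateId()).orElseThrow();
    }
}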
Consistency and Transaction Patterns
Saga Pattern with Event Sourcing
public class OrderProcessingSaga {

    @SagaOrchestrationStart
    @EventHandler
    public void handle(OrderPlacedEvent event) {
        SagaState sagaState = SagaState.builder()
            .sagaId(UUID.randomUUID().toString())
            .orderId(event.getOrderId())
            .userId(event.getUserId())
            .amount(event.getAmount())
            .status(SagaStatus.STARTED)
            .build();

        // Step 1: Reserve inventory
        ReserveInventoryCommand reserveCommand = ReserveInventoryCommand.builder()
            .sagaId(sagaState.getSagaId())
            .orderId(event.getOrderId())
            .items(event.getItems())
            .build();
        commandGateway.send(reserveCommand);

        // Store saga state as an event
        SagaStartedEvent sagaEvent = SagaStartedEvent.builder()
            .sagaId(sagaState.getSagaId())
            .orderId(event.getOrderId())
            .step("INVENTORY_RESERVATION")
            .build();
        eventStore.appendEvent(sagaState.getSagaId(), sagaEvent);
    }

    @EventHandler
    public void handle(InventoryReservedEvent event) {
        // Step 2: Process payment
        ProcessPaymentCommand paymentCommand = ProcessPaymentCommand.builder()
            .sagaId(event.getSagaId())
            .orderId(event.getOrderId())
            .amount(event.getAmount())
            .build();
        commandGateway.send(paymentCommand);
    }

    @EventHandler
    public void handle(PaymentProcessedEvent event) {
        // Step 3: Confirm order
        ConfirmOrderCommand confirmCommand = ConfirmOrderCommand.builder()
            .sagaId(event.getSagaId())
            .orderId(event.getOrderId())
            .build();
        commandGateway.send(confirmCommand);
    }

    @EventHandler
    public void handle(PaymentFailedEvent event) {
        // Compensating transaction: release inventory
        ReleaseInventoryCommand compensateCommand = ReleaseInventoryCommand.builder()
            .sagaId(event.getSagaId())
            .orderId(event.getOrderId())
            .reason("Payment failed")
            .build();
        commandGateway.send(compensateCommand);

        // Mark saga as failed
        SagaFailedEvent sagaFailedEvent = SagaFailedEvent.builder()
            .sagaId(event.getSagaId())
            .reason("Payment processing failed")
            .build();
        eventStore.appendEvent(event.getSagaId(), sagaFailedEvent);
    }
}
Eventual Consistency Management
@Component
public class ConsistencyManager {

    // Pattern 1: Version-based consistency
    public void handleEventualConsistency(DomainEvent event) {
        try {
            updateProjection(event);
        } catch (ProjectionUpdateException e) {
            // Retry with exponential backoff (a backoff sketch follows this class)
            scheduleRetry(event, calculateBackoff(e.getAttemptCount()));
        }
    }

    // Pattern 2: Consistency boundary enforcement
    public void enforceConsistencyBoundary(String aggregateId, DomainEvent event) {
        // Strong consistency within an aggregate,
        // eventual consistency across aggregates
        if (isWithinAggregateBoundary(aggregateId, event)) {
            // Synchronous update - strong consistency
            updateAggregateState(aggregateId, event);
        } else {
            // Asynchronous update - eventual consistency
            scheduleAsyncUpdate(event);
        }
    }

    // Pattern 3: Compensation tracking
    public void trackCompensation(String sagaId, CompensatingAction action) {
        CompensationEvent compensationEvent = CompensationEvent.builder()
            .sagaId(sagaId)
            .action(action)
            .status(CompensationStatus.PENDING)
            .timestamp(Instant.now())
            .build();
        eventStore.appendEvent(sagaId, compensationEvent);
    }
}
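Pattern 1 calls calculateBackoff without defining it. A common choice is capped exponential backoff with full jitter; a sketch follows, where the base delay, cap, and jitter policy are all assumptions:

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

public class Backoff {

    private static final Duration BASE = Duration.ofMillis(200);
    private static final Duration CAP = Duration.ofMinutes(1);

    // attempt is 1-based: 200ms, 400ms, 800ms, ... capped at 1 minute
    static Duration calculateBackoff(int attempt) {
        long exp = BASE.toMillis() << Math.min(attempt - 1, 20); // bound the shift
        long capped = Math.min(exp, CAP.toMillis());
        // Full jitter spreads retries out so failing consumers don't stampede
        long jittered = ThreadLocalRandom.current().nextLong(capped + 1);
        return Duration.ofMillis(jittered);
    }
}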
Temporal Queries and Event Replay
Time-Travel Query Implementation
@Service
public class TemporalQueryService {

    // Query aggregate state at a specific point in time
    public <T> T getAggregateStateAt(String aggregateId,
                                     Class<T> aggregateType,
                                     Instant pointInTime) {
        // Get all events up to the specified time
        List<DomainEvent> historicalEvents = eventStore.getEventsUntil(
            aggregateId, pointInTime);

        T aggregate = createEmptyAggregate(aggregateType);

        // Replay events in chronological order
        for (DomainEvent event : historicalEvents) {
            aggregate = applyEvent(aggregate, event);
        }
        return aggregate;
    }

    // Query state changes over a time period.
    // Note: replaying from scratch per interval is O(events x intervals);
    // production implementations replay incrementally between snapshots.
    public <T> List<StateSnapshot<T>> getStateEvolution(
            String aggregateId,
            Class<T> aggregateType,
            Instant fromTime,
            Instant toTime,
            Duration snapshotInterval) {
        List<StateSnapshot<T>> evolution = new ArrayList<>();
        Instant currentTime = fromTime;

        while (!currentTime.isAfter(toTime)) {
            T state = getAggregateStateAt(aggregateId, aggregateType, currentTime);
            evolution.add(StateSnapshot.<T>builder()
                .timestamp(currentTime)
                .state(state)
                .build());
            currentTime = currentTime.plus(snapshotInterval);
        }
        return evolution;
    }

    // Audit query - who changed what, and when
    public List<AuditRecord> getAuditTrail(String aggregateId,
                                           Instant fromTime,
                                           Instant toTime) {
        List<DomainEvent> events = eventStore.getEventsBetween(
            aggregateId, fromTime, toTime);

        return events.stream()
            .map(event -> AuditRecord.builder()
                .aggregateId(aggregateId)
                .eventType(event.getEventType())
                .timestamp(event.getTimestamp())
                .userId(event.getUserId())
                .changes(extractChanges(event))
                .build())
            .collect(toList());
    }
}
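A brief usage sketch: investigating a disputed change by reconstructing state from just before the change, then watching its day-by-day evolution afterwards. The aggregate ID, timestamps, and the UserAggregate type from the earlier sketch are all illustrative:

import java.time.Duration;
import java.time.Instant;
import java.util.List;

public class DisputeAudit {

    private final TemporalQueryService temporalQueryService;

    public DisputeAudit(TemporalQueryService temporalQueryService) {
        this.temporalQueryService = temporalQueryService;
    }

    public void investigate() {
        // State as of just before the disputed change
        UserAggregate before = temporalQueryService.getAggregateStateAt(
            "user-123", UserAggregate.class,
            Instant.parse("2024-03-01T00:00:00Z"));

        // Day-by-day evolution over the dispute window
        List<StateSnapshot<UserAggregate>> evolution =
            temporalQueryService.getStateEvolution(
                "user-123", UserAggregate.class,
                Instant.parse("2024-03-01T00:00:00Z"),
                Instant.parse("2024-03-08T00:00:00Z"),
                Duration.ofDays(1));
    }
}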
Event Replay and System Recovery
@Component
public class EventReplayService {

    // Rebuild all projections from scratch
    public CompletableFuture<Void> rebuildAllProjections() {
        return CompletableFuture.runAsync(() -> {
            log.info("Starting full projection rebuild");

            // Clear all existing projections
            projectionRepository.deleteAll();

            // Create a new consumer group for the replay
            String replayGroupId = "projection-rebuild-" + System.currentTimeMillis();
            try (KafkaConsumer<String, DomainEvent> replayConsumer =
                    createReplayConsumer(replayGroupId)) {

                // Seek to the beginning of all partitions
                List<TopicPartition> partitions = replayConsumer.partitionsFor("events")
                    .stream()
                    .map(info -> new TopicPartition("events", info.partition()))
                    .collect(toList());
                replayConsumer.assign(partitions);
                replayConsumer.seekToBeginning(partitions);

                // Process all events
                long processedEvents = 0;
                boolean reachedEnd = false;
                while (!reachedEnd) {
                    ConsumerRecords<String, DomainEvent> records =
                        replayConsumer.poll(Duration.ofSeconds(1));
                    if (records.isEmpty()) {
                        // Simplification: one empty poll is treated as end-of-log;
                        // a sturdier end-offset check is sketched after this class
                        reachedEnd = true;
                    } else {
                        for (ConsumerRecord<String, DomainEvent> record : records) {
                            projectionBuilder.processEvent(record.value());
                            processedEvents++;
                            if (processedEvents % 10000 == 0) {
                                log.info("Processed {} events", processedEvents);
                            }
                        }
                    }
                }
                log.info("Projection rebuild completed. Processed {} events",
                    processedEvents);
            }
        });
    }

    // Selective replay for specific aggregates
    public CompletableFuture<Void> replayAggregateProjections(
            List<String> aggregateIds) {
        return CompletableFuture.runAsync(() -> {
            for (String aggregateId : aggregateIds) {
                List<DomainEvent> events = eventStore.getAllEvents(aggregateId);
                for (DomainEvent event : events) {
                    projectionBuilder.processEvent(event);
                }
            }
        });
    }
}
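The empty-poll check above can end a replay early if the broker is momentarily slow. A sturdier termination condition compares the consumer's position on each partition against the end offsets captured when the replay starts; a sketch, reusing this article's DomainEvent and projection-builder types:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

public class ReplayTermination {

    // True once the consumer has reached the end offset of every partition
    static boolean reachedEnd(KafkaConsumer<?, ?> consumer,
                              Map<TopicPartition, Long> endOffsets) {
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            if (consumer.position(entry.getKey()) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    static void replay(KafkaConsumer<String, DomainEvent> consumer,
                       Set<TopicPartition> partitions,
                       ProjectionBuilder projectionBuilder) {
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);
        // Snapshot the high-water marks up front; events appended during
        // the replay are picked up by normal consumption afterwards
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

        while (!reachedEnd(consumer, endOffsets)) {
            consumer.poll(Duration.ofSeconds(1))
                .forEach(record -> projectionBuilder.processEvent(record.value()));
        }
    }
}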
Schema Evolution and Versioning
Event Schema Evolution Strategies
public class EventSchemaEvolution {

    // Strategy 1: Additive changes (backward compatible)
    @JsonIgnoreProperties(ignoreUnknown = true) // Old consumers ignore new fields
    public class UserCreatedEventV2 extends UserCreatedEvent {
        private String phoneNumber; // New optional field
        private Instant lastLogin;  // New optional field
        // Old consumers can still process V2 events;
        // new consumers can process both V1 and V2 events
    }

    // Strategy 2: Event upcasting for breaking changes
    @Component
    public class EventUpcastingService {

        public DomainEvent upcastEvent(DomainEvent event) {
            return switch (event.getEventType()) {
                case "UserCreatedV1" -> upcastUserCreatedV1ToV2((UserCreatedEventV1) event);
                case "EmailChangedV1" -> upcastEmailChangedV1ToV2((EmailChangedEventV1) event);
                default -> event; // No upcasting needed
            };
        }

        private UserCreatedEventV2 upcastUserCreatedV1ToV2(UserCreatedEventV1 oldEvent) {
            return UserCreatedEventV2.builder()
                .aggregateId(oldEvent.getAggregateId())
                .email(oldEvent.getEmail())
                .phoneNumber(null) // Default value for new field
                .lastLogin(null)   // Default value for new field
                .timestamp(oldEvent.getTimestamp())
                .userId(oldEvent.getUserId())
                .aggregateVersion(oldEvent.getAggregateVersion())
                .build();
        }
    }

    // Strategy 3: Event versioning metadata
    public abstract class VersionedDomainEvent {
        @JsonProperty("eventVersion")
        private final int eventVersion = getEventVersion();

        public abstract int getEventVersion();

        public boolean isCompatibleWith(int consumerVersion) {
            return this.eventVersion <= consumerVersion;
        }
    }
}
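One place to wire upcasting in is a Deserializer wrapper, so listeners only ever see the latest schema. A sketch, where the inner deserializer stands in for whatever serde the system already uses (Jackson-based or otherwise):

import org.apache.kafka.common.serialization.Deserializer;

public class UpcastingDeserializer implements Deserializer<DomainEvent> {

    private final Deserializer<DomainEvent> inner;
    private final EventUpcastingService upcaster;

    public UpcastingDeserializer(Deserializer<DomainEvent> inner,
                                 EventUpcastingService upcaster) {
        this.inner = inner;
        this.upcaster = upcaster;
    }

    @Override
    public DomainEvent deserialize(String topic, byte[] data) {
        // Old events are upcast on read, so stored bytes never need rewriting
        return upcaster.upcastEvent(inner.deserialize(topic, data));
    }
}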
Performance Optimization and Scaling
High-Performance Event Processing
@Component
public class PerformanceOptimizedProjectionBuilder {

    private final ExecutorService projectionExecutor =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // Receiving a List requires a batch listener factory (Spring Kafka batch
    // mode); concurrency = "8" runs eight listener threads in parallel
    @KafkaListener(topics = "events", concurrency = "8")
    public void processEventsConcurrently(List<DomainEvent> events) {
        // Group events by aggregate to maintain per-aggregate ordering
        Map<String, List<DomainEvent>> eventsByAggregate = events.stream()
            .collect(groupingBy(DomainEvent::getAggregateId));

        // Process each aggregate's events sequentially, aggregates in parallel
        List<CompletableFuture<Void>> futures = eventsByAggregate.entrySet()
            .stream()
            .map(entry -> CompletableFuture.runAsync(
                () -> processAggregateEvents(entry.getKey(), entry.getValue()),
                projectionExecutor))
            .collect(toList());

        // Wait for all aggregates to complete
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();
    }

    // Batch projection updates for performance
    @Transactional
    public void processAggregateEvents(String aggregateId, List<DomainEvent> events) {
        // Load current projection state
        UserView currentView = userViewRepository.findById(aggregateId)
            .orElse(null);

        // Apply all events in batch; assumes applyEventToProjection returns a
        // new instance when state changes, so the reference check below holds
        UserView updatedView = currentView;
        for (DomainEvent event : events) {
            updatedView = applyEventToProjection(updatedView, event);
        }

        // Single database update for all events
        if (updatedView != currentView) {
            userViewRepository.save(updatedView);
        }
    }
}
Taken together, these patterns cover both the strategic architectural decisions and the implementation detail needed to run event-sourced, CQRS-based systems on Kafka in production: event store design, projections, sagas, temporal queries, schema evolution, and the performance work that makes them scale.
See Also: [[Kafka_Transactions]], [[Stream_Processing_Frameworks]], [[Microservices_Saga_Patterns]], [[Advanced_Database_Patterns]]