Event Sourcing and CQRS with Kafka
Deep dive into using Kafka for event sourcing and CQRS patterns: event stores, projections, state rebuilding, and architectural patterns
Event Sourcing Fundamentals and Kafka Integration
Event Sourcing Core Principles
Event sourcing stores every change to application state as an immutable event in an append-only log, rather than persisting only the current state. Kafka is a natural fit for the event store: it provides a durable, replicated log with strict ordering within each partition and horizontal scalability through partitioning:
Kafka as Event Store Architecture
public class KafkaEventStore {
    private final KafkaProducer<String, DomainEvent> producer; // transactional producer
    private final AdminClient adminClient;

    // Topic design for event sourcing
    void configureEventStoreTopic() throws Exception {
        Map<String, String> configs = new HashMap<>();
        // Do not compact the event topic: with aggregateId as the record key,
        // compaction would keep only the latest event per aggregate and destroy
        // the history. Compaction belongs on the snapshots topic instead.
        configs.put("cleanup.policy", "delete");
        configs.put("retention.ms", "31536000000"); // 1 year; use "-1" to retain indefinitely
        // Ensure ordering and durability
        configs.put("min.insync.replicas", "2");
        configs.put("unclean.leader.election.enable", "false");

        // 12 partitions (scale based on aggregate count), replication factor 3
        NewTopic eventsTopic = new NewTopic("events", 12, (short) 3).configs(configs);
        adminClient.createTopics(List.of(eventsTopic)).all().get();
    }
// Event schema design
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "eventType")
@JsonSubTypes({
@JsonSubTypes.Type(value = UserCreated.class, name = "UserCreated"),
@JsonSubTypes.Type(value = EmailChanged.class, name = "EmailChanged"),
@JsonSubTypes.Type(value = StatusUpgraded.class, name = "StatusUpgraded")
})
public abstract class DomainEvent {
    private final String aggregateId;
    private final long aggregateVersion;
    private final Instant timestamp;
    private final String userId; // Who made the change

    // Events are immutable value objects: versioning yields a copy
    // rather than mutating the event in place
    public abstract DomainEvent withAggregateVersion(long version);
    public abstract String getEventType();

    public String getAggregateId() { return aggregateId; }
    public long getAggregateVersion() { return aggregateVersion; }
    public Instant getTimestamp() { return timestamp; }
    public String getUserId() { return userId; }
}
    // Append events to Kafka atomically. Note: Kafka has no native compare-and-set,
    // so the expectedVersion check must happen against write-side state before
    // calling this method; the transaction only makes the batch atomic.
    public CompletableFuture<Long> appendEvents(String aggregateId,
                                                List<DomainEvent> events,
                                                long expectedVersion) {
        return CompletableFuture.supplyAsync(() -> {
            producer.beginTransaction();
            try {
                long currentVersion = expectedVersion;
                for (DomainEvent event : events) {
                    // Version the event via an immutable copy (optimistic concurrency)
                    DomainEvent versioned = event.withAggregateVersion(++currentVersion);
                    ProducerRecord<String, DomainEvent> record =
                        new ProducerRecord<>("events", aggregateId, versioned);
                    producer.send(record);
                }
                producer.commitTransaction();
                return currentVersion;
            } catch (Exception e) {
                producer.abortTransaction();
                throw new ConcurrencyException("Failed to append events for " + aggregateId, e);
            }
        });
    }
}
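The read path used later by rebuildAggregate (getEventsSince) is not shown above. A minimal sketch, assuming the default partitioner (so an aggregate's events all live in the partition derived from the murmur2 hash of its string key) and the getAggregateVersion() accessor on DomainEvent; KafkaEventReader and its wiring are illustrative names:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class KafkaEventReader {
    private final KafkaConsumer<String, DomainEvent> consumer; // assumed configured with a DomainEvent deserializer

    public KafkaEventReader(KafkaConsumer<String, DomainEvent> consumer) {
        this.consumer = consumer;
    }

    // Load all events for one aggregate with version > fromVersion
    public List<DomainEvent> getEventsSince(String aggregateId, long fromVersion) {
        // Mirror the default partitioner's key hash so only one partition is scanned
        int numPartitions = consumer.partitionsFor("events").size();
        int partition = Utils.toPositive(
            Utils.murmur2(aggregateId.getBytes(StandardCharsets.UTF_8))) % numPartitions;
        TopicPartition tp = new TopicPartition("events", partition);

        consumer.assign(List.of(tp));
        consumer.seekToBeginning(List.of(tp));
        long endOffset = consumer.endOffsets(List.of(tp)).get(tp);

        List<DomainEvent> events = new ArrayList<>();
        while (consumer.position(tp) < endOffset) {
            for (ConsumerRecord<String, DomainEvent> record :
                    consumer.poll(Duration.ofMillis(500)).records(tp)) {
                // The partition holds many aggregates; filter by key and version
                if (aggregateId.equals(record.key())
                        && record.value().getAggregateVersion() > fromVersion) {
                    events.add(record.value());
                }
            }
        }
        return events;
    }
}
Note the cost: reading one aggregate means scanning its whole partition. This is the well-known price of using Kafka directly as an event store, and it is exactly what the snapshot optimization described later mitigates.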
CQRS Architecture Patterns with Kafka
Command and Query Separation
CQRS separates write models (commands) from read models (queries), using Kafka as the event backbone between them:
Command Handler Implementation
@Component
public class UserCommandHandler {
private final KafkaEventStore eventStore;
private final UserRepository userRepository; // Write-side repository
@CommandHandler
public CompletableFuture<Void> handle(CreateUserCommand command) {
return CompletableFuture.runAsync(() -> {
// Load current state (if exists)
User user = userRepository.findById(command.getUserId())
.orElse(null);
if (user != null) {
throw new UserAlreadyExistsException(command.getUserId());
}
// Create domain events
UserCreatedEvent event = UserCreatedEvent.builder()
.aggregateId(command.getUserId())
.email(command.getEmail())
.timestamp(Instant.now())
.userId(command.getRequesterId())
.build();
// Persist events
eventStore.appendEvents(
command.getUserId(),
List.of(event),
0 // Expected version for new aggregate
).join();
// Update write-side state (optional - can be event-driven)
User newUser = User.builder()
.id(command.getUserId())
.email(command.getEmail())
.version(1)
.build();
userRepository.save(newUser);
});
}
@CommandHandler
public CompletableFuture<Void> handle(UpdateEmailCommand command) {
return CompletableFuture.runAsync(() -> {
// Load current state
User user = userRepository.findById(command.getUserId())
.orElseThrow(() -> new UserNotFoundException(command.getUserId()));
// Business logic validation
if (user.getEmail().equals(command.getNewEmail())) {
return; // No change needed
}
// Generate domain event
EmailChangedEvent event = EmailChangedEvent.builder()
.aggregateId(command.getUserId())
.oldEmail(user.getEmail())
.newEmail(command.getNewEmail())
.timestamp(Instant.now())
.userId(command.getRequesterId())
.build();
// Optimistic concurrency control
try {
eventStore.appendEvents(
command.getUserId(),
List.of(event),
user.getVersion() // Expected version
).join();
// Update write-side state
user.setEmail(command.getNewEmail());
user.setVersion(user.getVersion() + 1);
userRepository.save(user);
} catch (ConcurrencyException e) {
throw new OptimisticLockingException("User was modified concurrently");
}
});
}
}
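The query side completes the CQRS split: reads never touch the event store or the write model, only the materialized views built by the projection builders shown later. A minimal sketch, assuming a Spring Data UserViewRepository with a derived findByStatus query (both names illustrative):
@Service
public class UserQueryService {
    private final UserViewRepository userViewRepository;

    public UserQueryService(UserViewRepository userViewRepository) {
        this.userViewRepository = userViewRepository;
    }

    // Reads are served entirely from the denormalized read model
    public UserView getUser(String userId) {
        return userViewRepository.findById(userId)
            .orElseThrow(() -> new UserNotFoundException(userId));
    }

    public List<UserView> findByStatus(UserStatus status) {
        return userViewRepository.findByStatus(status); // assumed derived query method
    }
}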
Event Store Design and Optimization
Partitioning Strategies for Event Stores
public class EventStorePartitioning {
    // Strategy 1: Aggregate-based partitioning (recommended)
    void configureAggregatePartitioning(DomainEvent event) {
        // Key = aggregateId (e.g., user_123, order_456)
        // Keeps all events for an aggregate in the same partition, which
        // guarantees ordering within the aggregate and enables efficient
        // aggregate reconstruction
        String partitionKey = event.getAggregateId();
        ProducerRecord<String, DomainEvent> record =
            new ProducerRecord<>("events", partitionKey, event);
    }

    // Strategy 2: Tenant-based partitioning (multi-tenant systems)
    void configureTenantPartitioning(TenantAwareEvent event) { // hypothetical subtype exposing getTenantId()
        // Key = tenantId:aggregateId
        // Isolates tenant data, enables tenant-specific scaling, and
        // supports data locality requirements
        String partitionKey = event.getTenantId() + ":" + event.getAggregateId();
        ProducerRecord<String, DomainEvent> record =
            new ProducerRecord<>("events", partitionKey, event);
    }

    // Strategy 3: Time-based partitioning (for massive scale)
    void configureTimeBasedPartitioning() {
        // Key = aggregateId, but route by event time instead of key hash
        // Enables time-based archival; better for systems with billions
        // of events (a partitioner sketch follows this class)
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
            TimeBasedPartitioner.class.getName());
    }
}
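Strategy 3 references a custom partitioner without defining it. One possible sketch, assuming events carry their business timestamp and partitions are treated as rotating daily buckets; note the trade-off that per-aggregate ordering is lost across partitions, which suits append-heavy analytics or archival topics rather than aggregate rebuilds:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

public class TimeBasedPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Bucket by UTC day so one day's events share a partition
        long epochDay = ((DomainEvent) value).getTimestamp().getEpochSecond() / 86_400L;
        return (int) (epochDay % numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}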
Event Store Performance Optimization
public class EventStoreOptimization {
// Batch event appending for performance
public CompletableFuture<Void> appendEventsBatch(
Map<String, List<DomainEvent>> eventsByAggregate) {
return CompletableFuture.runAsync(() -> {
producer.beginTransaction();
try {
// Send all events in single transaction
for (Map.Entry<String, List<DomainEvent>> entry :
eventsByAggregate.entrySet()) {
String aggregateId = entry.getKey();
List<DomainEvent> events = entry.getValue();
for (DomainEvent event : events) {
ProducerRecord<String, DomainEvent> record =
new ProducerRecord<>("events", aggregateId, event);
producer.send(record);
}
}
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
throw e;
}
});
}
// Snapshot optimization for large aggregates
@Component
public class SnapshotManager {
public void createSnapshot(String aggregateId, Object aggregateState, long version) {
SnapshotEvent snapshot = SnapshotEvent.builder()
.aggregateId(aggregateId)
.aggregateType(aggregateState.getClass().getSimpleName())
.aggregateState(serialize(aggregateState))
.snapshotVersion(version)
.timestamp(Instant.now())
.build();
// Store snapshot in separate topic with compaction
ProducerRecord<String, SnapshotEvent> record =
new ProducerRecord<>("snapshots", aggregateId, snapshot);
snapshotProducer.send(record);
}
public <T> Optional<AggregateSnapshot<T>> getLatestSnapshot(
String aggregateId, Class<T> aggregateType) {
// Read from compacted snapshots topic
// Latest snapshot automatically available due to log compaction
return snapshotRepository.findLatestSnapshot(aggregateId, aggregateType);
}
}
// Optimized aggregate reconstruction
public <T> T rebuildAggregate(String aggregateId, Class<T> aggregateType) {
// Step 1: Try to load latest snapshot
Optional<AggregateSnapshot<T>> snapshot =
snapshotManager.getLatestSnapshot(aggregateId, aggregateType);
T aggregate;
long fromVersion;
if (snapshot.isPresent()) {
aggregate = snapshot.get().getAggregateState();
fromVersion = snapshot.get().getVersion();
} else {
aggregate = createEmptyAggregate(aggregateType);
fromVersion = 0;
}
// Step 2: Load and apply events since snapshot
List<DomainEvent> eventsSinceSnapshot =
eventStore.getEventsSince(aggregateId, fromVersion);
for (DomainEvent event : eventsSinceSnapshot) {
aggregate = applyEvent(aggregate, event);
}
return aggregate;
}
}
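When to create snapshots is a policy decision the code above leaves open. A common heuristic, sketched here under the assumption of a fixed event-count threshold (the constant and method name are illustrative), is to snapshot after every N appended events:
// Called after a successful append; would live alongside SnapshotManager
private static final int SNAPSHOT_EVERY_N_EVENTS = 100; // tune to aggregate size and replay cost

public void maybeSnapshot(String aggregateId, Object aggregateState, long newVersion) {
    if (newVersion % SNAPSHOT_EVERY_N_EVENTS == 0) {
        snapshotManager.createSnapshot(aggregateId, aggregateState, newVersion);
    }
}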
Projection Patterns and View Materialization
Projection Builder Implementation
@Component
public class UserProjectionBuilder {
@KafkaListener(topics = "events", groupId = "user-view-builder")
public void handleUserEvent(DomainEvent event) {
switch (event.getEventType()) {
case "UserCreated" -> handleUserCreated((UserCreatedEvent) event);
case "EmailChanged" -> handleEmailChanged((EmailChangedEvent) event);
case "StatusUpgraded" -> handleStatusUpgraded((StatusUpgradedEvent) event);
default -> log.warn("Unknown event type: {}", event.getEventType());
}
}
private void handleUserCreated(UserCreatedEvent event) {
UserView userView = UserView.builder()
.id(event.getAggregateId())
.email(event.getEmail())
.status(UserStatus.ACTIVE)
.createdAt(event.getTimestamp())
.lastModified(event.getTimestamp())
.version(event.getAggregateVersion())
.build();
userViewRepository.save(userView);
// Update search index
searchIndexer.indexUser(userView);
// Update statistics
statisticsService.incrementUserCount();
}
private void handleEmailChanged(EmailChangedEvent event) {
UserView userView = userViewRepository.findById(event.getAggregateId())
.orElseThrow(() -> new ProjectionInconsistencyException(
"User view not found for email change: " + event.getAggregateId()));
// Idempotency check
if (userView.getVersion() >= event.getAggregateVersion()) {
log.info("Event already processed: {}", event);
return;
}
userView.setEmail(event.getNewEmail());
userView.setLastModified(event.getTimestamp());
userView.setVersion(event.getAggregateVersion());
userViewRepository.save(userView);
// Update search index
searchIndexer.updateUser(userView);
}
}
Multiple Projection Strategies
public class MultipleProjectionManager {
// Strategy 1: Dedicated projection builders per view
@Component
public class UserSearchProjectionBuilder {
@KafkaListener(topics = "events", groupId = "user-search-builder")
public void buildSearchProjection(DomainEvent event) {
// Build search-optimized view
// - Denormalized for fast queries
// - Indexed for full-text search
// - Includes computed fields
}
}
@Component
public class UserAnalyticsProjectionBuilder {
@KafkaListener(topics = "events", groupId = "user-analytics-builder")
public void buildAnalyticsProjection(DomainEvent event) {
// Build analytics-optimized view
// - Aggregated statistics
// - Time-series data
// - Pre-computed metrics
}
}
// Strategy 2: Single builder with multiple outputs
@Component
public class MultiViewProjectionBuilder {
@KafkaListener(topics = "events", groupId = "multi-view-builder")
@Transactional
public void buildMultipleViews(DomainEvent event) {
// Update all projections in single transaction
updateUserDetailsView(event);
updateUserSearchView(event);
updateUserAnalyticsView(event);
updateUserAuditView(event);
}
}
// Strategy 3: Event-driven projection chaining
void configureProjectionChain() {
// Primary projection publishes to derived topic
// Secondary projections consume from derived topic
// Enables complex projection dependencies (see the sketch after this class)
// events → user-details-view → user-search-events → search-index
// └─→ user-analytics-view → analytics-events → dashboards
}
}
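A minimal sketch of the Strategy 3 chain, assuming Spring Kafka, an injected KafkaTemplate, a hypothetical UserViewChangedEvent wrapper carrying the updated view, and an apply helper on the primary projection builder that returns the updated view:
@Component
public class ChainedUserDetailsProjection {
    private final UserProjectionBuilder userProjectionBuilder; // primary projection, as above
    private final KafkaTemplate<String, UserViewChangedEvent> kafkaTemplate;

    public ChainedUserDetailsProjection(UserProjectionBuilder userProjectionBuilder,
                                        KafkaTemplate<String, UserViewChangedEvent> kafkaTemplate) {
        this.userProjectionBuilder = userProjectionBuilder;
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "events", groupId = "user-details-builder")
    public void handle(DomainEvent event) {
        // Update the primary view first (apply is an assumed helper)
        UserView view = userProjectionBuilder.apply(event);
        // Publish a derived, view-level event; downstream projections (search
        // index, dashboards) consume it instead of raw domain events
        kafkaTemplate.send("user-search-events", view.getId(), new UserViewChangedEvent(view));
    }
}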
Consistency and Transaction Patterns
Saga Pattern with Event Sourcing
public class OrderProcessingSaga {
@SagaOrchestrationStart
@EventHandler
public void handle(OrderPlacedEvent event) {
SagaState sagaState = SagaState.builder()
.sagaId(UUID.randomUUID().toString())
.orderId(event.getOrderId())
.userId(event.getUserId())
.amount(event.getAmount())
.status(SagaStatus.STARTED)
.build();
// Step 1: Reserve inventory
ReserveInventoryCommand reserveCommand = ReserveInventoryCommand.builder()
.sagaId(sagaState.getSagaId())
.orderId(event.getOrderId())
.items(event.getItems())
.build();
commandGateway.send(reserveCommand);
// Store saga state as event
SagaStartedEvent sagaEvent = SagaStartedEvent.builder()
.sagaId(sagaState.getSagaId())
.orderId(event.getOrderId())
.step("INVENTORY_RESERVATION")
.build();
eventStore.appendEvent(sagaState.getSagaId(), sagaEvent);
}
@EventHandler
public void handle(InventoryReservedEvent event) {
// Step 2: Process payment
ProcessPaymentCommand paymentCommand = ProcessPaymentCommand.builder()
.sagaId(event.getSagaId())
.orderId(event.getOrderId())
.amount(event.getAmount())
.build();
commandGateway.send(paymentCommand);
}
@EventHandler
public void handle(PaymentProcessedEvent event) {
// Step 3: Confirm order
ConfirmOrderCommand confirmCommand = ConfirmOrderCommand.builder()
.sagaId(event.getSagaId())
.orderId(event.getOrderId())
.build();
commandGateway.send(confirmCommand);
}
@EventHandler
public void handle(PaymentFailedEvent event) {
// Compensating transaction: Release inventory
ReleaseInventoryCommand compensateCommand = ReleaseInventoryCommand.builder()
.sagaId(event.getSagaId())
.orderId(event.getOrderId())
.reason("Payment failed")
.build();
commandGateway.send(compensateCommand);
// Mark saga as failed
SagaFailedEvent sagaFailedEvent = SagaFailedEvent.builder()
.sagaId(event.getSagaId())
.reason("Payment processing failed")
.build();
eventStore.appendEvent(event.getSagaId(), sagaFailedEvent);
}
}
Eventual Consistency Management
@Component
public class ConsistencyManager {
// Pattern 1: Version-based consistency
public void handleEventualConsistency(DomainEvent event) {
try {
updateProjection(event);
} catch (ProjectionUpdateException e) {
// Retry with exponential backoff
scheduleRetry(event, calculateBackoff(e.getAttemptCount()));
}
}
// Pattern 2: Consistency boundary enforcement
public void enforceConsistencyBoundary(String aggregateId, DomainEvent event) {
// Strong consistency within aggregate
// Eventual consistency across aggregates
if (isWithinAggregateBoundary(aggregateId, event)) {
// Synchronous update - strong consistency
updateAggregateState(aggregateId, event);
} else {
// Asynchronous update - eventual consistency
scheduleAsyncUpdate(event);
}
}
// Pattern 3: Compensation tracking
public void trackCompensation(String sagaId, CompensatingAction action) {
CompensationEvent compensationEvent = CompensationEvent.builder()
.sagaId(sagaId)
.action(action)
.status(CompensationStatus.PENDING)
.timestamp(Instant.now())
.build();
eventStore.appendEvent(sagaId, compensationEvent);
}
}
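calculateBackoff is left undefined above. One conventional implementation, sketched here, is exponential backoff with full jitter and a cap, so retries from many consumers do not align into synchronized storms:
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// min(cap, base * 2^attempt), then a uniformly random delay up to that limit
private Duration calculateBackoff(int attemptCount) {
    long baseMs = 200;
    long capMs = 30_000;
    long limitMs = Math.min(capMs, baseMs * (1L << Math.min(attemptCount, 20)));
    return Duration.ofMillis(ThreadLocalRandom.current().nextLong(limitMs + 1));
}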
Temporal Queries and Event Replay
Time-Travel Query Implementation
@Service
public class TemporalQueryService {
// Query aggregate state at specific point in time
public <T> T getAggregateStateAt(String aggregateId,
Class<T> aggregateType,
Instant pointInTime) {
// Get all events up to the specified time
List<DomainEvent> historicalEvents = eventStore.getEventsUntil(
aggregateId, pointInTime);
T aggregate = createEmptyAggregate(aggregateType);
// Replay events in chronological order
for (DomainEvent event : historicalEvents) {
aggregate = applyEvent(aggregate, event);
}
return aggregate;
}
// Query state changes over time period
public <T> List<StateSnapshot<T>> getStateEvolution(
String aggregateId,
Class<T> aggregateType,
Instant fromTime,
Instant toTime,
Duration snapshotInterval) {
List<StateSnapshot<T>> evolution = new ArrayList<>();
Instant currentTime = fromTime;
while (!currentTime.isAfter(toTime)) {
T state = getAggregateStateAt(aggregateId, aggregateType, currentTime);
evolution.add(StateSnapshot.<T>builder()
.timestamp(currentTime)
.state(state)
.build());
currentTime = currentTime.plus(snapshotInterval);
}
return evolution;
}
// Audit query - who changed what when
public List<AuditRecord> getAuditTrail(String aggregateId,
Instant fromTime,
Instant toTime) {
List<DomainEvent> events = eventStore.getEventsBetween(
aggregateId, fromTime, toTime);
return events.stream()
.map(event -> AuditRecord.builder()
.aggregateId(aggregateId)
.eventType(event.getEventType())
.timestamp(event.getTimestamp())
.userId(event.getUserId())
.changes(extractChanges(event))
.build())
.collect(toList());
}
}
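getEventsUntil maps naturally onto Kafka's time index. A sketch using offsetsForTimes, assuming records carry the event's business timestamp as the record timestamp, and reusing the key-hash partition routing from the event reader sketch earlier (partitionFor below stands in for that routine):
// Find the first offset at-or-after pointInTime, then read everything before it
public List<DomainEvent> getEventsUntil(String aggregateId, Instant pointInTime) {
    TopicPartition tp = partitionFor(aggregateId); // same key hash the producer uses
    OffsetAndTimestamp cutoff = consumer.offsetsForTimes(
        Map.of(tp, pointInTime.toEpochMilli())).get(tp);
    // A null cutoff means every record is older than pointInTime: read to the end
    long endOffset = (cutoff != null)
        ? cutoff.offset()
        : consumer.endOffsets(List.of(tp)).get(tp);

    consumer.assign(List.of(tp));
    consumer.seekToBeginning(List.of(tp));
    List<DomainEvent> events = new ArrayList<>();
    while (consumer.position(tp) < endOffset) {
        for (ConsumerRecord<String, DomainEvent> record :
                consumer.poll(Duration.ofMillis(500)).records(tp)) {
            if (record.offset() >= endOffset) break; // past the time cutoff
            if (aggregateId.equals(record.key())) {
                events.add(record.value());
            }
        }
    }
    return events;
}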
Event Replay and System Recovery
@Component
public class EventReplayService {
// Rebuild all projections from scratch
public CompletableFuture<Void> rebuildAllProjections() {
return CompletableFuture.runAsync(() -> {
log.info("Starting full projection rebuild");
// Clear all existing projections
projectionRepository.deleteAll();
// Create new consumer group for replay
String replayGroupId = "projection-rebuild-" + System.currentTimeMillis();
try (KafkaConsumer<String, DomainEvent> replayConsumer =
createReplayConsumer(replayGroupId)) {
// Seek to beginning of all partitions
List<TopicPartition> partitions = replayConsumer.partitionsFor("events")
.stream()
.map(info -> new TopicPartition("events", info.partition()))
.collect(toList());
replayConsumer.assign(partitions);
replayConsumer.seekToBeginning(partitions);
                // Capture end offsets up front: an empty poll does not mean the
                // end of the log has been reached (it may just be a slow fetch)
                Map<TopicPartition, Long> endOffsets = replayConsumer.endOffsets(partitions);
                long processedEvents = 0;
                boolean reachedEnd = false;
                while (!reachedEnd) {
                    ConsumerRecords<String, DomainEvent> records =
                        replayConsumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, DomainEvent> record : records) {
                        projectionBuilder.processEvent(record.value());
                        processedEvents++;
                        if (processedEvents % 10000 == 0) {
                            log.info("Processed {} events", processedEvents);
                        }
                    }
                    reachedEnd = endOffsets.entrySet().stream()
                        .allMatch(e -> replayConsumer.position(e.getKey()) >= e.getValue());
                }
log.info("Projection rebuild completed. Processed {} events",
processedEvents);
}
});
}
// Selective replay for specific aggregates
public CompletableFuture<Void> replayAggregateProjections(
List<String> aggregateIds) {
return CompletableFuture.runAsync(() -> {
for (String aggregateId : aggregateIds) {
List<DomainEvent> events = eventStore.getAllEvents(aggregateId);
for (DomainEvent event : events) {
projectionBuilder.processEvent(event);
}
}
});
}
}
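createReplayConsumer is referenced but not shown. A minimal sketch, assuming Spring Kafka's JsonDeserializer for event payloads (any value deserializer matching the producer's serializer works) and a placeholder bootstrap address:
private KafkaConsumer<String, DomainEvent> createReplayConsumer(String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // Offsets are managed manually via assign/seek, so disable auto-commit
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DomainEvent.class.getName());
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // restrict in production
    return new KafkaConsumer<>(props);
}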
Schema Evolution and Versioning
Event Schema Evolution Strategies
public class EventSchemaEvolution {
    // Strategy 1: Additive changes (backward compatible)
    // Note: ignoreUnknown matters on the class a consumer deserializes INTO;
    // an old consumer reading a V2 payload into the V1 class skips the new
    // fields only if that V1 class also ignores unknown properties
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class UserCreatedEventV2 extends UserCreatedEvent {
        private String phoneNumber; // New optional field
        private Instant lastLogin;  // New optional field
        // Old consumers (deserializing into V1) can still process V2 events;
        // new consumers can process both V1 and V2 events
    }
// Strategy 2: Event upcasting for breaking changes
@Component
public class EventUpcastingService {
public DomainEvent upcastEvent(DomainEvent event) {
return switch (event.getEventType()) {
case "UserCreatedV1" -> upcastUserCreatedV1ToV2((UserCreatedEventV1) event);
case "EmailChangedV1" -> upcastEmailChangedV1ToV2((EmailChangedEventV1) event);
default -> event; // No upcasting needed
};
}
private UserCreatedEventV2 upcastUserCreatedV1ToV2(UserCreatedEventV1 oldEvent) {
return UserCreatedEventV2.builder()
.aggregateId(oldEvent.getAggregateId())
.email(oldEvent.getEmail())
.phoneNumber(null) // Default value for new field
.lastLogin(null) // Default value for new field
.timestamp(oldEvent.getTimestamp())
.userId(oldEvent.getUserId())
.aggregateVersion(oldEvent.getAggregateVersion())
.build();
}
}
// Strategy 3: Event versioning metadata
public abstract class VersionedDomainEvent {
@JsonProperty("eventVersion")
private final int eventVersion = getEventVersion();
public abstract int getEventVersion();
public boolean isCompatibleWith(int consumerVersion) {
return this.eventVersion <= consumerVersion;
}
}
}
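Upcasting is most convenient at the deserialization boundary, so the rest of the system only ever sees the latest schema. A sketch of wiring the upcasting service into a Kafka Deserializer, assuming Jackson resolves the polymorphic @JsonTypeInfo mapping shown earlier:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.IOException;

public class UpcastingEventDeserializer implements Deserializer<DomainEvent> {

    private final ObjectMapper mapper = new ObjectMapper()
        .registerModule(new JavaTimeModule()); // for Instant fields
    private final EventUpcastingService upcaster = new EventUpcastingService();

    @Override
    public DomainEvent deserialize(String topic, byte[] data) {
        try {
            // Polymorphic read via @JsonTypeInfo/@JsonSubTypes, then upcast
            // legacy versions before any consumer logic sees the event
            DomainEvent raw = mapper.readValue(data, DomainEvent.class);
            return upcaster.upcastEvent(raw);
        } catch (IOException e) {
            throw new SerializationException("Failed to deserialize event", e);
        }
    }
}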
Performance Optimization and Scaling
High-Performance Event Processing
@Component
public class PerformanceOptimizedProjectionBuilder {
private final ExecutorService projectionExecutor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@KafkaListener(topics = "events",
concurrency = "8") // Parallel processing
public void processEventsConcurrently(List<DomainEvent> events) {
// Group events by aggregate to maintain ordering
Map<String, List<DomainEvent>> eventsByAggregate = events.stream()
.collect(groupingBy(DomainEvent::getAggregateId));
// Process each aggregate's events sequentially, aggregates in parallel
List<CompletableFuture<Void>> futures = eventsByAggregate.entrySet()
.stream()
.map(entry -> CompletableFuture.runAsync(
() -> processAggregateEvents(entry.getKey(), entry.getValue()),
projectionExecutor))
.collect(toList());
// Wait for all aggregates to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
}
// Batch projection updates for performance
@Transactional
public void processAggregateEvents(String aggregateId, List<DomainEvent> events) {
// Load current projection state
UserView currentView = userViewRepository.findById(aggregateId)
.orElse(null);
// Apply all events in batch
UserView updatedView = currentView;
for (DomainEvent event : events) {
updatedView = applyEventToProjection(updatedView, event);
}
// Single database update for all events
if (updatedView != currentView) {
userViewRepository.save(updatedView);
}
}
}
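Receiving a List<DomainEvent> in the listener above requires the container factory to run in batch mode; a configuration sketch, assuming Spring Kafka:
@Configuration
public class BatchListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DomainEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, DomainEvent> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, DomainEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Deliver each poll's records to the listener as a List
        factory.setBatchListener(true);
        return factory;
    }
}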
Together, these patterns cover the full lifecycle of an event-sourced, CQRS-based system on Kafka: durable event storage, command handling with optimistic concurrency, projection building, saga coordination, temporal queries, replay and recovery, schema evolution, and performance tuning, balancing architectural guidance with production-oriented implementation detail.
See Also: [[Kafka_Transactions]], [[Stream_Processing_Frameworks]], [[Microservices_Saga_Patterns]], [[Advanced_Database_Patterns]]