Skip to content

Kafka Series

Event Sourcing and CQRS with Kafka

Deep dive into using Kafka for event sourcing and CQRS patterns: event stores, projections, state rebuilding, and architectural patterns

Concepts Covered in This Article

Event Sourcing Fundamentals and Kafka Integration

Event Sourcing Core Principles

Event Sourcing stores all changes to application state as a sequence of events rather than storing current state directly. Kafka serves as the durable, scalable event store with natural partitioning and replication:

TRADITIONAL vs EVENT SOURCING:

TRADITIONAL (CRUD):                    EVENT SOURCING:
┌─────────────────┐                   ┌─────────────────┐
│   Current       │                   │   Event Store   │
│   State DB      │                   │   (Kafka)       │
│                 │                   │                 │
│ User: {         │                   │ UserCreated     │
│   id: 123       │                   │ EmailChanged    │
│   email: new@x  │                   │ ProfileUpdated  │
│   status: VIP   │                   │ StatusUpgraded  │
│ }               │                   │ ...             │
└─────────────────┘                   └─────────────────┘
         │                                     │
         ▼                                     ▼
┌─────────────────┐                   ┌─────────────────┐
│   UPDATE user   │                   │  Rebuild state  │
│   SET status=VIP│                   │  by replaying   │
│   WHERE id=123  │                   │  all events     │
└─────────────────┘                   └─────────────────┘

BENEFITS OF EVENT SOURCING:
├── Complete audit trail (what, when, who, why)
├── Temporal queries (state at any point in time)
├── Natural event-driven architecture
├── Replay and debugging capabilities
├── Multiple projections from same events
└── Append-only storage (performance + reliability)

Kafka as Event Store Architecture

public class KafkaEventStore {

    // Topic design for event sourcing
    void configureEventStoreTopic() {
        Properties topicConfig = new Properties();

        // Key partitioning strategy - aggregate ID based
        topicConfig.put("partitions", "12");  // Scale based on aggregate count
        topicConfig.put("replication.factor", "3");

        // Event store specific settings
        topicConfig.put("cleanup.policy", "compact,delete");     // Hybrid cleanup
        topicConfig.put("retention.ms", "31536000000");          // 1 year retention
        topicConfig.put("min.compaction.lag.ms", "86400000");    // 24h before compact

        // Ensure ordering and durability
        topicConfig.put("min.insync.replicas", "2");
        topicConfig.put("unclean.leader.election.enable", "false");
    }

    // Event schema design
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "eventType")
    @JsonSubTypes({
        @JsonSubTypes.Type(value = UserCreated.class, name = "UserCreated"),
        @JsonSubTypes.Type(value = EmailChanged.class, name = "EmailChanged"),
        @JsonSubTypes.Type(value = StatusUpgraded.class, name = "StatusUpgraded")
    })
    public abstract class DomainEvent {
        private final String aggregateId;
        private final long aggregateVersion;
        private final Instant timestamp;
        private final String userId;      // Who made the change

        // Events are immutable value objects
        public abstract String getEventType();
    }

    // Append events to Kafka
    public CompletableFuture<Long> appendEvents(String aggregateId,
                                               List<DomainEvent> events,
                                               long expectedVersion) {
        return CompletableFuture.supplyAsync(() -> {
            producer.beginTransaction();

            try {
                long currentVersion = expectedVersion;

                for (DomainEvent event : events) {
                    // Ensure event versioning for optimistic concurrency
                    event.setAggregateVersion(++currentVersion);

                    ProducerRecord<String, DomainEvent> record =
                        new ProducerRecord<>("events", aggregateId, event);

                    producer.send(record);
                }

                producer.commitTransaction();
                return currentVersion;

            } catch (Exception e) {
                producer.abortTransaction();
                throw new ConcurrencyException("Failed to append events", e);
            }
        });
    }
}

CQRS Architecture Patterns with Kafka

Command and Query Separation

CQRS separates write models (commands) from read models (queries), using Kafka as the event backbone between them:

CQRS WITH KAFKA ARCHITECTURE:

WRITE SIDE (Commands)                    EVENT BACKBONE                 READ SIDE (Queries)
┌─────────────────────┐                 ┌─────────────────┐            ┌─────────────────┐
│   Command API       │                 │                 │            │   Query API     │
│ ┌─────────────────┐ │                 │     KAFKA       │            │ ┌─────────────┐ │
│ │CreateUser       │ │                 │                 │            │ │GetUser      │ │
│ │UpdateEmail      │ │──────events────▶│   Topic: events │───────────▶│ │SearchUsers  │ │
│ │UpgradeStatus    │ │                 │                 │            │ │GetUserStats │ │
│ └─────────────────┘ │                 │ ┌─────────────┐ │            │ └─────────────┘ │
│                     │                 │ │Partition 0  │ │            │                 │
│ ┌─────────────────┐ │                 │ │Partition 1  │ │            │ ┌─────────────┐ │
│ │  Write Model    │ │                 │ │Partition 2  │ │            │ │ Read Model  │ │
│ │  (Domain)       │ │                 │ └─────────────┘ │            │ │(Projections)│ │
│ │ - User          │ │                 │                 │            │ │- UserView   │ │
│ │ - Account       │ │                 │  Replication    │            │ │- StatsView  │ │
│ │ - Order         │ │                 │  Partitioning   │            │ │- SearchIdx  │ │
│ └─────────────────┘ │                 │  Durability     │            │ └─────────────┘ │
└─────────────────────┘                 └─────────────────┘            └─────────────────┘

Command Handler Implementation

@Component
public class UserCommandHandler {
    private final KafkaEventStore eventStore;
    private final UserRepository userRepository; // Write-side repository

    @CommandHandler
    public CompletableFuture<Void> handle(CreateUserCommand command) {
        return CompletableFuture.runAsync(() -> {
            // Load current state (if exists)
            User user = userRepository.findById(command.getUserId())
                .orElse(null);

            if (user != null) {
                throw new UserAlreadyExistsException(command.getUserId());
            }

            // Create domain events
            UserCreatedEvent event = UserCreatedEvent.builder()
                .aggregateId(command.getUserId())
                .email(command.getEmail())
                .timestamp(Instant.now())
                .userId(command.getRequesterId())
                .build();

            // Persist events
            eventStore.appendEvents(
                command.getUserId(),
                List.of(event),
                0 // Expected version for new aggregate
            ).join();

            // Update write-side state (optional - can be event-driven)
            User newUser = User.builder()
                .id(command.getUserId())
                .email(command.getEmail())
                .version(1)
                .build();
            userRepository.save(newUser);
        });
    }

    @CommandHandler
    public CompletableFuture<Void> handle(UpdateEmailCommand command) {
        return CompletableFuture.runAsync(() -> {
            // Load current state
            User user = userRepository.findById(command.getUserId())
                .orElseThrow(() -> new UserNotFoundException(command.getUserId()));

            // Business logic validation
            if (user.getEmail().equals(command.getNewEmail())) {
                return; // No change needed
            }

            // Generate domain event
            EmailChangedEvent event = EmailChangedEvent.builder()
                .aggregateId(command.getUserId())
                .oldEmail(user.getEmail())
                .newEmail(command.getNewEmail())
                .timestamp(Instant.now())
                .userId(command.getRequesterId())
                .build();

            // Optimistic concurrency control
            try {
                eventStore.appendEvents(
                    command.getUserId(),
                    List.of(event),
                    user.getVersion() // Expected version
                ).join();

                // Update write-side state
                user.setEmail(command.getNewEmail());
                user.setVersion(user.getVersion() + 1);
                userRepository.save(user);

            } catch (ConcurrencyException e) {
                throw new OptimisticLockingException("User was modified concurrently");
            }
        });
    }
}

Event Store Design and Optimization

Partitioning Strategies for Event Stores

public class EventStorePartitioning {

    // Strategy 1: Aggregate-based partitioning (recommended)
    void configureAggregatePartitioning() {
        // Key = aggregateId (e.g., user_123, order_456)
        // Ensures all events for an aggregate in same partition
        // Guarantees ordering within aggregate
        // Enables efficient aggregate reconstruction

        String partitionKey = event.getAggregateId();

        ProducerRecord<String, DomainEvent> record =
            new ProducerRecord<>("events", partitionKey, event);
    }

    // Strategy 2: Tenant-based partitioning (multi-tenant systems)
    void configureTenantPartitioning() {
        // Key = tenantId:aggregateId
        // Isolates tenant data
        // Enables tenant-specific scaling
        // Supports data locality requirements

        String partitionKey = event.getTenantId() + ":" + event.getAggregateId();

        ProducerRecord<String, DomainEvent> record =
            new ProducerRecord<>("events", partitionKey, event);
    }

    // Strategy 3: Time-based partitioning (for massive scale)
    void configureTimeBasedPartitioning() {
        // Key = aggregateId, but route by time
        // Enables time-based compaction and archival
        // Better for systems with billions of events

        CustomPartitioner timePartitioner = new TimeBasedPartitioner();

        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                          timePartitioner.getClass());
    }
}

Event Store Performance Optimization

public class EventStoreOptimization {

    // Batch event appending for performance
    public CompletableFuture<Void> appendEventsBatch(
            Map<String, List<DomainEvent>> eventsByAggregate) {

        return CompletableFuture.runAsync(() -> {
            producer.beginTransaction();

            try {
                // Send all events in single transaction
                for (Map.Entry<String, List<DomainEvent>> entry :
                     eventsByAggregate.entrySet()) {

                    String aggregateId = entry.getKey();
                    List<DomainEvent> events = entry.getValue();

                    for (DomainEvent event : events) {
                        ProducerRecord<String, DomainEvent> record =
                            new ProducerRecord<>("events", aggregateId, event);
                        producer.send(record);
                    }
                }

                producer.commitTransaction();

            } catch (Exception e) {
                producer.abortTransaction();
                throw e;
            }
        });
    }

    // Snapshot optimization for large aggregates
    @Component
    public class SnapshotManager {

        public void createSnapshot(String aggregateId, Object aggregateState, long version) {
            SnapshotEvent snapshot = SnapshotEvent.builder()
                .aggregateId(aggregateId)
                .aggregateType(aggregateState.getClass().getSimpleName())
                .aggregateState(serialize(aggregateState))
                .snapshotVersion(version)
                .timestamp(Instant.now())
                .build();

            // Store snapshot in separate topic with compaction
            ProducerRecord<String, SnapshotEvent> record =
                new ProducerRecord<>("snapshots", aggregateId, snapshot);
            snapshotProducer.send(record);
        }

        public <T> Optional<AggregateSnapshot<T>> getLatestSnapshot(
                String aggregateId, Class<T> aggregateType) {

            // Read from compacted snapshots topic
            // Latest snapshot automatically available due to log compaction
            return snapshotRepository.findLatestSnapshot(aggregateId, aggregateType);
        }
    }

    // Optimized aggregate reconstruction
    public <T> T rebuildAggregate(String aggregateId, Class<T> aggregateType) {
        // Step 1: Try to load latest snapshot
        Optional<AggregateSnapshot<T>> snapshot =
            snapshotManager.getLatestSnapshot(aggregateId, aggregateType);

        T aggregate;
        long fromVersion;

        if (snapshot.isPresent()) {
            aggregate = snapshot.get().getAggregateState();
            fromVersion = snapshot.get().getVersion();
        } else {
            aggregate = createEmptyAggregate(aggregateType);
            fromVersion = 0;
        }

        // Step 2: Load and apply events since snapshot
        List<DomainEvent> eventsSinceSnapshot =
            eventStore.getEventsSince(aggregateId, fromVersion);

        for (DomainEvent event : eventsSinceSnapshot) {
            aggregate = applyEvent(aggregate, event);
        }

        return aggregate;
    }
}

Projection Patterns and View Materialization

Projection Builder Implementation

@Component
public class UserProjectionBuilder {

    @KafkaListener(topics = "events", groupId = "user-view-builder")
    public void handleUserEvent(DomainEvent event) {
        switch (event.getEventType()) {
            case "UserCreated" -> handleUserCreated((UserCreatedEvent) event);
            case "EmailChanged" -> handleEmailChanged((EmailChangedEvent) event);
            case "StatusUpgraded" -> handleStatusUpgraded((StatusUpgradedEvent) event);
            default -> log.warn("Unknown event type: {}", event.getEventType());
        }
    }

    private void handleUserCreated(UserCreatedEvent event) {
        UserView userView = UserView.builder()
            .id(event.getAggregateId())
            .email(event.getEmail())
            .status(UserStatus.ACTIVE)
            .createdAt(event.getTimestamp())
            .lastModified(event.getTimestamp())
            .version(event.getAggregateVersion())
            .build();

        userViewRepository.save(userView);

        // Update search index
        searchIndexer.indexUser(userView);

        // Update statistics
        statisticsService.incrementUserCount();
    }

    private void handleEmailChanged(EmailChangedEvent event) {
        UserView userView = userViewRepository.findById(event.getAggregateId())
            .orElseThrow(() -> new ProjectionInconsistencyException(
                "User view not found for email change: " + event.getAggregateId()));

        // Idempotency check
        if (userView.getVersion() >= event.getAggregateVersion()) {
            log.info("Event already processed: {}", event);
            return;
        }

        userView.setEmail(event.getNewEmail());
        userView.setLastModified(event.getTimestamp());
        userView.setVersion(event.getAggregateVersion());

        userViewRepository.save(userView);

        // Update search index
        searchIndexer.updateUser(userView);
    }
}

Multiple Projection Strategies

public class MultipleProjectionManager {

    // Strategy 1: Dedicated projection builders per view
    @Component
    public class UserSearchProjectionBuilder {

        @KafkaListener(topics = "events", groupId = "user-search-builder")
        public void buildSearchProjection(DomainEvent event) {
            // Build search-optimized view
            // - Denormalized for fast queries
            // - Indexed for full-text search
            // - Includes computed fields
        }
    }

    @Component
    public class UserAnalyticsProjectionBuilder {

        @KafkaListener(topics = "events", groupId = "user-analytics-builder")
        public void buildAnalyticsProjection(DomainEvent event) {
            // Build analytics-optimized view
            // - Aggregated statistics
            // - Time-series data
            // - Pre-computed metrics
        }
    }

    // Strategy 2: Single builder with multiple outputs
    @Component
    public class MultiViewProjectionBuilder {

        @KafkaListener(topics = "events", groupId = "multi-view-builder")
        @Transactional
        public void buildMultipleViews(DomainEvent event) {
            // Update all projections in single transaction
            updateUserDetailsView(event);
            updateUserSearchView(event);
            updateUserAnalyticsView(event);
            updateUserAuditView(event);
        }
    }

    // Strategy 3: Event-driven projection chaining
    void configureProjectionChain() {
        // Primary projection publishes to derived topic
        // Secondary projections consume from derived topic
        // Enables complex projection dependencies

        // events → user-details-view → user-search-events → search-index
        //       └─→ user-analytics-view → analytics-events → dashboards
    }
}

Consistency and Transaction Patterns

Saga Pattern with Event Sourcing

public class OrderProcessingSaga {

    @SagaOrchestrationStart
    @EventHandler
    public void handle(OrderPlacedEvent event) {
        SagaState sagaState = SagaState.builder()
            .sagaId(UUID.randomUUID().toString())
            .orderId(event.getOrderId())
            .userId(event.getUserId())
            .amount(event.getAmount())
            .status(SagaStatus.STARTED)
            .build();

        // Step 1: Reserve inventory
        ReserveInventoryCommand reserveCommand = ReserveInventoryCommand.builder()
            .sagaId(sagaState.getSagaId())
            .orderId(event.getOrderId())
            .items(event.getItems())
            .build();

        commandGateway.send(reserveCommand);

        // Store saga state as event
        SagaStartedEvent sagaEvent = SagaStartedEvent.builder()
            .sagaId(sagaState.getSagaId())
            .orderId(event.getOrderId())
            .step("INVENTORY_RESERVATION")
            .build();

        eventStore.appendEvent(sagaState.getSagaId(), sagaEvent);
    }

    @EventHandler
    public void handle(InventoryReservedEvent event) {
        // Step 2: Process payment
        ProcessPaymentCommand paymentCommand = ProcessPaymentCommand.builder()
            .sagaId(event.getSagaId())
            .orderId(event.getOrderId())
            .amount(event.getAmount())
            .build();

        commandGateway.send(paymentCommand);
    }

    @EventHandler
    public void handle(PaymentProcessedEvent event) {
        // Step 3: Confirm order
        ConfirmOrderCommand confirmCommand = ConfirmOrderCommand.builder()
            .sagaId(event.getSagaId())
            .orderId(event.getOrderId())
            .build();

        commandGateway.send(confirmCommand);
    }

    @EventHandler
    public void handle(PaymentFailedEvent event) {
        // Compensating transaction: Release inventory
        ReleaseInventoryCommand compensateCommand = ReleaseInventoryCommand.builder()
            .sagaId(event.getSagaId())
            .orderId(event.getOrderId())
            .reason("Payment failed")
            .build();

        commandGateway.send(compensateCommand);

        // Mark saga as failed
        SagaFailedEvent sagaFailedEvent = SagaFailedEvent.builder()
            .sagaId(event.getSagaId())
            .reason("Payment processing failed")
            .build();

        eventStore.appendEvent(event.getSagaId(), sagaFailedEvent);
    }
}

Eventual Consistency Management

@Component
public class ConsistencyManager {

    // Pattern 1: Version-based consistency
    public void handleEventualConsistency(DomainEvent event) {
        try {
            updateProjection(event);
        } catch (ProjectionUpdateException e) {
            // Retry with exponential backoff
            scheduleRetry(event, calculateBackoff(e.getAttemptCount()));
        }
    }

    // Pattern 2: Consistency boundary enforcement
    public void enforceConsistencyBoundary(String aggregateId, DomainEvent event) {
        // Strong consistency within aggregate
        // Eventual consistency across aggregates

        if (isWithinAggregateBoundary(aggregateId, event)) {
            // Synchronous update - strong consistency
            updateAggregateState(aggregateId, event);
        } else {
            // Asynchronous update - eventual consistency
            scheduleAsyncUpdate(event);
        }
    }

    // Pattern 3: Compensation tracking
    public void trackCompensation(String sagaId, CompensatingAction action) {
        CompensationEvent compensationEvent = CompensationEvent.builder()
            .sagaId(sagaId)
            .action(action)
            .status(CompensationStatus.PENDING)
            .timestamp(Instant.now())
            .build();

        eventStore.appendEvent(sagaId, compensationEvent);
    }
}

Temporal Queries and Event Replay

Time-Travel Query Implementation

@Service
public class TemporalQueryService {

    // Query aggregate state at specific point in time
    public <T> T getAggregateStateAt(String aggregateId,
                                    Class<T> aggregateType,
                                    Instant pointInTime) {

        // Get all events up to the specified time
        List<DomainEvent> historicalEvents = eventStore.getEventsUntil(
            aggregateId, pointInTime);

        T aggregate = createEmptyAggregate(aggregateType);

        // Replay events in chronological order
        for (DomainEvent event : historicalEvents) {
            aggregate = applyEvent(aggregate, event);
        }

        return aggregate;
    }

    // Query state changes over time period
    public <T> List<StateSnapshot<T>> getStateEvolution(
            String aggregateId,
            Class<T> aggregateType,
            Instant fromTime,
            Instant toTime,
            Duration snapshotInterval) {

        List<StateSnapshot<T>> evolution = new ArrayList<>();

        Instant currentTime = fromTime;
        while (!currentTime.isAfter(toTime)) {
            T state = getAggregateStateAt(aggregateId, aggregateType, currentTime);

            evolution.add(StateSnapshot.<T>builder()
                .timestamp(currentTime)
                .state(state)
                .build());

            currentTime = currentTime.plus(snapshotInterval);
        }

        return evolution;
    }

    // Audit query - who changed what when
    public List<AuditRecord> getAuditTrail(String aggregateId,
                                          Instant fromTime,
                                          Instant toTime) {

        List<DomainEvent> events = eventStore.getEventsBetween(
            aggregateId, fromTime, toTime);

        return events.stream()
            .map(event -> AuditRecord.builder()
                .aggregateId(aggregateId)
                .eventType(event.getEventType())
                .timestamp(event.getTimestamp())
                .userId(event.getUserId())
                .changes(extractChanges(event))
                .build())
            .collect(toList());
    }
}

Event Replay and System Recovery

@Component
public class EventReplayService {

    // Rebuild all projections from scratch
    public CompletableFuture<Void> rebuildAllProjections() {
        return CompletableFuture.runAsync(() -> {
            log.info("Starting full projection rebuild");

            // Clear all existing projections
            projectionRepository.deleteAll();

            // Create new consumer group for replay
            String replayGroupId = "projection-rebuild-" + System.currentTimeMillis();

            try (KafkaConsumer<String, DomainEvent> replayConsumer =
                 createReplayConsumer(replayGroupId)) {

                // Seek to beginning of all partitions
                List<TopicPartition> partitions = replayConsumer.partitionsFor("events")
                    .stream()
                    .map(info -> new TopicPartition("events", info.partition()))
                    .collect(toList());

                replayConsumer.assign(partitions);
                replayConsumer.seekToBeginning(partitions);

                // Process all events
                long processedEvents = 0;
                boolean reachedEnd = false;

                while (!reachedEnd) {
                    ConsumerRecords<String, DomainEvent> records =
                        replayConsumer.poll(Duration.ofSeconds(1));

                    if (records.isEmpty()) {
                        reachedEnd = true;
                    } else {
                        for (ConsumerRecord<String, DomainEvent> record : records) {
                            projectionBuilder.processEvent(record.value());
                            processedEvents++;

                            if (processedEvents % 10000 == 0) {
                                log.info("Processed {} events", processedEvents);
                            }
                        }
                    }
                }

                log.info("Projection rebuild completed. Processed {} events",
                        processedEvents);
            }
        });
    }

    // Selective replay for specific aggregates
    public CompletableFuture<Void> replayAggregateProjections(
            List<String> aggregateIds) {

        return CompletableFuture.runAsync(() -> {
            for (String aggregateId : aggregateIds) {
                List<DomainEvent> events = eventStore.getAllEvents(aggregateId);

                for (DomainEvent event : events) {
                    projectionBuilder.processEvent(event);
                }
            }
        });
    }
}

Schema Evolution and Versioning

Event Schema Evolution Strategies

public class EventSchemaEvolution {

    // Strategy 1: Additive changes (backward compatible)
    @JsonIgnoreProperties(ignoreUnknown = true)  // Ignore new fields in old consumers
    public class UserCreatedEventV2 extends UserCreatedEvent {
        private String phoneNumber;     // New optional field
        private Instant lastLogin;      // New optional field

        // Old consumers can still process V2 events
        // New consumers can process V1 and V2 events
    }

    // Strategy 2: Event upcasting for breaking changes
    @Component
    public class EventUpcastingService {

        public DomainEvent upcastEvent(DomainEvent event) {
            return switch (event.getEventType()) {
                case "UserCreatedV1" -> upcastUserCreatedV1ToV2((UserCreatedEventV1) event);
                case "EmailChangedV1" -> upcastEmailChangedV1ToV2((EmailChangedEventV1) event);
                default -> event; // No upcasting needed
            };
        }

        private UserCreatedEventV2 upcastUserCreatedV1ToV2(UserCreatedEventV1 oldEvent) {
            return UserCreatedEventV2.builder()
                .aggregateId(oldEvent.getAggregateId())
                .email(oldEvent.getEmail())
                .phoneNumber(null)          // Default value for new field
                .lastLogin(null)            // Default value for new field
                .timestamp(oldEvent.getTimestamp())
                .userId(oldEvent.getUserId())
                .aggregateVersion(oldEvent.getAggregateVersion())
                .build();
        }
    }

    // Strategy 3: Event versioning metadata
    public abstract class VersionedDomainEvent {
        @JsonProperty("eventVersion")
        private final int eventVersion = getEventVersion();

        public abstract int getEventVersion();

        public boolean isCompatibleWith(int consumerVersion) {
            return this.eventVersion <= consumerVersion;
        }
    }
}

Performance Optimization and Scaling

High-Performance Event Processing

@Component
public class PerformanceOptimizedProjectionBuilder {

    private final ExecutorService projectionExecutor =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    @KafkaListener(topics = "events",
                  concurrency = "8")  // Parallel processing
    public void processEventsConcurrently(List<DomainEvent> events) {

        // Group events by aggregate to maintain ordering
        Map<String, List<DomainEvent>> eventsByAggregate = events.stream()
            .collect(groupingBy(DomainEvent::getAggregateId));

        // Process each aggregate's events sequentially, aggregates in parallel
        List<CompletableFuture<Void>> futures = eventsByAggregate.entrySet()
            .stream()
            .map(entry -> CompletableFuture.runAsync(
                () -> processAggregateEvents(entry.getKey(), entry.getValue()),
                projectionExecutor))
            .collect(toList());

        // Wait for all aggregates to complete
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();
    }

    // Batch projection updates for performance
    @Transactional
    public void processAggregateEvents(String aggregateId, List<DomainEvent> events) {
        // Load current projection state
        UserView currentView = userViewRepository.findById(aggregateId)
            .orElse(null);

        // Apply all events in batch
        UserView updatedView = currentView;
        for (DomainEvent event : events) {
            updatedView = applyEventToProjection(updatedView, event);
        }

        // Single database update for all events
        if (updatedView != currentView) {
            userViewRepository.save(updatedView);
        }
    }
}

This comprehensive Event Sourcing/CQRS knowledge chunk provides both strategic architectural guidance and detailed implementation patterns needed for L8_PRINCIPAL level discussions and complex system implementations. The content balances theoretical understanding with practical production patterns essential for enterprise-grade event-driven systems.

See Also: [[Kafka_Transactions]], [[Stream_Processing_Frameworks]], [[Microservices_Saga_Patterns]], [[Advanced_Database_Patterns]]