Retention and Log Compaction
Deep dive into time-based and size-based retention policies, log compaction mechanics, and production configuration strategies
Retention Policies and Mechanisms
Time-Based Retention Deep Dive
Kafka’s time-based retention operates at the segment level: a sealed segment becomes eligible for deletion once its newest record timestamp (or, for records without timestamps, the segment file's modification time) falls outside the retention window:
RETENTION TIMELINE:
Segment Creation ──▶ Active Writing ──▶ Sealed Segment ──▶ Retention Check ──▶ Deletion
       │                   │                   │                  │                │
      t=0             t=segment.ms         t=sealed         t=retention.ms     t=cleanup
       │                   │                   │                  │                │
  New segment         Segment full        Stop writing;      Eligible for       Delete
    created          or time limit     start new segment    cleanup once       segment
                                                          newest timestamp      files
                                                          exceeds retention
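The timeline above reduces to a single predicate per sealed segment. A minimal sketch (the class, helper, and timestamp values are illustrative, not Kafka APIs):

```java
// The retention timeline in one predicate: a sealed segment becomes
// deletable once its newest record's timestamp falls outside the
// retention window. All values are illustrative epoch milliseconds.
public class RetentionCheck {
    public static boolean deletable(long segmentMaxTimestampMs, long nowMs, long retentionMs) {
        return nowMs - segmentMaxTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long now = 10_000_000L;
        System.out.println(deletable(2_000_000L, now, 7_000_000L)); // true  (8M ms old > 7M ms window)
        System.out.println(deletable(5_000_000L, now, 7_000_000L)); // false (5M ms old, still retained)
    }
}
```

Note that deletion granularity is the whole segment: every record in it, expired or not, is removed at once.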
Advanced Time-Based Configuration:
# Production retention configuration (broker-level defaults)
log.retention.ms=604800000 # 7 days (overrides log.retention.hours/minutes)
log.retention.check.interval.ms=300000 # Check every 5 minutes
log.roll.ms=86400000 # New segment daily (topic-level equivalent: segment.ms)
# Retention vs segment interaction
# Retention = 7 days, Segment roll = 1 day
# Result: ~7 sealed segments covering the retention window, plus the
# active segment (deletion granularity is always a whole segment)
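The segment arithmetic above can be sketched directly; the class and helper names are hypothetical:

```java
// Rough estimate of segments held on disk for a delete-policy topic,
// assuming purely time-based rollover: enough sealed segments to cover
// the retention window, plus the active segment.
public class SegmentCountEstimator {
    public static long retainedSegments(long retentionMs, long rollMs) {
        return (retentionMs + rollMs - 1) / rollMs + 1; // ceil(retention/roll) + active
    }

    public static void main(String[] args) {
        // 7-day retention, daily rollover: 7 sealed + 1 active
        System.out.println(retainedSegments(604_800_000L, 86_400_000L)); // 8
    }
}
```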
Size-Based Retention Implementation
public class SizeBasedRetentionCalculator {
    // Estimates the total disk footprint across the cluster. Note that
    // retention.bytes itself is a per-partition, per-replica limit: to
    // derive it, divide the compressed (pre-replication) total by the
    // partition count rather than using this cluster-wide figure.
    public long calculateClusterFootprintBytes(
            long dailyIngressGB,
            int retentionDays,
            int replicationFactor,
            double compressionRatio) {
        // Base calculation
        long dailyBytes = dailyIngressGB * 1024L * 1024L * 1024L;
        long totalUncompressed = dailyBytes * retentionDays;
        long totalCompressed = (long)(totalUncompressed / compressionRatio);
        long totalWithReplication = totalCompressed * replicationFactor;
        // Add safety margin
        return (long)(totalWithReplication * 1.2); // 20% buffer
    }
    void exampleCalculation() {
        // Example: Financial transaction logging
        long footprintBytes = calculateClusterFootprintBytes(
            100,  // 100 GB/day ingress
            30,   // 30 days retention
            3,    // 3x replication
            2.5   // 2.5:1 compression ratio
        );
        // Result:
        // 100GB * 30 days    = 3TB raw
        // 3TB / 2.5          = 1.2TB compressed
        // 1.2TB * 3 replicas = 3.6TB total
        // 3.6TB * 1.2 safety = 4.32TB cluster disk footprint
    }
}
Policy Combination Strategies
Multi-Policy Retention Configuration:
# Topic with both time and size limits (topic-level overrides)
retention.ms=2592000000 # 30 days
retention.bytes=1073741824000 # 1TB per partition
# Cleanup triggers when EITHER condition met:
# - Data older than 30 days, OR
# - Partition size exceeds 1TB
# Use cases:
# - Burst traffic protection (size limit)
# - Compliance requirements (time limit)
# - Storage cost control (size limit)
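Given an ingress estimate, a quick check shows which of the two limits actually governs deletion; the numbers mirror the configuration above and the helper is a sketch:

```java
// Determines which retention limit fires first for a partition with a
// known ingress rate: the size cap, or the time window.
public class RetentionTriggerCheck {
    public static String bindingLimit(long retentionMs, long retentionBytes, long bytesPerDay) {
        double daysUntilSizeCap = (double) retentionBytes / bytesPerDay;
        double retentionDays = retentionMs / 86_400_000.0;
        return daysUntilSizeCap < retentionDays ? "size" : "time";
    }

    public static void main(String[] args) {
        // 1TB cap, 30-day window, 50 GB/day per partition:
        // the cap fills in ~20 days, so the size limit binds.
        System.out.println(bindingLimit(2_592_000_000L, 1_073_741_824_000L, 50L * 1_073_741_824L)); // size
    }
}
```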
Production Retention Patterns:
public class RetentionPatternCatalog {
    // Pattern 1: High-value transactional data
    void configureFinancialRetention() {
        Properties config = new Properties();          // topic-level overrides
        config.put("retention.ms", "31536000000");     // 365 days
        config.put("retention.bytes", "-1");           // No size limit
        config.put("segment.ms", "86400000");          // Daily segments
        config.put("min.insync.replicas", "3");        // High durability
    }
    // Pattern 2: High-volume logs with cost control
    void configureLogRetention() {
        Properties config = new Properties();
        config.put("retention.ms", "604800000");       // 7 days
        config.put("retention.bytes", "107374182400"); // 100GB per partition
        config.put("segment.ms", "3600000");           // Hourly segments
        config.put("compression.type", "lz4");         // Reduce storage
    }
    // Pattern 3: Real-time metrics (short retention)
    void configureMetricsRetention() {
        Properties config = new Properties();
        config.put("retention.ms", "86400000");        // 1 day
        config.put("retention.bytes", "10737418240");  // 10GB per partition
        config.put("segment.ms", "300000");            // 5-minute segments
    }
}
Log Compaction Deep Dive
Compaction Algorithm and Process Flow
Log compaction maintains the latest value for each key while removing outdated records:
COMPACTION PROCESS:
Original Log:
[key1:v1][key2:v1][key1:v2][key3:v1][key2:v2][key1:v3]
    │        │        │        │        │        │
   t1       t2       t3       t4       t5       t6
After Compaction:
[key3:v1][key2:v2][key1:v3]
    │        │        │
   t4       t5       t6
Only the latest value per key is retained; surviving records keep their original offsets and relative order.
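The pass above can be modeled in a few lines. This is a toy model of the outcome, not the cleaner's actual implementation:

```java
import java.util.*;

// Toy model of a compaction pass: keep only the last occurrence of each
// key, preserving the surviving records' original order.
public class CompactionModel {
    public static List<String> compact(List<String[]> log) { // [key, value] pairs in offset order
        Map<String, Integer> lastOffset = new HashMap<>();
        for (int i = 0; i < log.size(); i++) lastOffset.put(log.get(i)[0], i);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < log.size(); i++)
            if (lastOffset.get(log.get(i)[0]) == i) out.add(log.get(i)[0] + ":" + log.get(i)[1]);
        return out;
    }

    public static void main(String[] args) {
        List<String[]> log = List.of(
            new String[]{"key1","v1"}, new String[]{"key2","v1"}, new String[]{"key1","v2"},
            new String[]{"key3","v1"}, new String[]{"key2","v2"}, new String[]{"key1","v3"});
        System.out.println(compact(log)); // [key3:v1, key2:v2, key1:v3]
    }
}
```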
Log Cleaner Architecture:
LOG CLEANER INTERNAL FLOW:
┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│    Dirty Head    │───▶ │   Log Cleaner    │───▶ │    Clean Tail    │
│ (not yet cleaned)│     │                  │     │   (compacted)    │
│                  │     │ ┌──────────────┐ │     │                  │
│ - Newer sealed   │     │ │  Key Index   │ │     │ - One record     │
│   segments       │     │ │ (OffsetMap)  │ │     │   per key        │
│ - Active segment │     │ └──────────────┘ │     │ - Latest values  │
│   is never       │     │ ┌──────────────┐ │     └──────────────────┘
│   cleaned        │     │ │ Thread Pool  │ │
└──────────────────┘     │ │(configurable │ │
                         │ │ parallelism) │ │
                         │ └──────────────┘ │
                         └──────────────────┘
Advanced Compaction Configuration
Production Compaction Tuning:
# Log cleaner configuration
log.cleanup.policy=compact
log.cleaner.threads=8 # Parallel cleaner threads
log.cleaner.io.max.bytes.per.second=1048576000 # 1GB/s I/O throttling
log.cleaner.dedupe.buffer.size=134217728 # 128MB dedup buffer
# Compaction triggers
log.cleaner.min.compaction.lag.ms=60000 # Wait 1 min before compact
log.cleaner.max.compaction.lag.ms=86400000 # Force compact within 24h
log.roll.ms=86400000 # 24h segments
log.cleaner.min.cleanable.ratio=0.5 # Compact when 50% dirty
# Advanced settings
log.cleaner.io.buffer.size=524288 # 512KB I/O buffer
log.cleaner.io.buffer.load.factor=0.9 # Buffer utilization
log.cleaner.backoff.ms=15000 # Backoff between runs
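The dedupe buffer bounds how many unique keys one cleaning pass can handle: Kafka's offset map spends 24 bytes per entry (a 16-byte key hash plus an 8-byte offset), and the buffer is divided among the cleaner threads. A back-of-envelope check for the settings above (the class name is illustrative):

```java
// Max unique keys a single cleaner thread can deduplicate per pass,
// given the total dedupe buffer, thread count, and load factor.
// Assumes 24 bytes per offset-map entry (16-byte hash + 8-byte offset).
public class DedupeBufferSizer {
    public static long maxKeysPerPass(long bufferBytes, int threads, double loadFactor) {
        return (long) (bufferBytes / threads * loadFactor) / 24;
    }

    public static void main(String[] args) {
        // 128MB buffer, 8 threads, 0.9 load factor -> ~629k keys/thread/pass
        System.out.println(maxKeysPerPass(134_217_728L, 8, 0.9)); // 629145
    }
}
```

If a partition's dirty head holds more unique keys than this, the cleaner needs multiple passes; raising log.cleaner.dedupe.buffer.size is often cheaper than the extra I/O.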
Compaction Performance Analysis:
public class CompactionPerformanceMonitor {
public CompactionMetrics analyzeCompactionEffectiveness() {
return CompactionMetrics.builder()
.dirtyRatio(calculateDirtyRatio())
.compactionRate(getCompactionRate())
.keySpaceReduction(getKeySpaceReduction())
.storageReclaimed(getStorageReclaimed())
.ioImpact(getIOImpact())
.build();
}
private double calculateDirtyRatio() {
// dirty.ratio = dirty_bytes / (clean_bytes + dirty_bytes)
// Optimal range: 0.3 - 0.7 for balanced performance
long dirtyBytes = getDirtyLogSize();
long cleanBytes = getCleanLogSize();
return (double) dirtyBytes / (dirtyBytes + cleanBytes);
}
void optimizeCompactionScheduling() {
double dirtyRatio = calculateDirtyRatio();
if (dirtyRatio > 0.8) {
// High dirty ratio - increase compaction frequency
updateConfig("log.cleaner.min.compaction.lag.ms", "30000"); // 30s
updateConfig("log.cleaner.threads", "12"); // More threads
} else if (dirtyRatio < 0.2) {
// Low dirty ratio - reduce compaction frequency
updateConfig("log.cleaner.min.compaction.lag.ms", "300000"); // 5min
updateConfig("log.cleaner.threads", "4"); // Fewer threads
}
}
}
Key-Based Compaction Strategies
Effective Key Design Patterns:
public class CompactionKeyStrategies {
// Pattern 1: Entity state management
void entityStateKeys() {
// Key: user_id
// Value: Complete user state (JSON/Avro)
// Result: Latest state per user maintained
String key = "user_123";
UserState value = UserState.builder()
.userId("user_123")
.email("user@example.com")
.lastLogin(Instant.now())
.build();
}
// Pattern 2: Configuration management
void configurationKeys() {
// Key: config_category:setting_name
// Value: Setting value
// Result: Latest config per setting
String key = "database:connection_timeout";
String value = "30000";
}
// Pattern 3: State machine events (anti-pattern)
void stateMachineAntiPattern() {
// WRONG: State transition events with timestamp keys
// Key: user_123:timestamp ← Each event has unique key
// Result: No compaction benefit, all events retained
// CORRECT: Current state with entity key
// Key: user_123 ← Same key for all state updates
// Result: Only latest state retained
}
}
Segment Management and Cleanup
Segment Lifecycle Management
Segment Creation and Rollover:
SEGMENT LIFECYCLE:
Active Segment Sealed Segment Eligible for Cleanup
│ │ │
▼ ▼ ▼
┌──────────┐ Rollover ┌──────────────┐ Time/Size ┌──────────────┐
│.log │ Triggers: │.log (sealed) │ Triggers: │.log (old) │
│.index │ - Size limit │.index │ - retention │.index │
│.timeindex│ - Time limit │.timeindex │ - compaction │.timeindex │
└──────────┘ - Force roll └──────────────┘ └──────────────┘
Production Segment Configuration:
# Segment size management
log.segment.bytes=1073741824 # 1GB segments (balance between size & granularity)
log.index.size.max.bytes=10485760 # 10MB index files
log.index.interval.bytes=4096 # Index entry every 4KB
# Segment time management
log.roll.ms=86400000 # 24h segments (daily rollover)
log.roll.jitter.ms=3600000 # 1h jitter (spread rollover load)
# Segment cleanup optimization
log.retention.check.interval.ms=300000 # Check every 5 minutes
log.segment.delete.delay.ms=60000 # 1 minute delay before deletion
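The index settings above interact: with one entry per 4KB of log, a full 1GB segment yields ~262k offset-index entries at 8 bytes each, roughly 2MB — comfortably under the 10MB cap. A quick sanity check (the 8-byte entry size reflects Kafka's offset index format; the class is illustrative):

```java
// Offset index sizing for one full segment: entries = segment bytes /
// index interval; each offset index entry is 8 bytes
// (4-byte relative offset + 4-byte file position).
public class IndexSizingCheck {
    public static long[] indexSize(long segmentBytes, long indexIntervalBytes) {
        long entries = segmentBytes / indexIntervalBytes;
        return new long[]{entries, entries * 8};
    }

    public static void main(String[] args) {
        long[] r = indexSize(1_073_741_824L, 4096);
        System.out.println(r[0] + " entries, " + r[1] + " bytes"); // 262144 entries, 2097152 bytes
    }
}
```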
File System Layout and Optimization
Kafka Directory Structure:
# Optimal file system layout
/var/kafka-logs/
├── topic-partition-0/
│ ├── 00000000000000000000.log # Segment 0 (oldest)
│ ├── 00000000000000000000.index # Offset index
│ ├── 00000000000000000000.timeindex # Time index
│ ├── 00000000000000100000.log # Segment 1
│ ├── 00000000000000100000.index
│ ├── 00000000000000100000.timeindex
│ ├── 00000000000000200000.log # Segment 2 (active)
│ ├── 00000000000000200000.index
│ ├── 00000000000000200000.timeindex
│ └── leader-epoch-checkpoint # Leadership changes
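Segment file names encode the base offset — the offset of the segment's first record — zero-padded to 20 digits, which lets brokers and tools locate a record's segment without opening files. A minimal parser sketch:

```java
// Extracts the base offset from a segment file name such as
// "00000000000000100000.log".
public class SegmentFileName {
    public static long baseOffset(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    public static void main(String[] args) {
        System.out.println(baseOffset("00000000000000100000.log")); // 100000
    }
}
```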
File System Optimization:
# Production file system tuning
# Mount options for Kafka log directories
mount -o noatime,nodiratime /dev/sdb1 /var/kafka-logs
# File system selection
# XFS: Recommended for large files, better performance
# EXT4: Acceptable alternative, wider compatibility
# I/O scheduler optimization (better for sequential I/O; on modern
# multi-queue kernels the equivalent is mq-deadline or none)
echo deadline > /sys/block/sdb/queue/scheduler
# File descriptor limits
echo "kafka soft nofile 100000" >> /etc/security/limits.conf
echo "kafka hard nofile 100000" >> /etc/security/limits.conf
Cleanup Process Optimization
public class SegmentCleanupOptimizer {
public void optimizeCleanupScheduling() {
// Cleanup process phases:
// 1. Identify segments eligible for deletion
// 2. Mark segments for deletion (rename to .deleted)
// 3. Asynchronous deletion after delay
Properties config = new Properties();
// Balance cleanup frequency vs I/O impact
config.put("log.retention.check.interval.ms", "300000"); // 5 minutes
config.put("log.segment.delete.delay.ms", "60000"); // 1 minute delay
// Spread cleanup load across time
config.put("log.roll.jitter.ms", "3600000"); // 1 hour jitter
}
public void monitorCleanupHealth() {
// Key metrics to track:
long pendingDeletes = countPendingDeletes();
double deletionRate = calculateDeletionRate();
long diskSpaceReclaimed = getDiskSpaceReclaimed();
// Alert conditions:
if (pendingDeletes > 1000) {
alert("High number of pending segment deletes");
}
if (deletionRate < expectedDeletionRate * 0.5) {
alert("Segment deletion falling behind retention schedule");
}
}
}
Capacity Planning and Cost Optimization
Storage Capacity Modeling
public class StorageCapacityPlanner {
public StorageRequirements calculateStorageNeeds(
TopicProfile profile,
RetentionPolicy retention,
ClusterConfig cluster) {
// Base storage calculation
long dailyBytes = profile.messagesPerDay * profile.avgMessageSizeBytes;
long retentionBytes = dailyBytes * retention.retentionDays;
// Apply compression
long compressedBytes = (long)(retentionBytes / profile.compressionRatio);
// Apply replication
long replicatedBytes = compressedBytes * cluster.replicationFactor;
// Add overhead (indexes, metadata)
double overheadRatio = 0.15; // 15% overhead typical
long totalBytes = (long)(replicatedBytes * (1 + overheadRatio));
// Add growth and safety margins
long safetyMargin = (long)(totalBytes * 0.25); // 25% safety
long growthProjection = calculateGrowthProjection(totalBytes, 12); // 12 months
return StorageRequirements.builder()
.baseRequirement(totalBytes)
.withSafetyMargin(totalBytes + safetyMargin)
.withGrowthProjection(totalBytes + safetyMargin + growthProjection)
.build();
}
public void optimizeStorageCosts() {
// Cost optimization strategies:
// 1. Right-size retention policies
optimizeRetentionPolicies();
// 2. Implement tiered storage
configureTieredStorage();
// 3. Optimize compression
tuneCompressionSettings();
// 4. Monitor and alert on storage growth
setupStorageAlerting();
}
private void optimizeRetentionPolicies() {
// Analyze actual data access patterns
Map<String, AccessPattern> accessPatterns = analyzeAccessPatterns();
for (String topic : accessPatterns.keySet()) {
AccessPattern pattern = accessPatterns.get(topic);
if (pattern.getLastAccessDays() < 7 && pattern.getCurrentRetentionDays() > 30) {
// Reduce retention for rarely accessed topics
recommendRetentionChange(topic, 7);
}
}
}
}
Cost Model and Analysis
Storage Cost Breakdown:
public class StorageCostAnalyzer {
public CostBreakdown analyzeMonthlyCosts(
long totalStorageGB,
StorageTier tier,
Region region) {
// Base storage costs (example AWS pricing)
double gpSSDCostPerGB = 0.10; // gp3 SSD
double ioPSSDCostPerGB = 0.125; // io2 SSD
double s3CostPerGB = 0.023; // S3 Standard
double monthlyCost = switch(tier) {
case HOT_SSD -> totalStorageGB * gpSSDCostPerGB;
case PERFORMANCE_SSD -> totalStorageGB * ioPSSDCostPerGB;
case COLD_S3 -> totalStorageGB * s3CostPerGB;
};
// Add I/O costs
double ioCosts = calculateIOCosts(totalStorageGB, tier);
// Add network costs for replication
double networkCosts = calculateNetworkCosts(totalStorageGB);
return CostBreakdown.builder()
.storageCosts(monthlyCost)
.ioCosts(ioCosts)
.networkCosts(networkCosts)
.totalCosts(monthlyCost + ioCosts + networkCosts)
.build();
}
public void generateCostOptimizationReport() {
// Identify cost optimization opportunities:
// 1. Topics with high storage cost but low access
List<String> overRetainedTopics = findOverRetainedTopics();
// 2. Topics suitable for compression optimization
List<String> compressionCandidates = findCompressionCandidates();
// 3. Topics suitable for tiered storage
List<String> tieringCandidates = findTieringCandidates();
// 4. Unused or low-value topics
List<String> unusedTopics = findUnusedTopics();
generateReport(overRetainedTopics, compressionCandidates,
tieringCandidates, unusedTopics);
}
}
Monitoring and Troubleshooting
Essential Retention Metrics
public class RetentionMonitoring {
public void setupRetentionAlerting() {
// Critical storage alerts
alertOnMetric("kafka.log:type=Log,name=Size,topic=*,partition=*",
threshold -> threshold > getMaxDiskCapacity() * 0.85,
"Disk usage critical - approaching capacity limit");
alertOnMetric("kafka.log:type=LogManager,name=OfflineLogDirectoryCount",
count -> count > 0,
"Log directory offline - data loss risk");
// Retention process health
alertOnMetric("kafka.log:type=LogCleanerManager,name=uncleanable-partitions-count",
count -> count > 0,
"Partitions unable to be cleaned");
alertOnMetric("kafka.log:type=LogCleaner,name=max-dirty-percent",
percent -> percent > 0.8,
"Log cleaner falling behind - high dirty ratio");
// Segment management
alertOnMetric("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs",
rate -> rate < expectedFlushRate * 0.5,
"Log flush rate degraded");
}
public RetentionHealthReport generateHealthReport() {
return RetentionHealthReport.builder()
.storageUtilization(calculateStorageUtilization())
.retentionCompliance(checkRetentionCompliance())
.compactionEffectiveness(measureCompactionEffectiveness())
.segmentHealth(assessSegmentHealth())
.cleanupPerformance(measureCleanupPerformance())
.build();
}
}
Troubleshooting Playbook
Common Issues and Solutions:
public class RetentionTroubleshooting {
public void troubleshootHighDiskUsage() {
System.out.println("DISK USAGE TROUBLESHOOTING:");
System.out.println("1. Check retention policies:");
System.out.println(" kafka-topics --describe --topic <topic>");
System.out.println("2. Verify cleanup is running:");
System.out.println(" Check JMX: kafka.log:type=LogCleaner,name=cleaner-recopy-percent");
System.out.println("3. Look for stuck segments:");
System.out.println(" ls -la /kafka-logs/*/");
System.out.println("4. Check for errors:");
System.out.println(" grep ERROR /kafka-logs/log-cleaner.log");
}
public void troubleshootCompactionIssues() {
System.out.println("COMPACTION TROUBLESHOOTING:");
System.out.println("1. Check compaction lag:");
System.out.println(" JMX: kafka.log:type=LogCleaner,name=max-compaction-delay-secs");
System.out.println("2. Verify key distribution:");
System.out.println(" kafka-dump-log --files /kafka-logs/topic-0/00000.log");
System.out.println("3. Check cleaner thread status:");
System.out.println(" JMX: kafka.log:type=LogCleaner,name=cleaner-recopy-percent");
System.out.println("4. Review memory allocation:");
System.out.println(" Check log.cleaner.dedupe.buffer.size setting");
}
public void troubleshootSlowCleanup() {
// Step 1: Check I/O throttling
double ioThrottle = getIOThrottleLimit();
if (ioThrottle < 100_000_000) { // 100MB/s
System.out.println("Consider increasing log.cleaner.io.max.bytes.per.second");
}
// Step 2: Check thread allocation
int cleanerThreads = getCleanerThreadCount();
int availableCores = Runtime.getRuntime().availableProcessors();
if (cleanerThreads < availableCores / 4) {
System.out.println("Consider increasing log.cleaner.threads");
}
// Step 3: Check segment sizing
analyzeSegmentSizing();
}
}
This production-focused deep dive into Kafka retention and log compaction provides the technical expertise needed for implementing cost-effective, scalable storage strategies in enterprise environments. The content emphasizes operational excellence, capacity planning, and troubleshooting skills essential for managing large-scale Kafka deployments.
See Also: [[Producer_Mechanics]], [[Consumer_Groups]], [[Event_Sourcing_Patterns]], [[Tiered_Storage_Strategies]]