Compression Integration Architecture - Part 3 of 4¶
Migration & Testing Strategies¶
Navigation: Index | ← Part 2 | Part 3 | Part 4 →
Migration Strategy¶
Backward Compatibility¶
The compression layer is designed for zero-downtime migration:
- Lazy Migration: Existing data remains uncompressed until it is next written — no up-front bulk rewrite is required
- Transparent Reads: Both compressed and uncompressed blocks are supported
- Gradual Rollout: New writes use compression; old data migrated on UPDATE
- Rollback Safe: Can disable compression without data loss
Migration Phases¶
┌──────────────────────────────────────────────────────────────┐
│ Migration Timeline │
├──────────────────────────────────────────────────────────────┤
│ │
│ Phase 1: Enable Compression (Day 0) │
│ ┌────────────────────────────────────────────────┐ │
│ │ • Deploy new code with compression enabled │ │
│ │ • All NEW writes compressed automatically │ │
│ │ • All reads work transparently │ │
│ │ • Zero downtime │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ Phase 2: Background Migration (Day 1-7) │
│ ┌────────────────────────────────────────────────┐ │
│ │ • Optional background job compresses old data │ │
│ │ • Low priority, runs during idle periods │ │
│ │ • Progress: 0% → 100% over 7 days │ │
│ │ • Can pause/resume anytime │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ Phase 3: Optimization (Day 7+) │
│ ┌────────────────────────────────────────────────┐ │
│ │ • Analyze compression statistics │ │
│ │ • Tune codec selection rules │ │
│ │ • Adjust configuration based on workload │ │
│ │ • Monitor storage savings │ │
│ └────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
Block Format Versioning¶
//! Block format with version support
//! Location: src/storage/compression/block_format.rs
/// Compressed block format (on-disk)
///
/// All multi-byte integers are little-endian. A fixed 37-byte header is
/// followed by the (possibly compressed) payload.
///
/// Layout:
/// ```text
/// +------------------+
/// | Magic (4 bytes) | "HCMP" = compressed, "HUNM" = uncompressed
/// +------------------+
/// | Version (2 bytes)| Format version (currently 1)
/// +------------------+
/// | Algorithm (1) | Compression algorithm enum
/// +------------------+
/// | Flags (1) | Reserved flags
/// +------------------+
/// | Original Size (8)| Uncompressed size in bytes
/// +------------------+
/// | Compressed (8) | Compressed size in bytes
/// +------------------+
/// | Checksum (4) | CRC32 of original data
/// +------------------+
/// | Pattern (1) | Detected data pattern
/// +------------------+
/// | Timestamp (8) | Compression timestamp
/// +------------------+
/// | Data (variable) | Compressed data
/// +------------------+
/// ```
const MAGIC_COMPRESSED: &[u8; 4] = b"HCMP";
/// Magic for blocks whose payload is stored verbatim (algorithm == None).
const MAGIC_UNCOMPRESSED: &[u8; 4] = b"HUNM";
/// Current on-disk format version; deserialization rejects other versions.
const CURRENT_VERSION: u16 = 1;
/// Stateless serializer/deserializer for the block format documented above.
pub struct BlockFormat;
impl BlockFormat {
/// Serialize compressed block to bytes
pub fn serialize(block: &CompressedBlock) -> Vec<u8> {
let mut buf = Vec::with_capacity(37 + block.data.len());
// Magic
if block.algorithm == CompressionAlgorithm::None {
buf.extend_from_slice(MAGIC_UNCOMPRESSED);
} else {
buf.extend_from_slice(MAGIC_COMPRESSED);
}
// Version
buf.extend_from_slice(&CURRENT_VERSION.to_le_bytes());
// Algorithm
buf.push(Self::algorithm_to_byte(block.algorithm));
// Flags (reserved)
buf.push(0);
// Sizes
buf.extend_from_slice(&(block.original_size as u64).to_le_bytes());
buf.extend_from_slice(&(block.data.len() as u64).to_le_bytes());
// Checksum
buf.extend_from_slice(&block.checksum.to_le_bytes());
// Pattern
buf.push(Self::pattern_to_byte(block.pattern));
// Timestamp
buf.extend_from_slice(&block.compressed_at.to_le_bytes());
// Data
buf.extend_from_slice(&block.data);
buf
}
/// Deserialize block from bytes
pub fn deserialize(data: &[u8]) -> Result<CompressedBlock> {
if data.len() < 37 {
return Err(Error::InvalidBlockFormat("Too short".to_string()));
}
// Check magic
let magic = &data[0..4];
let is_compressed = if magic == MAGIC_COMPRESSED {
true
} else if magic == MAGIC_UNCOMPRESSED {
false
} else {
return Err(Error::InvalidBlockFormat("Invalid magic".to_string()));
};
// Version
let version = u16::from_le_bytes([data[4], data[5]]);
if version != CURRENT_VERSION {
return Err(Error::UnsupportedVersion(version));
}
// Algorithm
let algorithm = Self::byte_to_algorithm(data[6])?;
// Flags (unused)
let _flags = data[7];
// Sizes
let original_size = u64::from_le_bytes([
data[8], data[9], data[10], data[11],
data[12], data[13], data[14], data[15],
]) as usize;
let compressed_size = u64::from_le_bytes([
data[16], data[17], data[18], data[19],
data[20], data[21], data[22], data[23],
]) as usize;
// Checksum
let checksum = u32::from_le_bytes([data[24], data[25], data[26], data[27]]);
// Pattern
let pattern = Self::byte_to_pattern(data[28])?;
// Timestamp
let compressed_at = u64::from_le_bytes([
data[29], data[30], data[31], data[32],
data[33], data[34], data[35], data[36],
]);
// Data
if data.len() < 37 + compressed_size {
return Err(Error::InvalidBlockFormat("Truncated data".to_string()));
}
let block_data = data[37..37+compressed_size].to_vec();
Ok(CompressedBlock {
algorithm,
version: CURRENT_VERSION,
original_size,
data: block_data,
checksum,
compressed_at,
pattern,
})
}
fn algorithm_to_byte(algo: CompressionAlgorithm) -> u8 {
match algo {
CompressionAlgorithm::None => 0,
CompressionAlgorithm::Lz4 => 1,
CompressionAlgorithm::Zstd => 2,
CompressionAlgorithm::Fsst => 3,
CompressionAlgorithm::Alp => 4,
CompressionAlgorithm::Dictionary => 5,
CompressionAlgorithm::Rle => 6,
CompressionAlgorithm::Delta => 7,
CompressionAlgorithm::Auto => 255, // Should not be serialized
}
}
fn byte_to_algorithm(byte: u8) -> Result<CompressionAlgorithm> {
Ok(match byte {
0 => CompressionAlgorithm::None,
1 => CompressionAlgorithm::Lz4,
2 => CompressionAlgorithm::Zstd,
3 => CompressionAlgorithm::Fsst,
4 => CompressionAlgorithm::Alp,
5 => CompressionAlgorithm::Dictionary,
6 => CompressionAlgorithm::Rle,
7 => CompressionAlgorithm::Delta,
_ => return Err(Error::UnknownAlgorithm(byte)),
})
}
fn pattern_to_byte(pattern: DataPattern) -> u8 {
match pattern {
DataPattern::Random => 0,
DataPattern::StringData => 1,
DataPattern::IntegerData => 2,
DataPattern::FloatingPointData => 3,
DataPattern::LowCardinality => 4,
DataPattern::Sequential => 5,
DataPattern::TimeSeries => 6,
DataPattern::StructuredText => 7,
}
}
fn byte_to_pattern(byte: u8) -> Result<DataPattern> {
Ok(match byte {
0 => DataPattern::Random,
1 => DataPattern::StringData,
2 => DataPattern::IntegerData,
3 => DataPattern::FloatingPointData,
4 => DataPattern::LowCardinality,
5 => DataPattern::Sequential,
6 => DataPattern::TimeSeries,
7 => DataPattern::StructuredText,
_ => return Err(Error::UnknownPattern(byte)),
})
}
}
Background Migration Job¶
//! Background compression migration job
//! Location: src/storage/compression/migration.rs
use super::{CompressionManager, CompressedBlock};
use crate::error::{Error, Result};
use crate::storage::StorageEngine;
use chrono::Timelike;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
/// Background migration configuration
///
/// Pacing and throttling knobs for the opt-in job that re-writes
/// pre-existing uncompressed data through the compression layer.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Enable background migration (opt-in; defaults to `false`)
    pub enabled: bool,
    /// Batch size (number of keys per batch)
    pub batch_size: usize,
    /// Delay between batches (milliseconds)
    pub batch_delay_ms: u64,
    /// Maximum CPU usage (0.0-1.0); batches are skipped while usage is higher
    pub max_cpu_usage: f64,
    /// Run during specific hours only (24-hour format, local time).
    /// The window may wrap midnight, e.g. Some((22, 6)) = 10pm-6am only.
    pub run_hours: Option<(u8, u8)>,
}
impl Default for MigrationConfig {
fn default() -> Self {
Self {
enabled: false, // Opt-in
batch_size: 100,
batch_delay_ms: 100,
max_cpu_usage: 0.15, // 15% max
run_hours: None, // Run anytime
}
}
}
/// Background migration job
///
/// Spawns a detached worker thread that compresses existing data in paced
/// batches. State is shared with the worker via atomics so the job can be
/// observed and stopped from any thread.
pub struct MigrationJob {
    /// Pacing/throttling settings passed to the worker thread.
    config: MigrationConfig,
    /// Storage engine whose data is being migrated.
    storage: Arc<StorageEngine>,
    /// Compression layer used to re-encode values.
    compression: Arc<CompressionManager>,
    /// Set while the worker is active; cleared to request a stop.
    running: Arc<AtomicBool>,
    /// Number of keys processed so far.
    progress: Arc<AtomicU64>,
    /// Approximate total key count, used for the progress ratio.
    total_keys: Arc<AtomicU64>,
}
impl MigrationJob {
    /// Create a job; nothing runs until [`Self::start`] is called.
    pub fn new(
        config: MigrationConfig,
        storage: Arc<StorageEngine>,
        compression: Arc<CompressionManager>,
    ) -> Self {
        Self {
            config,
            storage,
            compression,
            running: Arc::new(AtomicBool::new(false)),
            progress: Arc::new(AtomicU64::new(0)),
            total_keys: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Start the migration job on a background thread.
    ///
    /// A no-op (returning `Ok`) when migration is disabled in the config.
    ///
    /// # Errors
    /// Returns `Error::AlreadyRunning` if a worker is already active.
    pub fn start(&self) -> Result<()> {
        if !self.config.enabled {
            return Ok(());
        }
        // swap(true) atomically claims the running flag; if it was already
        // set, another worker owns it and we must not spawn a second one.
        if self.running.swap(true, Ordering::SeqCst) {
            return Err(Error::AlreadyRunning);
        }
        // Clone shared state into the worker thread.
        let running = Arc::clone(&self.running);
        let progress = Arc::clone(&self.progress);
        let total_keys = Arc::clone(&self.total_keys);
        let storage = Arc::clone(&self.storage);
        let compression = Arc::clone(&self.compression);
        let config = self.config.clone();
        // Detached worker; it exits when `running` is cleared or migration
        // completes.
        std::thread::spawn(move || {
            Self::run_migration(running, progress, total_keys, storage, compression, config);
        });
        Ok(())
    }

    /// Request the worker to stop after its current batch.
    ///
    /// NOTE(review): this does not join the worker thread; the flag is
    /// observed at the next loop iteration.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Migration progress in [0.0, 1.0]; returns 0.0 before the total key
    /// count has been established.
    pub fn progress(&self) -> f64 {
        let processed = self.progress.load(Ordering::Relaxed);
        let total = self.total_keys.load(Ordering::Relaxed);
        if total == 0 {
            0.0
        } else {
            processed as f64 / total as f64
        }
    }

    /// Worker loop: process throttled batches until stopped or done.
    fn run_migration(
        running: Arc<AtomicBool>,
        progress: Arc<AtomicU64>,
        total_keys: Arc<AtomicU64>,
        storage: Arc<StorageEngine>,
        compression: Arc<CompressionManager>,
        config: MigrationConfig,
    ) {
        tracing::info!("Starting background compression migration");
        // Count total keys (approximate).
        // This is a simplified version - actual implementation would use a
        // RocksDB iterator.
        let total = 1_000_000; // Placeholder
        total_keys.store(total, Ordering::Relaxed);
        let mut processed = 0u64;
        // SeqCst matches start()/stop() so a stop request is seen promptly.
        while running.load(Ordering::SeqCst) {
            // Respect the configured time-of-day window.
            if !Self::should_run_now(&config) {
                std::thread::sleep(Duration::from_secs(60));
                continue;
            }
            // Back off while the host is busy.
            if Self::get_cpu_usage() > config.max_cpu_usage {
                std::thread::sleep(Duration::from_millis(config.batch_delay_ms * 10));
                continue;
            }
            // Process one batch.
            match Self::process_batch(&storage, &compression, config.batch_size) {
                Ok(count) => {
                    processed += count as u64;
                    progress.store(processed, Ordering::Relaxed);
                    // An empty batch means every key has been visited.
                    if count == 0 {
                        tracing::info!("Background compression migration complete");
                        running.store(false, Ordering::SeqCst);
                        break;
                    }
                }
                Err(e) => {
                    // Transient failure: log, cool down, retry.
                    tracing::error!("Migration batch failed: {}", e);
                    std::thread::sleep(Duration::from_secs(10));
                }
            }
            // Pace between batches.
            std::thread::sleep(Duration::from_millis(config.batch_delay_ms));
        }
        tracing::info!("Background compression migration stopped");
    }

    /// Whether the configured run window (if any) permits work right now.
    /// Windows may wrap midnight; an equal start and end permits all hours.
    fn should_run_now(config: &MigrationConfig) -> bool {
        match config.run_hours {
            Some((start, end)) => {
                let now = chrono::Local::now().hour() as u8;
                if start < end {
                    now >= start && now < end
                } else {
                    // Wrap around midnight (e.g. 22..6).
                    now >= start || now < end
                }
            }
            None => true,
        }
    }

    /// Current CPU usage in [0.0, 1.0].
    /// Placeholder - actual implementation would use the sysinfo crate.
    fn get_cpu_usage() -> f64 {
        0.05
    }

    /// Compress up to `batch_size` not-yet-compressed keys.
    /// Returns how many keys were processed; 0 signals the migration is done.
    fn process_batch(
        storage: &Arc<StorageEngine>,
        compression: &Arc<CompressionManager>,
        batch_size: usize,
    ) -> Result<usize> {
        // Simplified - actual implementation would:
        // 1. Iterate over RocksDB keys
        // 2. Check if already compressed (parse block header)
        // 3. If not compressed, read value, compress, write back
        // 4. Use write batch for atomicity
        let _ = (storage, compression); // placeholder: unused until implemented
        Ok(batch_size)
    }
}
Testing Strategy¶
Unit Tests¶
#[cfg(test)]
mod tests {
    use super::*;

    /// The raw in-memory bytes of a float slice (native endianness) —
    /// equivalent to viewing the slice as `&[u8]`, but without `unsafe`.
    fn f64_bytes(floats: &[f64]) -> Vec<u8> {
        floats.iter().flat_map(|f| f.to_ne_bytes()).collect()
    }

    /// FSST should shrink repetitive string data and round-trip exactly.
    #[test]
    fn test_fsst_codec_string_data() {
        let codec = FsstCodec::new();
        let data = b"Hello world! Hello world! Hello world!";
        let compressed = codec.compress(data).unwrap();
        assert!(compressed.len() < data.len());
        let decompressed = codec.decompress(&compressed, Some(data.len())).unwrap();
        assert_eq!(decompressed, data);
    }

    /// ALP should shrink similar floating-point values and round-trip exactly.
    #[test]
    fn test_alp_codec_float_data() {
        let codec = AlpCodec::new();
        let floats: Vec<f64> = vec![1.23, 1.24, 1.25, 1.26, 1.27];
        let data = f64_bytes(&floats);
        let compressed = codec.compress(&data).unwrap();
        assert!(compressed.len() < data.len());
        let decompressed = codec.decompress(&compressed, Some(data.len())).unwrap();
        assert_eq!(decompressed, data);
    }

    /// The analyzer should classify representative inputs by content type.
    #[test]
    fn test_pattern_analyzer() {
        let analyzer = PatternAnalyzer::new();
        // String data
        let string_data = b"Hello world! This is a test string.";
        assert_eq!(analyzer.analyze(string_data).unwrap(), DataPattern::StringData);
        // Float data
        let floats: Vec<f64> = vec![1.1, 2.2, 3.3, 4.4, 5.5];
        let float_data = f64_bytes(&floats);
        assert_eq!(analyzer.analyze(&float_data).unwrap(), DataPattern::FloatingPointData);
        // Low cardinality: only 3 unique byte values
        let low_card = vec![1u8, 2, 3, 1, 2, 3, 1, 2, 3];
        assert_eq!(analyzer.analyze(&low_card).unwrap(), DataPattern::LowCardinality);
    }

    /// Codec selection should follow the documented pattern → algorithm rules.
    #[test]
    fn test_codec_selector() {
        let selector = CodecSelector::new();
        // String data should use FSST
        let string_data = b"Hello world!";
        let algo = selector.select_for_pattern(DataPattern::StringData, string_data).unwrap();
        assert_eq!(algo, CompressionAlgorithm::Fsst);
        // Float data should use ALP
        let algo = selector.select_for_pattern(DataPattern::FloatingPointData, &[]).unwrap();
        assert_eq!(algo, CompressionAlgorithm::Alp);
        // Low cardinality should use Dictionary
        let algo = selector.select_for_pattern(DataPattern::LowCardinality, &[]).unwrap();
        assert_eq!(algo, CompressionAlgorithm::Dictionary);
    }

    /// End-to-end compress/decompress through the manager.
    #[test]
    fn test_compression_manager() {
        let config = CompressionConfig::default();
        let manager = CompressionManager::new(config).unwrap();
        let data = b"Test data for compression";
        let block = manager.compress(data).unwrap();
        assert_eq!(block.original_size, data.len());
        assert!(block.algorithm != CompressionAlgorithm::None);
        let decompressed = manager.decompress(&block).unwrap();
        assert_eq!(decompressed, data);
    }

    /// Serialization round-trip must preserve every header field and payload.
    #[test]
    fn test_block_format_serialization() {
        let block = CompressedBlock {
            algorithm: CompressionAlgorithm::Fsst,
            version: 1,
            original_size: 1000,
            data: vec![1, 2, 3, 4, 5],
            checksum: 0x12345678,
            compressed_at: 1700000000,
            pattern: DataPattern::StringData,
        };
        let serialized = BlockFormat::serialize(&block);
        let deserialized = BlockFormat::deserialize(&serialized).unwrap();
        assert_eq!(deserialized.algorithm, block.algorithm);
        assert_eq!(deserialized.original_size, block.original_size);
        assert_eq!(deserialized.data, block.data);
        assert_eq!(deserialized.checksum, block.checksum);
        assert_eq!(deserialized.pattern, block.pattern);
        assert_eq!(deserialized.compressed_at, block.compressed_at);
    }
}
Integration Tests¶
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::storage::StorageEngine;
    use crate::config::Config;

    /// Round-trip a key/value pair through an engine with auto compression.
    #[test]
    fn test_storage_engine_with_compression() {
        let mut config = Config::in_memory();
        config.storage.compression.enabled = true;
        config.storage.compression.algorithm = CompressionAlgorithmConfig::Auto;
        let engine = StorageEngine::open_in_memory(&config).unwrap();
        // Insert data
        let key = b"test_key".to_vec();
        let value = b"test_value_with_some_repeated_data_repeated_data".to_vec();
        engine.put(&key, &value).unwrap();
        // Retrieve data: the value must come back byte-identical despite
        // transparent compression.
        let retrieved = engine.get(&key).unwrap();
        assert_eq!(retrieved, Some(value));
    }

    /// Tuples must survive insert + scan unchanged under compression.
    #[test]
    fn test_tuple_compression() {
        let mut config = Config::in_memory();
        config.storage.compression.enabled = true;
        let engine = StorageEngine::open_in_memory(&config).unwrap();
        // Create tuple
        let tuple = Tuple::new(vec![
            Value::String("Hello".to_string()),
            Value::String("World".to_string()),
            Value::Int4(42),
        ]);
        // Insert tuple (row id unused here; underscore avoids a warning)
        let _row_id = engine.insert_tuple("test_table", tuple.clone()).unwrap();
        // Scan table
        let tuples = engine.scan_table("test_table").unwrap();
        assert_eq!(tuples.len(), 1);
        assert_eq!(tuples[0], tuple);
    }

    /// Compression statistics should accumulate when collection is enabled.
    #[test]
    fn test_compression_stats() {
        let mut config = Config::in_memory();
        config.storage.compression.enabled = true;
        config.storage.compression.auto.collect_stats = true;
        let engine = StorageEngine::open_in_memory(&config).unwrap();
        // Insert multiple keys so stats have something to aggregate.
        for i in 0..100 {
            let key = format!("key_{}", i).into_bytes();
            let value = format!("value_{}_repeated_repeated", i).into_bytes();
            engine.put(&key, &value).unwrap();
        }
        // Get stats: at least one compression happened and, on this
        // repetitive data, the average ratio should show a net saving.
        let stats = engine.compression_stats().unwrap();
        assert!(stats.total_compressions > 0);
        assert!(stats.avg_compression_ratio > 1.0);
    }
}
Benchmark Tests¶
#[cfg(test)]
mod benchmarks {
    use super::*;
    use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId};

    // NOTE(review): criterion benchmarks conventionally live in `benches/`
    // with `harness = false` in Cargo.toml; `criterion_main!` inside a
    // `#[cfg(test)]` module does not produce a runnable bench entry point —
    // confirm the intended layout.

    /// Throughput of each codec across payload sizes from 1 KB to 1 MB.
    fn bench_compression_algorithms(c: &mut Criterion) {
        let data_sizes = vec![1024, 10_240, 102_400, 1_024_000]; // 1KB to 1MB
        for size in data_sizes {
            let data = vec![b'x'; size];
            let mut group = c.benchmark_group(format!("compress_{}_bytes", size));
            // FSST
            group.bench_function(BenchmarkId::new("fsst", size), |b| {
                let codec = FsstCodec::new();
                b.iter(|| codec.compress(black_box(&data)))
            });
            // ALP (with float data); bytes built safely via to_ne_bytes
            // instead of slice::from_raw_parts.
            let float_data: Vec<f64> = (0..size / 8).map(|i| i as f64 * 1.1).collect();
            let float_bytes: Vec<u8> =
                float_data.iter().flat_map(|f| f.to_ne_bytes()).collect();
            group.bench_function(BenchmarkId::new("alp", size), |b| {
                let codec = AlpCodec::new();
                b.iter(|| codec.compress(black_box(&float_bytes)))
            });
            // Dictionary
            group.bench_function(BenchmarkId::new("dictionary", size), |b| {
                let codec = DictionaryCodec::new();
                b.iter(|| codec.compress(black_box(&data)))
            });
            group.finish();
        }
    }

    /// Cost of pattern detection on string vs. float inputs.
    fn bench_pattern_analysis(c: &mut Criterion) {
        let analyzer = PatternAnalyzer::new();
        let mut group = c.benchmark_group("pattern_analysis");
        // String data
        let string_data = b"Hello world! ".repeat(100);
        group.bench_function("string_data", |b| {
            b.iter(|| analyzer.analyze(black_box(&string_data)))
        });
        // Float data, built without `unsafe`.
        let floats: Vec<f64> = (0..1000).map(|i| i as f64 * 1.1).collect();
        let float_bytes: Vec<u8> = floats.iter().flat_map(|f| f.to_ne_bytes()).collect();
        group.bench_function("float_data", |b| {
            b.iter(|| analyzer.analyze(black_box(&float_bytes)))
        });
        group.finish();
    }

    criterion_group!(benches, bench_compression_algorithms, bench_pattern_analysis);
    criterion_main!(benches);
}