Skip to content

Compression Integration Architecture - Part 3 of 4

Migration & Testing Strategies

Navigation: Index | ← Part 2 | Part 3 | Part 4 →


Migration Strategy

Backward Compatibility

The compression layer is designed for zero-downtime migration:

  1. Lazy Migration: Existing data remains uncompressed until it is next written
  2. Transparent Reads: Both compressed and uncompressed blocks can be read
  3. Gradual Rollout: New writes use compression; old data is migrated on UPDATE
  4. Rollback Safe: Compression can be disabled without data loss

Migration Phases

┌──────────────────────────────────────────────────────────────┐
│                    Migration Timeline                        │
├──────────────────────────────────────────────────────────────┤
│                                                               │
│  Phase 1: Enable Compression (Day 0)                         │
│  ┌────────────────────────────────────────────────┐         │
│  │ • Deploy new code with compression enabled      │         │
│  │ • All NEW writes compressed automatically       │         │
│  │ • All reads work transparently                  │         │
│  │ • Zero downtime                                 │         │
│  └────────────────────────────────────────────────┘         │
│                                                               │
│  Phase 2: Background Migration (Day 1-7)                     │
│  ┌────────────────────────────────────────────────┐         │
│  │ • Optional background job compresses old data   │         │
│  │ • Low priority, runs during idle periods        │         │
│  │ • Progress: 0% → 100% over 7 days              │         │
│  │ • Can pause/resume anytime                      │         │
│  └────────────────────────────────────────────────┘         │
│                                                               │
│  Phase 3: Optimization (Day 7+)                              │
│  ┌────────────────────────────────────────────────┐         │
│  │ • Analyze compression statistics                │         │
│  │ • Tune codec selection rules                    │         │
│  │ • Adjust configuration based on workload        │         │
│  │ • Monitor storage savings                       │         │
│  └────────────────────────────────────────────────┘         │
│                                                               │
└──────────────────────────────────────────────────────────────┘

Block Format Versioning

//! Block format with version support
//! Location: src/storage/compression/block_format.rs

/// Compressed block format (on-disk)
///
/// Every block starts with a fixed 37-byte header
/// (4 + 2 + 1 + 1 + 8 + 8 + 4 + 1 + 8) followed by the payload. All
/// multi-byte integers are written little-endian.
///
/// Layout (fenced as `text` so rustdoc does not try to compile it as a doctest):
/// ```text
/// +------------------+
/// | Magic (4 bytes)  |  "HCMP" = compressed, "HUNM" = uncompressed
/// +------------------+
/// | Version (2 bytes)|  Format version (currently 1)
/// +------------------+
/// | Algorithm (1)    |  Compression algorithm enum
/// +------------------+
/// | Flags (1)        |  Reserved flags
/// +------------------+
/// | Original Size (8)|  Uncompressed size in bytes
/// +------------------+
/// | Compressed (8)   |  Compressed size in bytes
/// +------------------+
/// | Checksum (4)     |  CRC32 of original data
/// +------------------+
/// | Pattern (1)      |  Detected data pattern
/// +------------------+
/// | Timestamp (8)    |  Compression timestamp (unit not stated here; TODO confirm)
/// +------------------+
/// | Data (variable)  |  Compressed data
/// +------------------+
/// ```
///
/// Magic written for blocks whose algorithm is anything other than `None`.
const MAGIC_COMPRESSED: &[u8; 4] = b"HCMP";
/// Magic written for blocks whose algorithm is `CompressionAlgorithm::None`.
const MAGIC_UNCOMPRESSED: &[u8; 4] = b"HUNM";
/// The only format version the serializer writes and the deserializer accepts.
const CURRENT_VERSION: u16 = 1;

pub struct BlockFormat;

impl BlockFormat {
    /// Serialize compressed block to bytes
    pub fn serialize(block: &CompressedBlock) -> Vec<u8> {
        let mut buf = Vec::with_capacity(37 + block.data.len());

        // Magic
        if block.algorithm == CompressionAlgorithm::None {
            buf.extend_from_slice(MAGIC_UNCOMPRESSED);
        } else {
            buf.extend_from_slice(MAGIC_COMPRESSED);
        }

        // Version
        buf.extend_from_slice(&CURRENT_VERSION.to_le_bytes());

        // Algorithm
        buf.push(Self::algorithm_to_byte(block.algorithm));

        // Flags (reserved)
        buf.push(0);

        // Sizes
        buf.extend_from_slice(&(block.original_size as u64).to_le_bytes());
        buf.extend_from_slice(&(block.data.len() as u64).to_le_bytes());

        // Checksum
        buf.extend_from_slice(&block.checksum.to_le_bytes());

        // Pattern
        buf.push(Self::pattern_to_byte(block.pattern));

        // Timestamp
        buf.extend_from_slice(&block.compressed_at.to_le_bytes());

        // Data
        buf.extend_from_slice(&block.data);

        buf
    }

    /// Deserialize block from bytes
    pub fn deserialize(data: &[u8]) -> Result<CompressedBlock> {
        if data.len() < 37 {
            return Err(Error::InvalidBlockFormat("Too short".to_string()));
        }

        // Check magic
        let magic = &data[0..4];
        let is_compressed = if magic == MAGIC_COMPRESSED {
            true
        } else if magic == MAGIC_UNCOMPRESSED {
            false
        } else {
            return Err(Error::InvalidBlockFormat("Invalid magic".to_string()));
        };

        // Version
        let version = u16::from_le_bytes([data[4], data[5]]);
        if version != CURRENT_VERSION {
            return Err(Error::UnsupportedVersion(version));
        }

        // Algorithm
        let algorithm = Self::byte_to_algorithm(data[6])?;

        // Flags (unused)
        let _flags = data[7];

        // Sizes
        let original_size = u64::from_le_bytes([
            data[8], data[9], data[10], data[11],
            data[12], data[13], data[14], data[15],
        ]) as usize;

        let compressed_size = u64::from_le_bytes([
            data[16], data[17], data[18], data[19],
            data[20], data[21], data[22], data[23],
        ]) as usize;

        // Checksum
        let checksum = u32::from_le_bytes([data[24], data[25], data[26], data[27]]);

        // Pattern
        let pattern = Self::byte_to_pattern(data[28])?;

        // Timestamp
        let compressed_at = u64::from_le_bytes([
            data[29], data[30], data[31], data[32],
            data[33], data[34], data[35], data[36],
        ]);

        // Data
        if data.len() < 37 + compressed_size {
            return Err(Error::InvalidBlockFormat("Truncated data".to_string()));
        }
        let block_data = data[37..37+compressed_size].to_vec();

        Ok(CompressedBlock {
            algorithm,
            version: CURRENT_VERSION,
            original_size,
            data: block_data,
            checksum,
            compressed_at,
            pattern,
        })
    }

    fn algorithm_to_byte(algo: CompressionAlgorithm) -> u8 {
        match algo {
            CompressionAlgorithm::None => 0,
            CompressionAlgorithm::Lz4 => 1,
            CompressionAlgorithm::Zstd => 2,
            CompressionAlgorithm::Fsst => 3,
            CompressionAlgorithm::Alp => 4,
            CompressionAlgorithm::Dictionary => 5,
            CompressionAlgorithm::Rle => 6,
            CompressionAlgorithm::Delta => 7,
            CompressionAlgorithm::Auto => 255, // Should not be serialized
        }
    }

    fn byte_to_algorithm(byte: u8) -> Result<CompressionAlgorithm> {
        Ok(match byte {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            2 => CompressionAlgorithm::Zstd,
            3 => CompressionAlgorithm::Fsst,
            4 => CompressionAlgorithm::Alp,
            5 => CompressionAlgorithm::Dictionary,
            6 => CompressionAlgorithm::Rle,
            7 => CompressionAlgorithm::Delta,
            _ => return Err(Error::UnknownAlgorithm(byte)),
        })
    }

    fn pattern_to_byte(pattern: DataPattern) -> u8 {
        match pattern {
            DataPattern::Random => 0,
            DataPattern::StringData => 1,
            DataPattern::IntegerData => 2,
            DataPattern::FloatingPointData => 3,
            DataPattern::LowCardinality => 4,
            DataPattern::Sequential => 5,
            DataPattern::TimeSeries => 6,
            DataPattern::StructuredText => 7,
        }
    }

    fn byte_to_pattern(byte: u8) -> Result<DataPattern> {
        Ok(match byte {
            0 => DataPattern::Random,
            1 => DataPattern::StringData,
            2 => DataPattern::IntegerData,
            3 => DataPattern::FloatingPointData,
            4 => DataPattern::LowCardinality,
            5 => DataPattern::Sequential,
            6 => DataPattern::TimeSeries,
            7 => DataPattern::StructuredText,
            _ => return Err(Error::UnknownPattern(byte)),
        })
    }
}

Background Migration Job

//! Background compression migration job
//! Location: src/storage/compression/migration.rs

use super::{CompressionManager, CompressedBlock};
use crate::error::{Error, Result};
use crate::storage::StorageEngine;
use chrono::Timelike;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;

/// Tunables for the background compression migration job.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Master switch; when `false` the job's `start()` is a no-op.
    pub enabled: bool,
    /// How many keys are handed to one batch.
    pub batch_size: usize,
    /// Pause inserted after every batch, in milliseconds.
    pub batch_delay_ms: u64,
    /// CPU usage ceiling as a fraction (0.0-1.0); above it the job backs off.
    pub max_cpu_usage: f64,
    /// Optional `(start_hour, end_hour)` window in 24-hour local time,
    /// e.g. `Some((22, 6))` = 10pm-6am only. `None` means no restriction.
    pub run_hours: Option<(u8, u8)>,
}

impl Default for MigrationConfig {
    /// Conservative defaults: migration is opt-in, batches are small, and the
    /// job yields once CPU usage passes 15%.
    fn default() -> Self {
        MigrationConfig {
            // No time-of-day restriction.
            run_hours: None,
            // 15% max.
            max_cpu_usage: 0.15,
            batch_delay_ms: 100,
            batch_size: 100,
            // Opt-in.
            enabled: false,
        }
    }
}

/// Background migration job
///
/// Owns the shared state (run flag and progress counters) for a worker
/// thread that compresses existing data in batches. The counters are behind
/// `Arc` so the spawned thread and the handle can both see them.
pub struct MigrationJob {
    config: MigrationConfig,
    storage: Arc<StorageEngine>,
    compression: Arc<CompressionManager>,
    /// `true` while a worker thread is (or should be) running.
    running: Arc<AtomicBool>,
    /// Number of keys processed so far.
    progress: Arc<AtomicU64>,
    /// Estimated total number of keys; 0 until the worker has set it.
    total_keys: Arc<AtomicU64>,
}

impl MigrationJob {
    /// Create a job in the stopped state with zeroed progress counters.
    pub fn new(
        config: MigrationConfig,
        storage: Arc<StorageEngine>,
        compression: Arc<CompressionManager>,
    ) -> Self {
        Self {
            config,
            storage,
            compression,
            running: Arc::new(AtomicBool::new(false)),
            progress: Arc::new(AtomicU64::new(0)),
            total_keys: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Start migration job in background
    ///
    /// Returns `Ok(())` without doing anything when migration is disabled in
    /// the config.
    ///
    /// # Errors
    ///
    /// Returns `Error::AlreadyRunning` if the run flag is already set.
    pub fn start(&self) -> Result<()> {
        if !self.config.enabled {
            return Ok(());
        }

        // Atomically claim the run flag so two concurrent `start` calls
        // cannot both spawn a worker.
        if self.running.swap(true, Ordering::SeqCst) {
            return Err(Error::AlreadyRunning);
        }

        // Clone Arcs for thread
        let running = Arc::clone(&self.running);
        let progress = Arc::clone(&self.progress);
        let total_keys = Arc::clone(&self.total_keys);
        let storage = Arc::clone(&self.storage);
        let compression = Arc::clone(&self.compression);
        let config = self.config.clone();

        // Spawn background thread. The join handle is dropped, so the thread
        // is detached; `stop` only signals it and does not wait for it.
        std::thread::spawn(move || {
            Self::run_migration(running, progress, total_keys, storage, compression, config);
        });

        Ok(())
    }

    /// Stop migration job
    ///
    /// Clears the run flag. The worker notices it at the top of its next loop
    /// iteration, i.e. only after whatever sleep it is currently in.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Get migration progress (0.0-1.0)
    ///
    /// Returns 0.0 until a total has been recorded. The total is only an
    /// estimate, so the ratio is clamped to 1.0 to keep the documented range
    /// even when more keys than estimated have been processed.
    pub fn progress(&self) -> f64 {
        let total = self.total_keys.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }
        let processed = self.progress.load(Ordering::Relaxed);
        (processed as f64 / total as f64).min(1.0)
    }

    /// Worker loop: processes batches until `running` is cleared or a batch
    /// reports zero processed keys (treated as "migration complete").
    fn run_migration(
        running: Arc<AtomicBool>,
        progress: Arc<AtomicU64>,
        total_keys: Arc<AtomicU64>,
        storage: Arc<StorageEngine>,
        compression: Arc<CompressionManager>,
        config: MigrationConfig,
    ) {
        tracing::info!("Starting background compression migration");

        // Count total keys (approximate)
        // This is a simplified version - actual implementation would use RocksDB iterator
        let total = 1_000_000; // Placeholder
        total_keys.store(total, Ordering::Relaxed);

        let mut processed = 0u64;

        while running.load(Ordering::Relaxed) {
            // Outside the configured time window: re-check once a minute.
            if !Self::should_run_now(&config) {
                std::thread::sleep(Duration::from_secs(60));
                continue;
            }

            // Over the CPU budget: back off for ten batch delays.
            // `saturating_mul` keeps an extreme configured delay from overflowing.
            if Self::get_cpu_usage() > config.max_cpu_usage {
                std::thread::sleep(Duration::from_millis(
                    config.batch_delay_ms.saturating_mul(10),
                ));
                continue;
            }

            // Process batch
            match Self::process_batch(&storage, &compression, config.batch_size) {
                Ok(count) => {
                    processed += count as u64;
                    progress.store(processed, Ordering::Relaxed);

                    if count == 0 {
                        tracing::info!("Background compression migration complete");
                        running.store(false, Ordering::Relaxed);
                        break;
                    }
                }
                Err(e) => {
                    // A failed batch is logged and retried after a pause
                    // rather than aborting the whole migration.
                    tracing::error!("Migration batch failed: {}", e);
                    std::thread::sleep(Duration::from_secs(10));
                }
            }

            // Delay between batches
            std::thread::sleep(Duration::from_millis(config.batch_delay_ms));
        }

        tracing::info!("Background compression migration stopped");
    }

    /// Whether the current local hour falls inside the configured window.
    /// With no window configured the job may always run.
    fn should_run_now(config: &MigrationConfig) -> bool {
        match config.run_hours {
            None => true,
            Some((start, end)) => {
                let now = chrono::Local::now().hour() as u8;
                Self::hour_in_window(now, start, end)
            }
        }
    }

    /// Pure window test: `start` is inclusive, `end` is exclusive.
    ///
    /// When `start >= end` the window wraps around midnight; in particular
    /// `start == end` matches every hour.
    fn hour_in_window(now: u8, start: u8, end: u8) -> bool {
        if start < end {
            now >= start && now < end
        } else {
            // Wrap around midnight
            now >= start || now < end
        }
    }

    /// Current CPU usage as a fraction.
    fn get_cpu_usage() -> f64 {
        // Placeholder - actual implementation would use sysinfo crate
        0.05
    }

    /// Process one batch and return how many keys were handled.
    ///
    /// Placeholder: currently ignores `storage` and `compression` and always
    /// reports `batch_size`, so the worker never observes completion.
    fn process_batch(
        storage: &Arc<StorageEngine>,
        compression: &Arc<CompressionManager>,
        batch_size: usize,
    ) -> Result<usize> {
        // Simplified - actual implementation would:
        // 1. Iterate over RocksDB keys
        // 2. Check if already compressed (parse block header)
        // 3. If not compressed, read value, compress, write back
        // 4. Use write batch for atomicity

        // Placeholder
        Ok(batch_size)
    }
}

Testing Strategy

Unit Tests

#[cfg(test)]
mod tests {
    use super::*;

    /// Native-endian byte image of a slice of `f64`s.
    ///
    /// Produces the same bytes as reinterpreting the slice's memory, but
    /// without `unsafe`.
    fn f64_bytes(values: &[f64]) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(values.len() * 8);
        for value in values {
            bytes.extend_from_slice(&value.to_ne_bytes());
        }
        bytes
    }

    /// FSST shrinks repetitive text and round-trips it exactly.
    #[test]
    fn test_fsst_codec_string_data() {
        let codec = FsstCodec::new();
        let data = b"Hello world! Hello world! Hello world!";

        let compressed = codec.compress(data).unwrap();
        assert!(compressed.len() < data.len());

        let decompressed = codec.decompress(&compressed, Some(data.len())).unwrap();
        assert_eq!(decompressed, data);
    }

    /// ALP shrinks a run of close-together floats and round-trips it exactly.
    #[test]
    fn test_alp_codec_float_data() {
        let codec = AlpCodec::new();
        let bytes = f64_bytes(&[1.23, 1.24, 1.25, 1.26, 1.27]);
        let data = bytes.as_slice();

        let compressed = codec.compress(data).unwrap();
        assert!(compressed.len() < data.len());

        let decompressed = codec.decompress(&compressed, Some(data.len())).unwrap();
        assert_eq!(decompressed, data);
    }

    /// The analyzer classifies text, float and low-cardinality inputs.
    #[test]
    fn test_pattern_analyzer() {
        let analyzer = PatternAnalyzer::new();

        // String data
        let string_data = b"Hello world! This is a test string.";
        assert_eq!(analyzer.analyze(string_data).unwrap(), DataPattern::StringData);

        // Float data
        let float_bytes = f64_bytes(&[1.1, 2.2, 3.3, 4.4, 5.5]);
        assert_eq!(
            analyzer.analyze(float_bytes.as_slice()).unwrap(),
            DataPattern::FloatingPointData
        );

        // Low cardinality
        let low_card = vec![1u8, 2, 3, 1, 2, 3, 1, 2, 3]; // Only 3 unique values
        assert_eq!(analyzer.analyze(&low_card).unwrap(), DataPattern::LowCardinality);
    }

    /// Each detected pattern maps to its dedicated codec.
    #[test]
    fn test_codec_selector() {
        let selector = CodecSelector::new();

        // String data should use FSST
        let string_data = b"Hello world!";
        let algo = selector.select_for_pattern(DataPattern::StringData, string_data).unwrap();
        assert_eq!(algo, CompressionAlgorithm::Fsst);

        // Float data should use ALP
        let algo = selector.select_for_pattern(DataPattern::FloatingPointData, &[]).unwrap();
        assert_eq!(algo, CompressionAlgorithm::Alp);

        // Low cardinality should use Dictionary
        let algo = selector.select_for_pattern(DataPattern::LowCardinality, &[]).unwrap();
        assert_eq!(algo, CompressionAlgorithm::Dictionary);
    }

    /// The manager records the original size, picks a real codec, and
    /// round-trips the input.
    #[test]
    fn test_compression_manager() {
        let config = CompressionConfig::default();
        let manager = CompressionManager::new(config).unwrap();

        let data = b"Test data for compression";
        let block = manager.compress(data).unwrap();

        // assert_eq!/assert_ne! print both sides on failure, unlike assert!(a == b).
        assert_eq!(block.original_size, data.len());
        assert_ne!(block.algorithm, CompressionAlgorithm::None);

        let decompressed = manager.decompress(&block).unwrap();
        assert_eq!(decompressed, data);
    }

    /// Block used by the serialization tests below.
    fn sample_block() -> CompressedBlock {
        CompressedBlock {
            algorithm: CompressionAlgorithm::Fsst,
            version: 1,
            original_size: 1000,
            data: vec![1, 2, 3, 4, 5],
            checksum: 0x12345678,
            compressed_at: 1700000000,
            pattern: DataPattern::StringData,
        }
    }

    /// Every header field and the payload survive a serialize/deserialize
    /// round trip.
    #[test]
    fn test_block_format_serialization() {
        let block = sample_block();

        let serialized = BlockFormat::serialize(&block);
        // 37-byte fixed header followed by the payload.
        assert_eq!(serialized.len(), 37 + block.data.len());
        assert_eq!(&serialized[..4], b"HCMP");

        let deserialized = BlockFormat::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.algorithm, block.algorithm);
        assert_eq!(deserialized.version, block.version);
        assert_eq!(deserialized.original_size, block.original_size);
        assert_eq!(deserialized.data, block.data);
        assert_eq!(deserialized.checksum, block.checksum);
        assert_eq!(deserialized.compressed_at, block.compressed_at);
        assert_eq!(deserialized.pattern, block.pattern);
    }

    /// Malformed input is reported as an error instead of panicking.
    #[test]
    fn test_block_format_rejects_malformed_input() {
        let serialized = BlockFormat::serialize(&sample_block());

        // Empty input and a header cut one byte short.
        assert!(BlockFormat::deserialize(&[]).is_err());
        assert!(BlockFormat::deserialize(&serialized[..36]).is_err());

        // Payload shorter than the declared compressed size.
        assert!(BlockFormat::deserialize(&serialized[..serialized.len() - 1]).is_err());

        // Unknown magic.
        let mut bad_magic = serialized.clone();
        bad_magic[0] = b'X';
        assert!(BlockFormat::deserialize(&bad_magic).is_err());

        // Unsupported version.
        let mut bad_version = serialized.clone();
        bad_version[4] = 2;
        assert!(BlockFormat::deserialize(&bad_version).is_err());

        // Unassigned algorithm byte.
        let mut bad_algorithm = serialized;
        bad_algorithm[6] = 200;
        assert!(BlockFormat::deserialize(&bad_algorithm).is_err());
    }
}

Integration Tests

#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::storage::StorageEngine;
    use crate::config::Config;

    /// A value written through a compression-enabled engine reads back unchanged.
    #[test]
    fn test_storage_engine_with_compression() {
        let mut cfg = Config::in_memory();
        cfg.storage.compression.enabled = true;
        cfg.storage.compression.algorithm = CompressionAlgorithmConfig::Auto;
        let store = StorageEngine::open_in_memory(&cfg).unwrap();

        // Write one key whose value contains repeated substrings.
        let k = b"test_key".to_vec();
        let v = b"test_value_with_some_repeated_data_repeated_data".to_vec();
        store.put(&k, &v).unwrap();

        // Reading it back must yield exactly what was written.
        assert_eq!(store.get(&k).unwrap(), Some(v));
    }

    /// A tuple inserted with compression enabled is returned intact by a scan.
    #[test]
    fn test_tuple_compression() {
        let mut cfg = Config::in_memory();
        cfg.storage.compression.enabled = true;
        let store = StorageEngine::open_in_memory(&cfg).unwrap();

        let row = Tuple::new(vec![
            Value::String("Hello".to_string()),
            Value::String("World".to_string()),
            Value::Int4(42),
        ]);

        // The returned row id is not needed by this test.
        let _row_id = store.insert_tuple("test_table", row.clone()).unwrap();

        let scanned = store.scan_table("test_table").unwrap();
        assert_eq!(scanned.len(), 1);
        assert_eq!(scanned[0], row);
    }

    /// With stats collection on, writes are counted and report a ratio above 1.
    #[test]
    fn test_compression_stats() {
        let mut cfg = Config::in_memory();
        cfg.storage.compression.enabled = true;
        cfg.storage.compression.auto.collect_stats = true;
        let store = StorageEngine::open_in_memory(&cfg).unwrap();

        // One hundred distinct keys, each with a repetitive value.
        (0..100).for_each(|n| {
            let k = format!("key_{}", n).into_bytes();
            let v = format!("value_{}_repeated_repeated", n).into_bytes();
            store.put(&k, &v).unwrap();
        });

        let report = store.compression_stats().unwrap();
        assert!(report.total_compressions > 0);
        assert!(report.avg_compression_ratio > 1.0);
    }
}

Benchmark Tests

#[cfg(test)]
mod benchmarks {
    use super::*;
    use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId};

    // NOTE(review): `criterion_main!` generates a `fn main`. Nested inside a
    // `#[cfg(test)]` module it is never the program entry point, so these
    // benchmarks will not run as written. Criterion benchmarks normally live
    // in their own `benches/*.rs` target declared with `harness = false`;
    // confirm where this module is meant to be placed.

    /// Native-endian byte image of a slice of `f64`s, built without `unsafe`.
    fn f64_bytes(values: &[f64]) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(values.len() * 8);
        for value in values {
            bytes.extend_from_slice(&value.to_ne_bytes());
        }
        bytes
    }

    /// Compression throughput of each codec at input sizes from 1KB to 1MB.
    fn bench_compression_algorithms(c: &mut Criterion) {
        let data_sizes = vec![1024, 10_240, 102_400, 1_024_000]; // 1KB to 1MB

        for size in data_sizes {
            // Constant byte fill, shared by the FSST and dictionary runs.
            let data = vec![b'x'; size];

            let mut group = c.benchmark_group(format!("compress_{}_bytes", size));

            // FSST
            group.bench_function(BenchmarkId::new("fsst", size), |b| {
                let codec = FsstCodec::new();
                b.iter(|| codec.compress(black_box(&data)))
            });

            // ALP (with float data)
            let float_data: Vec<f64> = (0..size / 8).map(|i| i as f64 * 1.1).collect();
            let float_bytes = f64_bytes(&float_data);
            group.bench_function(BenchmarkId::new("alp", size), |b| {
                let codec = AlpCodec::new();
                b.iter(|| codec.compress(black_box(float_bytes.as_slice())))
            });

            // Dictionary
            group.bench_function(BenchmarkId::new("dictionary", size), |b| {
                let codec = DictionaryCodec::new();
                b.iter(|| codec.compress(black_box(&data)))
            });

            group.finish();
        }
    }

    /// Cost of pattern detection on text and on float input.
    fn bench_pattern_analysis(c: &mut Criterion) {
        let analyzer = PatternAnalyzer::new();

        let mut group = c.benchmark_group("pattern_analysis");

        // String data
        let string_data = b"Hello world! ".repeat(100);
        group.bench_function("string_data", |b| {
            b.iter(|| analyzer.analyze(black_box(&string_data)))
        });

        // Float data
        let floats: Vec<f64> = (0..1000).map(|i| i as f64 * 1.1).collect();
        let float_bytes = f64_bytes(&floats);
        group.bench_function("float_data", |b| {
            b.iter(|| analyzer.analyze(black_box(float_bytes.as_slice())))
        });

        group.finish();
    }

    criterion_group!(benches, bench_compression_algorithms, bench_pattern_analysis);
    criterion_main!(benches);
}