Skip to content

Compression Integration Architecture - Part 1 of 4

Architecture Overview & API Design

Navigation: Index | Part 1 | Part 2 →


HeliosDB Lite v2.1 - Compression Integration Architecture

Version: 2.1.0 Created: November 18, 2025 Status: Architecture Design Target: HeliosDB Lite Phase 3 - Week 2


Executive Summary

This document defines the architecture for integrating advanced compression codecs (FSST and ALP) into HeliosDB Lite v2.1, providing automatic, transparent compression that achieves 5-15x storage reduction with minimal performance overhead.

Key Design Principles

  1. Zero Configuration - Automatic codec selection based on data patterns
  2. Non-Breaking - Fully backward compatible with existing storage
  3. Modular - Clean separation between codecs, storage, and Arrow integration
  4. Observable - Comprehensive metrics and performance monitoring
  5. Zero IP - Only open-source algorithms (FSST, ALP, standard codecs)

Success Metrics

Metric Target Measurement
Storage Reduction 5-15x Compression ratio on real workloads
Read Overhead <3% Query latency with compression vs. without
Write Overhead <5% Insert throughput with compression vs. without
CPU Overhead <15% Background compression CPU usage
Migration Time 0 seconds Lazy migration on read/write

Architecture Overview

System Context Diagram

┌─────────────────────────────────────────────────────────────────┐
│                     HeliosDB Lite v2.1                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌────────────────────────────────────────────────────────┐   │
│  │              SQL Query Interface                        │   │
│  │  (PostgreSQL Wire Protocol + REPL)                     │   │
│  └────────────────┬───────────────────────────────────────┘   │
│                   │                                             │
│  ┌────────────────▼───────────────────────────────────────┐   │
│  │          Query Executor (Volcano Model)                │   │
│  │  • Scan Operator                                       │   │
│  │  • Filter/Project                                      │   │
│  │  • Join/Aggregate                                      │   │
│  └────────────────┬───────────────────────────────────────┘   │
│                   │                                             │
│  ┌────────────────▼───────────────────────────────────────┐   │
│  │         Storage Engine (RocksDB-based)                 │   │
│  │  ┌──────────────────────────────────────────────┐     │   │
│  │  │    Compression Layer (NEW)                    │     │   │
│  │  │  ┌──────────┬──────────┬─────────┬────────┐ │     │   │
│  │  │  │   FSST   │   ALP    │  Dict   │  RLE   │ │     │   │
│  │  │  └──────────┴──────────┴─────────┴────────┘ │     │   │
│  │  │  ┌──────────────────────────────────────┐   │     │   │
│  │  │  │ Codec Selector (Pattern-based)       │   │     │   │
│  │  │  └──────────────────────────────────────┘   │     │   │
│  │  └──────────────────────────────────────────────┘     │   │
│  │  ┌──────────────────────────────────────────────┐     │   │
│  │  │    Existing LZ4/Zstd (RocksDB native)        │     │   │
│  │  └──────────────────────────────────────────────┘     │   │
│  │  ┌──────────────────────────────────────────────┐     │   │
│  │  │    MVCC Layer (Versioned Values)             │     │   │
│  │  └──────────────────────────────────────────────┘     │   │
│  │  ┌──────────────────────────────────────────────┐     │   │
│  │  │    RocksDB (Persistent Storage)              │     │   │
│  │  └──────────────────────────────────────────────┘     │   │
│  └────────────────────────────────────────────────────────┘   │
│                                                                  │
│  ┌────────────────────────────────────────────────────────┐   │
│  │    Apache Arrow Integration (Columnar)                │   │
│  │  • Batch Processing                                    │   │
│  │  • Column-specific Compression                         │   │
│  │  • Parquet Export (with compression metadata)          │   │
│  └────────────────────────────────────────────────────────┘   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Compression Layer Architecture

┌──────────────────────────────────────────────────────────────────┐
│                    Compression Layer                             │
├──────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │             CompressionManager (Main API)                │  │
│  │  • compress(data) -> CompressedBlock                     │  │
│  │  • decompress(block) -> Vec<u8>                          │  │
│  │  • compress_column(column) -> CompressedColumn           │  │
│  └────┬───────────────────────────────────────┬─────────────┘  │
│       │                                        │                 │
│       ▼                                        ▼                 │
│  ┌────────────────────────┐          ┌────────────────────┐    │
│  │   Pattern Analyzer     │          │   Codec Registry   │    │
│  │  • SIMD-accelerated    │          │  • FSST            │    │
│  │  • Data type detection │          │  • ALP             │    │
│  │  • Entropy calculation │          │  • Dictionary      │    │
│  │  • Cardinality check   │          │  • RLE             │    │
│  └────┬───────────────────┘          │  • Delta           │    │
│       │                               │  • LZ4/Zstd        │    │
│       ▼                               └────────────────────┘    │
│  ┌────────────────────────┐                                     │
│  │    Codec Selector      │◄────────────────────┐              │
│  │  • Rule-based          │                      │              │
│  │  • Cost model          │                      │              │
│  │  • Adaptive learning   │                      │              │
│  └────┬───────────────────┘                      │              │
│       │                                           │              │
│       ▼                                           │              │
│  ┌────────────────────────┐          ┌───────────┴──────────┐  │
│  │  Compression Pipeline  │          │   Stats Collector    │  │
│  │  1. Analyze pattern    │─────────▶│  • Ratio history     │  │
│  │  2. Select codec       │          │  • Performance       │  │
│  │  3. Compress           │          │  • Pattern freq      │  │
│  │  4. Validate checksum  │          └──────────────────────┘  │
│  │  5. Record stats       │                                     │
│  └────────────────────────┘                                     │
│                                                                   │
└──────────────────────────────────────────────────────────────────┘

Integration Points

┌─────────────────────────────────────────────────────────────────┐
│                      Integration Points                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. RocksDB Storage Layer Integration                           │
│     ┌─────────────────────────────────────────────────┐        │
│     │  StorageEngine::put(key, value)                 │        │
│     │    │                                             │        │
│     │    ├──▶ serialize(value)                        │        │
│     │    ├──▶ compress(serialized)  ◄── NEW          │        │
│     │    ├──▶ encrypt(compressed)   [if enabled]     │        │
│     │    └──▶ db.put(key, encrypted)                 │        │
│     │                                                  │        │
│     │  StorageEngine::get(key)                        │        │
│     │    │                                             │        │
│     │    ├──▶ db.get(key)                             │        │
│     │    ├──▶ decrypt(data)         [if enabled]     │        │
│     │    ├──▶ decompress(decrypted) ◄── NEW          │        │
│     │    └──▶ deserialize(decompressed)              │        │
│     └─────────────────────────────────────────────────┘        │
│                                                                  │
│  2. Apache Arrow Integration                                    │
│     ┌─────────────────────────────────────────────────┐        │
│     │  scan_table_columnar(table) -> RecordBatch      │        │
│     │    │                                             │        │
│     │    ├──▶ collect tuples                          │        │
│     │    ├──▶ convert to Arrow columns                │        │
│     │    ├──▶ compress_column(col) ◄── NEW           │        │
│     │    │     • FSST for strings                     │        │
│     │    │     • ALP for floats                       │        │
│     │    │     • Dictionary for low cardinality       │        │
│     │    └──▶ return RecordBatch                      │        │
│     └─────────────────────────────────────────────────┘        │
│                                                                  │
│  3. Tuple Serialization Integration                             │
│     ┌─────────────────────────────────────────────────┐        │
│     │  insert_tuple(table, tuple) -> row_id           │        │
│     │    │                                             │        │
│     │    ├──▶ bincode::serialize(tuple)               │        │
│     │    ├──▶ compression.compress(bytes) ◄── NEW    │        │
│     │    │     • Per-column analysis                  │        │
│     │    │     • Metadata stored in block             │        │
│     │    └──▶ storage.put(key, compressed)           │        │
│     └─────────────────────────────────────────────────┘        │
│                                                                  │
│  4. Configuration Integration                                   │
│     ┌─────────────────────────────────────────────────┐        │
│     │  Config::storage::compression ◄── ENHANCED     │        │
│     │  • None / Lz4 / Zstd (existing)                │        │
│     │  • Auto (new - uses pattern-based selection)   │        │
│     │  • Custom { codecs: [...], strategy: Auto }    │        │
│     └─────────────────────────────────────────────────┘        │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

API Design

Compression Traits

//! Compression codec trait definitions
//! Location: src/storage/compression/mod.rs

use serde::{Deserialize, Serialize};
use std::time::Duration;

/// Compression algorithm identifier
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// No compression
    None,
    /// LZ4 - fast general purpose (existing, via RocksDB)
    Lz4,
    /// Zstd - high ratio general purpose (existing, via RocksDB)
    Zstd,
    /// FSST - Fast Static Symbol Table for strings (NEW)
    Fsst,
    /// ALP - Adaptive Lossless floating-Point (NEW)
    Alp,
    /// Dictionary encoding - low cardinality data
    Dictionary,
    /// RLE - Run Length Encoding
    Rle,
    /// Delta encoding - sequential numeric data
    Delta,
    /// Automatic selection based on data pattern
    Auto,
}

/// Data pattern detected by analyzer
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataPattern {
    /// Random/incompressible data
    Random,
    /// String data with repeated patterns
    StringData,
    /// Numeric integer data
    IntegerData,
    /// Floating point data
    FloatingPointData,
    /// Low cardinality (few unique values)
    LowCardinality,
    /// Sequential/sorted data
    Sequential,
    /// Time-series data
    TimeSeries,
    /// JSON/structured text
    StructuredText,
}

/// Codec performance characteristics
#[derive(Debug, Clone)]
pub struct CodecCharacteristics {
    /// Compression speed (MB/s)
    pub compress_speed_mbps: f64,
    /// Decompression speed (MB/s)
    pub decompress_speed_mbps: f64,
    /// Typical compression ratio
    pub typical_ratio: f64,
    /// CPU overhead percentage (0.0-1.0)
    pub cpu_overhead: f64,
    /// Best for pattern
    pub best_for: Vec<DataPattern>,
}

/// Main compression codec trait
pub trait CompressionCodec: Send + Sync {
    /// Get codec algorithm identifier
    fn algorithm(&self) -> CompressionAlgorithm;

    /// Compress data
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>>;

    /// Decompress data
    fn decompress(&self, compressed: &[u8], original_size: Option<usize>) -> Result<Vec<u8>>;

    /// Get codec characteristics
    fn characteristics(&self) -> CodecCharacteristics;

    /// Estimate compressed size without actually compressing
    fn estimate_compressed_size(&self, data: &[u8]) -> usize {
        let ratio = self.characteristics().typical_ratio;
        ((data.len() as f64 / ratio).ceil() as usize).max(128)
    }

    /// Check if codec can handle this data pattern efficiently
    fn supports_pattern(&self, pattern: DataPattern) -> bool {
        self.characteristics().best_for.contains(&pattern)
    }
}

/// Compressed data block with metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedBlock {
    /// Algorithm used for compression
    pub algorithm: CompressionAlgorithm,
    /// Version of the algorithm
    pub version: u16,
    /// Original uncompressed size
    pub original_size: usize,
    /// Compressed data
    pub data: Vec<u8>,
    /// CRC32 checksum of original data
    pub checksum: u32,
    /// Compression timestamp
    pub compressed_at: u64,
    /// Data pattern detected
    pub pattern: DataPattern,
}

impl CompressedBlock {
    /// Get compression ratio
    pub fn compression_ratio(&self) -> f64 {
        self.original_size as f64 / self.data.len() as f64
    }

    /// Get space savings percentage
    pub fn space_savings_pct(&self) -> f64 {
        (1.0 - (self.data.len() as f64 / self.original_size as f64)) * 100.0
    }
}

Codec Implementations

//! FSST Codec Implementation
//! Location: src/storage/compression/fsst.rs

use super::{CompressionAlgorithm, CompressionCodec, CodecCharacteristics, DataPattern};
use crate::error::Result;

/// FSST (Fast Static Symbol Table) compression codec
///
/// FSST is a string compression algorithm that builds a symbol table
/// of frequent byte sequences and replaces them with single-byte codes.
/// Excellent for string data with repeated patterns (logs, URLs, JSON).
///
/// Reference: "FSST: Fast Static Symbol Table Compression" (DuckDB)
/// Patent Status: Open source, MIT licensed
pub struct FsstCodec {
    // Configuration
    symbol_table_size: usize,
    min_pattern_length: usize,
}

impl FsstCodec {
    pub fn new() -> Self {
        Self {
            symbol_table_size: 256,    // Single-byte codes
            min_pattern_length: 3,     // Minimum pattern to compress
        }
    }

    /// Build symbol table from sample data
    fn build_symbol_table(&self, data: &[u8]) -> SymbolTable {
        // Implementation:
        // 1. Count n-gram frequencies (n=3..8)
        // 2. Select top 256 most frequent patterns
        // 3. Build encoding/decoding tables
        todo!("Build FSST symbol table")
    }

    /// Compress data using symbol table
    fn compress_with_table(&self, data: &[u8], table: &SymbolTable) -> Vec<u8> {
        // Implementation:
        // 1. Serialize symbol table (header)
        // 2. Replace patterns with single-byte codes
        // 3. Emit unmatched bytes literally
        todo!("FSST compression")
    }
}

impl CompressionCodec for FsstCodec {
    fn algorithm(&self) -> CompressionAlgorithm {
        CompressionAlgorithm::Fsst
    }

    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        // Build symbol table from data
        let table = self.build_symbol_table(data);
        // Compress using table
        Ok(self.compress_with_table(data, &table))
    }

    fn decompress(&self, compressed: &[u8], _original_size: Option<usize>) -> Result<Vec<u8>> {
        // Implementation:
        // 1. Deserialize symbol table from header
        // 2. Decode byte-by-byte, expanding symbols
        todo!("FSST decompression")
    }

    fn characteristics(&self) -> CodecCharacteristics {
        CodecCharacteristics {
            compress_speed_mbps: 500.0,
            decompress_speed_mbps: 2000.0,
            typical_ratio: 5.0,  // 5x better than Zstd for strings
            cpu_overhead: 0.04,
            best_for: vec![
                DataPattern::StringData,
                DataPattern::StructuredText,
            ],
        }
    }
}

struct SymbolTable {
    // Symbol encoding table: pattern -> code
    encoding: Vec<(Vec<u8>, u8)>,
    // Symbol decoding table: code -> pattern
    decoding: Vec<Vec<u8>>,
}
//! ALP Codec Implementation
//! Location: src/storage/compression/alp.rs

use super::{CompressionAlgorithm, CompressionCodec, CodecCharacteristics, DataPattern};
use crate::error::Result;

/// ALP (Adaptive Lossless floating-Point) compression codec
///
/// ALP is a floating-point compression algorithm that exploits the structure
/// of floating-point data to achieve better compression than general-purpose
/// algorithms.
///
/// Reference: "ALP: Adaptive Lossless floating-Point Compression" (DuckDB)
/// Patent Status: Open source
pub struct AlpCodec {
    // Configuration
    use_exceptions: bool,
}

impl AlpCodec {
    pub fn new() -> Self {
        Self {
            use_exceptions: true,
        }
    }

    /// Analyze floating point data and determine encoding parameters
    fn analyze_floats(&self, data: &[u8]) -> AlpParameters {
        // Implementation:
        // 1. Parse as f32/f64 array
        // 2. Detect common exponent
        // 3. Determine precision requirements
        // 4. Calculate optimal encoding
        todo!("ALP analysis")
    }
}

impl CompressionCodec for AlpCodec {
    fn algorithm(&self) -> CompressionAlgorithm {
        CompressionAlgorithm::Alp
    }

    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        // Implementation:
        // 1. Analyze data to get parameters
        // 2. Encode parameters in header
        // 3. Encode mantissas with common exponent
        // 4. Handle exceptions (if enabled)
        todo!("ALP compression")
    }

    fn decompress(&self, compressed: &[u8], original_size: Option<usize>) -> Result<Vec<u8>> {
        // Implementation:
        // 1. Read parameters from header
        // 2. Decode mantissas
        // 3. Apply exceptions
        // 4. Reconstruct floats
        todo!("ALP decompression")
    }

    fn characteristics(&self) -> CodecCharacteristics {
        CodecCharacteristics {
            compress_speed_mbps: 600.0,
            decompress_speed_mbps: 1800.0,
            typical_ratio: 3.0,  // 3x better than Zstd for floats
            cpu_overhead: 0.03,
            best_for: vec![
                DataPattern::FloatingPointData,
                DataPattern::TimeSeries,
            ],
        }
    }
}

struct AlpParameters {
    common_exponent: i32,
    precision_bits: u8,
    exception_count: usize,
}

Compression Manager API

//! Compression Manager - Main API
//! Location: src/storage/compression/manager.rs

use super::{CompressionAlgorithm, CompressionCodec, CompressedBlock, DataPattern};
use crate::error::Result;
use std::sync::Arc;

/// Compression manager configuration
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// Enable compression
    pub enabled: bool,
    /// Default algorithm (Auto for automatic selection)
    pub default_algorithm: CompressionAlgorithm,
    /// Enable automatic codec switching
    pub auto_switch: bool,
    /// Enable statistics collection
    pub collect_stats: bool,
    /// Minimum data size to compress (bytes)
    pub min_compress_size: usize,
    /// Target compression ratio (for auto mode)
    pub target_ratio: f64,
    /// Maximum compression time (microseconds)
    pub max_compress_time_us: u64,
}

impl Default for CompressionConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            default_algorithm: CompressionAlgorithm::Auto,
            auto_switch: true,
            collect_stats: true,
            min_compress_size: 256,  // Don't compress tiny blocks
            target_ratio: 2.0,       // Aim for 2x minimum
            max_compress_time_us: 1000, // 1ms max
        }
    }
}

/// Main compression manager
pub struct CompressionManager {
    config: CompressionConfig,
    codecs: Arc<CodecRegistry>,
    analyzer: Arc<PatternAnalyzer>,
    selector: Arc<CodecSelector>,
    stats: Arc<StatsCollector>,
}

impl CompressionManager {
    pub fn new(config: CompressionConfig) -> Result<Self> {
        Ok(Self {
            config,
            codecs: Arc::new(CodecRegistry::new()),
            analyzer: Arc::new(PatternAnalyzer::new()),
            selector: Arc::new(CodecSelector::new()),
            stats: Arc::new(StatsCollector::new()),
        })
    }

    /// Compress data with automatic codec selection
    pub fn compress(&self, data: &[u8]) -> Result<CompressedBlock> {
        // Skip compression for small data
        if data.len() < self.config.min_compress_size {
            return self.create_uncompressed_block(data);
        }

        // Analyze data pattern
        let pattern = self.analyzer.analyze(data)?;

        // Select best codec
        let algorithm = if self.config.default_algorithm == CompressionAlgorithm::Auto {
            self.selector.select_for_pattern(pattern, data)?
        } else {
            self.config.default_algorithm
        };

        // Get codec and compress
        let codec = self.codecs.get(algorithm)?;
        let start = std::time::Instant::now();
        let compressed = codec.compress(data)?;
        let duration = start.elapsed();

        // Check if compression was beneficial
        if compressed.len() >= data.len() {
            return self.create_uncompressed_block(data);
        }

        // Create block with metadata
        let block = CompressedBlock {
            algorithm,
            version: 1,
            original_size: data.len(),
            data: compressed,
            checksum: crc32fast::hash(data),
            compressed_at: current_timestamp(),
            pattern,
        };

        // Record statistics
        if self.config.collect_stats {
            self.stats.record(algorithm, pattern, &block, duration);
        }

        Ok(block)
    }

    /// Compress with specific algorithm
    pub fn compress_with(
        &self,
        data: &[u8],
        algorithm: CompressionAlgorithm,
    ) -> Result<CompressedBlock> {
        let codec = self.codecs.get(algorithm)?;
        let compressed = codec.compress(data)?;

        Ok(CompressedBlock {
            algorithm,
            version: 1,
            original_size: data.len(),
            data: compressed,
            checksum: crc32fast::hash(data),
            compressed_at: current_timestamp(),
            pattern: self.analyzer.analyze(data)?,
        })
    }

    /// Decompress data
    pub fn decompress(&self, block: &CompressedBlock) -> Result<Vec<u8>> {
        // Handle uncompressed blocks
        if block.algorithm == CompressionAlgorithm::None {
            return Ok(block.data.clone());
        }

        // Get codec and decompress
        let codec = self.codecs.get(block.algorithm)?;
        let decompressed = codec.decompress(&block.data, Some(block.original_size))?;

        // Verify checksum
        let checksum = crc32fast::hash(&decompressed);
        if checksum != block.checksum {
            return Err(Error::ChecksumMismatch {
                expected: block.checksum,
                actual: checksum,
            });
        }

        Ok(decompressed)
    }

    /// Get compression statistics
    pub fn stats(&self) -> CompressionStats {
        self.stats.snapshot()
    }

    fn create_uncompressed_block(&self, data: &[u8]) -> Result<CompressedBlock> {
        Ok(CompressedBlock {
            algorithm: CompressionAlgorithm::None,
            version: 1,
            original_size: data.len(),
            data: data.to_vec(),
            checksum: crc32fast::hash(data),
            compressed_at: current_timestamp(),
            pattern: DataPattern::Random,
        })
    }
}

/// Codec registry
struct CodecRegistry {
    codecs: HashMap<CompressionAlgorithm, Arc<dyn CompressionCodec>>,
}

impl CodecRegistry {
    fn new() -> Self {
        let mut codecs: HashMap<CompressionAlgorithm, Arc<dyn CompressionCodec>> = HashMap::new();

        // Register all codecs
        codecs.insert(CompressionAlgorithm::Fsst, Arc::new(FsstCodec::new()));
        codecs.insert(CompressionAlgorithm::Alp, Arc::new(AlpCodec::new()));
        codecs.insert(CompressionAlgorithm::Dictionary, Arc::new(DictionaryCodec::new()));
        codecs.insert(CompressionAlgorithm::Rle, Arc::new(RleCodec::new()));
        codecs.insert(CompressionAlgorithm::Delta, Arc::new(DeltaCodec::new()));
        // Note: LZ4/Zstd handled by RocksDB layer

        Self { codecs }
    }

    fn get(&self, algorithm: CompressionAlgorithm) -> Result<Arc<dyn CompressionCodec>> {
        self.codecs
            .get(&algorithm)
            .cloned()
            .ok_or(Error::UnsupportedAlgorithm(algorithm))
    }
}

Pattern Analyzer

//! Pattern Analyzer - Data pattern detection
//! Location: src/storage/compression/analyzer.rs

use super::DataPattern;
use crate::error::Result;

/// Pattern analyzer using SIMD acceleration
pub struct PatternAnalyzer {
    sample_size: usize,
}

impl PatternAnalyzer {
    pub fn new() -> Self {
        Self {
            sample_size: 4096, // Sample first 4KB
        }
    }

    /// Analyze data and detect pattern
    pub fn analyze(&self, data: &[u8]) -> Result<DataPattern> {
        let sample = self.get_sample(data);

        // Run multiple detection heuristics
        let is_string = self.detect_string_data(sample);
        let is_float = self.detect_float_data(sample);
        let is_integer = self.detect_integer_data(sample);
        let cardinality = self.calculate_cardinality(sample);
        let entropy = self.calculate_entropy(sample);
        let is_sequential = self.detect_sequential(sample);

        // Pattern selection logic
        if entropy < 0.3 {
            return Ok(DataPattern::Random);
        }

        if cardinality < 256 && cardinality as f64 / sample.len() as f64 < 0.1 {
            return Ok(DataPattern::LowCardinality);
        }

        if is_sequential {
            return Ok(DataPattern::Sequential);
        }

        if is_float {
            return Ok(DataPattern::FloatingPointData);
        }

        if is_integer {
            return Ok(DataPattern::IntegerData);
        }

        if is_string {
            // Check if it's JSON-like
            if self.looks_like_json(sample) {
                return Ok(DataPattern::StructuredText);
            }
            return Ok(DataPattern::StringData);
        }

        Ok(DataPattern::Random)
    }

    fn get_sample<'a>(&self, data: &'a [u8]) -> &'a [u8] {
        if data.len() <= self.sample_size {
            data
        } else {
            &data[..self.sample_size]
        }
    }

    fn detect_string_data(&self, data: &[u8]) -> bool {
        // Check for printable ASCII/UTF-8
        let printable_count = data.iter()
            .filter(|&&b| (b >= 32 && b <= 126) || b == b'\n' || b == b'\t')
            .count();
        printable_count as f64 / data.len() as f64 > 0.8
    }

    fn detect_float_data(&self, data: &[u8]) -> bool {
        // Check if data aligns to f32/f64 boundaries and has float patterns
        if data.len() % 8 == 0 || data.len() % 4 == 0 {
            // Try to interpret as floats and check for valid ranges
            // This is a heuristic - not perfect
            let floats_count = (data.len() / 8).min(100);
            let valid = unsafe {
                std::slice::from_raw_parts(data.as_ptr() as *const f64, floats_count)
            }.iter()
                .filter(|&&f| f.is_finite() && f.abs() < 1e10)
                .count();

            valid as f64 / floats_count as f64 > 0.7
        } else {
            false
        }
    }

    fn detect_integer_data(&self, data: &[u8]) -> bool {
        // Similar heuristic for integers
        if data.len() % 8 == 0 {
            // Check for reasonable integer ranges
            true // Simplified
        } else {
            false
        }
    }

    fn calculate_cardinality(&self, data: &[u8]) -> usize {
        let mut seen = std::collections::HashSet::new();
        for &byte in data.iter().take(1024) {
            seen.insert(byte);
        }
        seen.len()
    }

    fn calculate_entropy(&self, data: &[u8]) -> f64 {
        let mut counts = [0u32; 256];
        for &byte in data {
            counts[byte as usize] += 1;
        }

        let len = data.len() as f64;
        let mut entropy = 0.0;
        for &count in &counts {
            if count > 0 {
                let p = count as f64 / len;
                entropy -= p * p.log2();
            }
        }
        entropy / 8.0 // Normalize to 0-1
    }

    fn detect_sequential(&self, data: &[u8]) -> bool {
        // Check for monotonic sequences
        if data.len() < 16 {
            return false;
        }

        let diffs: Vec<i16> = data.windows(2)
            .map(|w| w[1] as i16 - w[0] as i16)
            .collect();

        // Check if most differences are the same
        let mut diff_counts = std::collections::HashMap::new();
        for &diff in &diffs {
            *diff_counts.entry(diff).or_insert(0) += 1;
        }

        let max_count = diff_counts.values().max().unwrap_or(&0);
        *max_count as f64 / diffs.len() as f64 > 0.7
    }

    fn looks_like_json(&self, data: &[u8]) -> bool {
        // Simple heuristic: check for { } [ ] : , "
        let json_chars = data.iter()
            .filter(|&&b| matches!(b, b'{' | b'}' | b'[' | b']' | b':' | b',' | b'"'))
            .count();
        json_chars as f64 / data.len() as f64 > 0.1
    }
}

Codec Selector

//! Codec Selector - Automatic codec selection
//! Location: src/storage/compression/selector.rs

use super::{CompressionAlgorithm, DataPattern};
use crate::error::Result;

/// Codec selector with rule-based and adaptive logic
pub struct CodecSelector {
    // Historical performance data
    history: Arc<Mutex<SelectionHistory>>,
}

impl CodecSelector {
    pub fn new() -> Self {
        Self {
            history: Arc::new(Mutex::new(SelectionHistory::new())),
        }
    }

    /// Select best codec for data pattern
    pub fn select_for_pattern(&self, pattern: DataPattern, data: &[u8]) -> Result<CompressionAlgorithm> {
        // Rule-based selection
        let algorithm = match pattern {
            DataPattern::StringData => CompressionAlgorithm::Fsst,
            DataPattern::StructuredText => CompressionAlgorithm::Fsst,
            DataPattern::FloatingPointData => CompressionAlgorithm::Alp,
            DataPattern::TimeSeries => CompressionAlgorithm::Alp,
            DataPattern::LowCardinality => CompressionAlgorithm::Dictionary,
            DataPattern::Sequential => CompressionAlgorithm::Delta,
            DataPattern::IntegerData => {
                // Check if sequential
                if self.is_sequential_integers(data) {
                    CompressionAlgorithm::Delta
                } else {
                    CompressionAlgorithm::Dictionary
                }
            }
            DataPattern::Random => CompressionAlgorithm::Lz4,
        };

        // Check historical performance and potentially override
        if let Some(better) = self.check_history(pattern, algorithm) {
            return Ok(better);
        }

        Ok(algorithm)
    }

    fn is_sequential_integers(&self, data: &[u8]) -> bool {
        // Heuristic for detecting sequential integer patterns
        if data.len() < 32 {
            return false;
        }

        // Sample a few integers and check differences
        let samples = data.len() / 8;
        if samples < 4 {
            return false;
        }

        let integers: Vec<i64> = (0..samples.min(100))
            .map(|i| {
                let offset = i * 8;
                i64::from_le_bytes([
                    data[offset], data[offset+1], data[offset+2], data[offset+3],
                    data[offset+4], data[offset+5], data[offset+6], data[offset+7],
                ])
            })
            .collect();

        // Check if differences are consistent
        let diffs: Vec<i64> = integers.windows(2)
            .map(|w| w[1] - w[0])
            .collect();

        let avg_diff = diffs.iter().sum::<i64>() as f64 / diffs.len() as f64;
        let variance = diffs.iter()
            .map(|&d| (d as f64 - avg_diff).powi(2))
            .sum::<f64>() / diffs.len() as f64;

        // Low variance indicates sequential
        variance < 100.0
    }

    fn check_history(&self, pattern: DataPattern, current: CompressionAlgorithm) -> Option<CompressionAlgorithm> {
        let history = self.history.lock().unwrap();
        history.get_better_algorithm(pattern, current)
    }
}

struct SelectionHistory {
    // Pattern -> Algorithm -> Performance metrics
    performance: HashMap<DataPattern, HashMap<CompressionAlgorithm, PerformanceMetrics>>,
}

impl SelectionHistory {
    fn new() -> Self {
        Self {
            performance: HashMap::new(),
        }
    }

    fn get_better_algorithm(&self, pattern: DataPattern, current: CompressionAlgorithm) -> Option<CompressionAlgorithm> {
        if let Some(algorithms) = self.performance.get(&pattern) {
            // Find algorithm with best ratio and acceptable speed
            let current_perf = algorithms.get(&current)?;

            for (algo, perf) in algorithms {
                if perf.avg_ratio > current_perf.avg_ratio * 1.2 &&
                   perf.avg_compress_us < current_perf.avg_compress_us * 2.0 {
                    return Some(*algo);
                }
            }
        }
        None
    }
}

#[derive(Debug, Clone)]
struct PerformanceMetrics {
    avg_ratio: f64,
    avg_compress_us: f64,
    avg_decompress_us: f64,
    sample_count: usize,
}

Configuration Schema