Skip to content

Compression Integration Architecture - Part 2 of 4

Configuration & Performance Monitoring

Navigation: Index | ← Part 1 | Part 2 | Part 3 →


TOML Configuration

# heliosdb-lite.toml

[storage]
path = "./data"
memory_only = false
cache_size = 536870912  # 512 MB

# Compression configuration (ENHANCED)
[storage.compression]
enabled = true

# Algorithm options:
# - "none" - no compression
# - "lz4" - fast general purpose (via RocksDB)
# - "zstd" - high ratio general purpose (via RocksDB)
# - "auto" - automatic selection based on data pattern (RECOMMENDED)
algorithm = "auto"

# Auto-selection configuration (only used when algorithm = "auto")
[storage.compression.auto]
# Enable automatic codec switching based on data patterns
auto_switch = true

# Minimum data size to compress (bytes)
# Data smaller than this will not be compressed
min_compress_size = 256

# Target compression ratio
# Codec selector aims for this minimum ratio
target_ratio = 2.0

# Maximum compression time (microseconds)
# Fall back to faster codec if exceeded
max_compress_time_us = 1000

# Enable statistics collection
collect_stats = true

# Enabled codecs for auto mode
# Codecs will be selected based on data pattern
enabled_codecs = ["fsst", "alp", "dictionary", "rle", "delta", "lz4", "zstd"]

# Per-codec configuration
[storage.compression.codecs.fsst]
enabled = true
symbol_table_size = 256
min_pattern_length = 3

[storage.compression.codecs.alp]
enabled = true
use_exceptions = true

[storage.compression.codecs.dictionary]
enabled = true
max_dictionary_size = 65536  # 64 KB

[storage.compression.codecs.rle]
enabled = true
min_run_length = 3

[storage.compression.codecs.delta]
enabled = true
encoding = "varint"  # or "fixed"

# Performance tuning
[performance]
worker_threads = 8
query_timeout_secs = 300
simd_enabled = true
parallel_query = true

# Compression performance tuning
[performance.compression]
# Enable parallel compression for large data
parallel_threshold_bytes = 1048576  # 1 MB
# Number of threads for parallel compression
compression_threads = 4

Rust Configuration Structures

//! Configuration structures
//! Location: src/config.rs (ENHANCED)

use serde::{Deserialize, Serialize};

/// Storage configuration (ENHANCED)
///
/// Maps to the `[storage]` table of `heliosdb-lite.toml`.
// NOTE(review): `PathBuf` is used below but this snippet only imports serde —
// confirm `use std::path::PathBuf;` is present in src/config.rs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
    // Data directory (`path = "./data"` in the example).
    // Presumably `None` is only valid together with `memory_only` — TODO confirm.
    pub path: Option<PathBuf>,
    // Run entirely in memory without touching disk.
    pub memory_only: bool,
    // NOTE(review): not present in the TOML example above — confirm the key name.
    pub wal_enabled: bool,
    // Cache size in bytes (example uses 536870912 = 512 MB).
    pub cache_size: usize,
    /// Compression configuration (ENHANCED)
    pub compression: CompressionConfig,
}

/// Compression configuration
///
/// Maps to the `[storage.compression]` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionConfig {
    /// Enable compression
    pub enabled: bool,
    /// Compression algorithm
    pub algorithm: CompressionAlgorithmConfig,
    /// Auto-selection configuration
    // `#[serde(default)]`: falls back to `AutoCompressionConfig::default()`
    // when the `[storage.compression.auto]` table is absent.
    #[serde(default)]
    pub auto: AutoCompressionConfig,
    /// Per-codec configuration
    // Falls back to `CodecConfigs::default()` when the codec tables are absent.
    #[serde(default)]
    pub codecs: CodecConfigs,
}

impl Default for CompressionConfig {
    /// Defaults: compression enabled with automatic codec selection,
    /// matching the recommended TOML example above.
    fn default() -> Self {
        Self {
            enabled: true,
            algorithm: CompressionAlgorithmConfig::Auto,
            auto: AutoCompressionConfig::default(),
            codecs: CodecConfigs::default(),
        }
    }
}

/// Compression algorithm configuration
///
/// Deserialized from the `algorithm` key using lowercase variant names
/// (e.g. `algorithm = "auto"`), per `#[serde(rename_all = "lowercase")]`.
///
/// `Copy`, `PartialEq`, `Eq` and `Hash` are derived so the selected
/// algorithm can be compared, matched and used as a map key without cloning
/// (all variants are unit variants, so the derives are free).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionAlgorithmConfig {
    /// No compression.
    None,
    /// Fast general purpose (via RocksDB).
    Lz4,
    /// High ratio general purpose (via RocksDB).
    Zstd,
    /// String compression codec.
    Fsst,
    /// Floating-point codec.
    Alp,
    /// Dictionary encoding.
    Dictionary,
    /// Run-length encoding.
    Rle,
    /// Delta encoding.
    Delta,
    /// Automatic selection based on data pattern (recommended).
    Auto,
}

/// Auto-selection configuration
///
/// Maps to `[storage.compression.auto]`; only consulted when
/// `algorithm = "auto"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoCompressionConfig {
    /// Allow automatic codec switching based on data patterns.
    pub auto_switch: bool,
    /// Minimum data size (bytes) before compression is attempted.
    pub min_compress_size: usize,
    /// Minimum compression ratio the codec selector aims for.
    pub target_ratio: f64,
    /// Maximum compression time (µs) before falling back to a faster codec.
    pub max_compress_time_us: u64,
    /// Enable statistics collection.
    pub collect_stats: bool,
    /// Codec names eligible for auto mode.
    pub enabled_codecs: Vec<String>,
}

impl Default for AutoCompressionConfig {
    /// Defaults mirror the `[storage.compression.auto]` TOML example:
    /// switching on, 256-byte floor, 2.0x target, 1 ms budget, stats on,
    /// and every codec enabled.
    fn default() -> Self {
        let enabled_codecs = ["fsst", "alp", "dictionary", "rle", "delta", "lz4", "zstd"]
            .iter()
            .map(|name| name.to_string())
            .collect();

        Self {
            auto_switch: true,
            min_compress_size: 256,
            target_ratio: 2.0,
            max_compress_time_us: 1000,
            collect_stats: true,
            enabled_codecs,
        }
    }
}

/// Per-codec configuration
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CodecConfigs {
    #[serde(default)]
    pub fsst: FsstConfig,
    #[serde(default)]
    pub alp: AlpConfig,
    #[serde(default)]
    pub dictionary: DictionaryConfig,
    #[serde(default)]
    pub rle: RleConfig,
    #[serde(default)]
    pub delta: DeltaConfig,
}

/// FSST codec settings (`[storage.compression.codecs.fsst]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FsstConfig {
    /// Whether the FSST codec may be selected.
    pub enabled: bool,
    /// Size of the FSST symbol table.
    pub symbol_table_size: usize,
    /// Shortest pattern length the codec considers.
    pub min_pattern_length: usize,
}

impl Default for FsstConfig {
    /// Defaults mirror the TOML example: enabled, 256-entry table,
    /// 3-byte minimum pattern.
    fn default() -> Self {
        FsstConfig {
            min_pattern_length: 3,
            symbol_table_size: 256,
            enabled: true,
        }
    }
}

/// ALP codec settings (`[storage.compression.codecs.alp]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlpConfig {
    /// Whether the ALP codec may be selected.
    pub enabled: bool,
    /// Whether out-of-scheme values are stored as exceptions.
    pub use_exceptions: bool,
}

impl Default for AlpConfig {
    /// Defaults mirror the TOML example: enabled, exceptions on.
    fn default() -> Self {
        AlpConfig {
            use_exceptions: true,
            enabled: true,
        }
    }
}

/// Dictionary codec settings (`[storage.compression.codecs.dictionary]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DictionaryConfig {
    /// Whether the dictionary codec may be selected.
    pub enabled: bool,
    /// Upper bound on the dictionary size in bytes.
    pub max_dictionary_size: usize,
}

impl Default for DictionaryConfig {
    /// Defaults mirror the TOML example: enabled, 64 KB dictionary cap.
    fn default() -> Self {
        const DEFAULT_MAX_DICTIONARY_BYTES: usize = 64 * 1024; // 65536
        DictionaryConfig {
            max_dictionary_size: DEFAULT_MAX_DICTIONARY_BYTES,
            enabled: true,
        }
    }
}

/// RLE codec settings (`[storage.compression.codecs.rle]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RleConfig {
    /// Whether the RLE codec may be selected.
    pub enabled: bool,
    /// Shortest run length worth encoding.
    pub min_run_length: usize,
}

impl Default for RleConfig {
    /// Defaults mirror the TOML example: enabled, runs of 3 or more.
    fn default() -> Self {
        RleConfig {
            min_run_length: 3,
            enabled: true,
        }
    }
}

/// Delta codec settings (`[storage.compression.codecs.delta]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaConfig {
    /// Whether the delta codec may be selected.
    pub enabled: bool,
    /// How delta values are encoded (`"varint"` or `"fixed"` in TOML).
    pub encoding: DeltaEncoding,
}

impl Default for DeltaConfig {
    /// Defaults mirror the TOML example: enabled, varint encoding.
    fn default() -> Self {
        DeltaConfig {
            encoding: DeltaEncoding::Varint,
            enabled: true,
        }
    }
}

/// Encoding used by the delta codec.
///
/// Serialized in lowercase (`encoding = "varint"` / `"fixed"`) per the
/// `rename_all` attribute below.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DeltaEncoding {
    /// Variable-length integer encoding.
    Varint,
    /// Fixed-width encoding.
    Fixed,
}

Performance Monitoring Design

Metrics Collection

//! Statistics collector for compression operations
//! Location: src/storage/compression/stats.rs

use super::{CompressionAlgorithm, DataPattern};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Compression statistics
///
/// Aggregate counters and running averages for all compression activity.
/// `Default` is derived (all-zero / empty) because `StatsCollector::new`
/// initializes its state with `CompressionStats::default()`; without the
/// derive that call does not compile.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Total bytes compressed
    pub total_bytes_compressed: u64,
    /// Total bytes decompressed
    pub total_bytes_decompressed: u64,
    /// Total compression operations
    pub total_compressions: u64,
    /// Total decompression operations
    pub total_decompressions: u64,
    /// Total time spent compressing
    pub total_compress_time: Duration,
    /// Total time spent decompressing
    pub total_decompress_time: Duration,
    /// Average compression ratio (incremental mean over all compressions)
    pub avg_compression_ratio: f64,
    /// Per-algorithm statistics
    pub by_algorithm: HashMap<CompressionAlgorithm, AlgorithmStats>,
    /// Per-pattern statistics
    pub by_pattern: HashMap<DataPattern, PatternStats>,
}

/// Running aggregates for a single compression algorithm,
/// updated by `AlgorithmStats::record_compression`.
#[derive(Debug, Clone)]
pub struct AlgorithmStats {
    /// Which codec these numbers describe.
    pub algorithm: CompressionAlgorithm,
    /// Number of compressions recorded.
    pub use_count: u64,
    /// Sum of uncompressed input sizes in bytes.
    pub total_input_bytes: u64,
    /// Sum of compressed output sizes in bytes.
    pub total_output_bytes: u64,
    /// Incremental mean of per-block compression ratios.
    pub avg_ratio: f64,
    /// Incremental mean compression time in microseconds.
    pub avg_compress_time_us: f64,
    // NOTE(review): never updated in this file — decompression recording is
    // presumably implemented elsewhere; confirm.
    pub avg_decompress_time_us: f64,
    /// Smallest ratio seen (starts at `f64::MAX` in `AlgorithmStats::new`).
    pub min_ratio: f64,
    /// Largest ratio seen.
    pub max_ratio: f64,
}

/// Running aggregates for a single detected data pattern.
#[derive(Debug, Clone)]
pub struct PatternStats {
    /// Which pattern these numbers describe.
    pub pattern: DataPattern,
    /// How many times this pattern was detected.
    pub detection_count: u64,
    /// Algorithm currently considered best for this pattern
    /// (see the heuristic in `PatternStats::record`).
    pub best_algorithm: CompressionAlgorithm,
    /// Incremental mean of compression ratios for this pattern.
    pub avg_ratio: f64,
}

/// Statistics collector
///
/// State is held behind `Arc<Mutex<..>>`, so `&self` methods can mutate it
/// and the inner state can be shared.
pub struct StatsCollector {
    // Aggregate statistics, cloned out by `snapshot`.
    stats: Arc<Mutex<CompressionStats>>,
    // Maximum number of events retained in `history`.
    history_size: usize,
    // Recent events, oldest at the front; trimmed in `record`.
    history: Arc<Mutex<VecDeque<CompressionEvent>>>,
}

impl StatsCollector {
    /// Create a collector that keeps at most `history_size` recent events.
    pub fn new(history_size: usize) -> Self {
        Self {
            stats: Arc::new(Mutex::new(CompressionStats::default())),
            history_size,
            history: Arc::new(Mutex::new(VecDeque::new())),
        }
    }

    /// Record compression event
    ///
    /// Updates global totals, per-algorithm and per-pattern aggregates, the
    /// running average ratio, and appends the event to the bounded history.
    pub fn record(
        &self,
        algorithm: CompressionAlgorithm,
        pattern: DataPattern,
        block: &CompressedBlock,
        duration: Duration,
    ) {
        // Panics on a poisoned lock: a panic while holding the lock is a bug.
        let mut stats = self.stats.lock().unwrap();

        // Update totals
        stats.total_bytes_compressed += block.original_size as u64;
        stats.total_compressions += 1;
        stats.total_compress_time += duration;

        // Update per-algorithm stats
        let algo_stats = stats.by_algorithm
            .entry(algorithm)
            .or_insert_with(|| AlgorithmStats::new(algorithm));
        algo_stats.record_compression(block, duration);

        // Update per-pattern stats
        let pattern_stats = stats.by_pattern
            .entry(pattern)
            .or_insert_with(|| PatternStats::new(pattern));
        pattern_stats.record(algorithm, block.compression_ratio());

        // Incremental mean: total_compressions was already incremented above,
        // so the previous average covers (n - 1) samples.
        let total_compressions = stats.total_compressions as f64;
        let prev_avg = stats.avg_compression_ratio;
        stats.avg_compression_ratio =
            (prev_avg * (total_compressions - 1.0) + block.compression_ratio()) / total_compressions;

        // Add to history
        let mut history = self.history.lock().unwrap();
        history.push_back(CompressionEvent {
            timestamp: std::time::SystemTime::now(),
            algorithm,
            pattern,
            ratio: block.compression_ratio(),
            input_size: block.original_size,
            output_size: block.data.len(),
            duration,
        });

        // Trim history to the configured bound, dropping oldest first.
        while history.len() > self.history_size {
            history.pop_front();
        }
    }

    /// Get current statistics snapshot
    pub fn snapshot(&self) -> CompressionStats {
        self.stats.lock().unwrap().clone()
    }

    /// Get the `count` most recent compression events, newest first.
    pub fn recent_events(&self, count: usize) -> Vec<CompressionEvent> {
        let history = self.history.lock().unwrap();
        history.iter()
            .rev()
            .take(count)
            .cloned()
            .collect()
    }

    /// Generate statistics report
    pub fn report(&self) -> String {
        let stats = self.snapshot();
        format!(
            "Compression Statistics:\n\
             Total Compressions: {}\n\
             Total Bytes Compressed: {:.2} MB\n\
             Average Compression Ratio: {:.2}x\n\
             Average Compress Time: {:.2} µs\n\
             \n\
             By Algorithm:\n{}\n\
             \n\
             By Pattern:\n{}",
            stats.total_compressions,
            stats.total_bytes_compressed as f64 / 1_000_000.0,
            stats.avg_compression_ratio,
            // max(1) avoids a NaN average before any compression is recorded,
            // matching the guard already used by CompressionMonitor::metrics.
            stats.total_compress_time.as_micros() as f64 / stats.total_compressions.max(1) as f64,
            Self::format_algorithm_stats(&stats.by_algorithm),
            Self::format_pattern_stats(&stats.by_pattern),
        )
    }

    /// Format one "  Algo: uses, ratio, time" line per algorithm for `report`.
    fn format_algorithm_stats(stats: &HashMap<CompressionAlgorithm, AlgorithmStats>) -> String {
        let mut lines = Vec::new();
        for (algo, stat) in stats {
            lines.push(format!(
                "  {:?}: {} uses, {:.2}x ratio, {:.0} µs",
                algo,
                stat.use_count,
                stat.avg_ratio,
                stat.avg_compress_time_us,
            ));
        }
        lines.join("\n")
    }

    /// Format one line per detected pattern for `report`.
    fn format_pattern_stats(stats: &HashMap<DataPattern, PatternStats>) -> String {
        let mut lines = Vec::new();
        for (pattern, stat) in stats {
            lines.push(format!(
                "  {:?}: {} detections, best: {:?}, {:.2}x ratio",
                pattern,
                stat.detection_count,
                stat.best_algorithm,
                stat.avg_ratio,
            ));
        }
        lines.join("\n")
    }
}

/// A single recorded compression operation.
///
/// Public because `StatsCollector::recent_events` (a `pub fn`) returns these
/// to callers; leaving the type private makes that signature leak a private
/// type in a public interface (E0446). Fields are public so consumers such as
/// the monitoring layer can read them.
#[derive(Debug, Clone)]
pub struct CompressionEvent {
    /// Wall-clock time the event was recorded.
    pub timestamp: std::time::SystemTime,
    /// Codec that performed the compression.
    pub algorithm: CompressionAlgorithm,
    /// Data pattern detected for the input.
    pub pattern: DataPattern,
    /// Ratio reported by `CompressedBlock::compression_ratio`.
    pub ratio: f64,
    /// Uncompressed input size in bytes.
    pub input_size: usize,
    /// Compressed output size in bytes.
    pub output_size: usize,
    /// Time spent compressing.
    pub duration: Duration,
}

impl AlgorithmStats {
    fn new(algorithm: CompressionAlgorithm) -> Self {
        Self {
            algorithm,
            use_count: 0,
            total_input_bytes: 0,
            total_output_bytes: 0,
            avg_ratio: 0.0,
            avg_compress_time_us: 0.0,
            avg_decompress_time_us: 0.0,
            min_ratio: f64::MAX,
            max_ratio: 0.0,
        }
    }

    fn record_compression(&mut self, block: &CompressedBlock, duration: Duration) {
        self.use_count += 1;
        self.total_input_bytes += block.original_size as u64;
        self.total_output_bytes += block.data.len() as u64;

        let ratio = block.compression_ratio();
        self.min_ratio = self.min_ratio.min(ratio);
        self.max_ratio = self.max_ratio.max(ratio);

        // Update average ratio
        let n = self.use_count as f64;
        self.avg_ratio = (self.avg_ratio * (n - 1.0) + ratio) / n;

        // Update average time
        let time_us = duration.as_micros() as f64;
        self.avg_compress_time_us = (self.avg_compress_time_us * (n - 1.0) + time_us) / n;
    }
}

impl PatternStats {
    // Fresh, empty aggregates for `pattern`; `best_algorithm` starts as `None`.
    fn new(pattern: DataPattern) -> Self {
        Self {
            pattern,
            detection_count: 0,
            best_algorithm: CompressionAlgorithm::None,
            avg_ratio: 0.0,
        }
    }

    // Fold one (algorithm, ratio) observation into this pattern's stats.
    fn record(&mut self, algorithm: CompressionAlgorithm, ratio: f64) {
        self.detection_count += 1;

        // Update average ratio (incremental mean over detection_count samples)
        let n = self.detection_count as f64;
        self.avg_ratio = (self.avg_ratio * (n - 1.0) + ratio) / n;

        // Update best algorithm if this performed better
        // NOTE(review): the comparison uses the average that already includes
        // this sample, and "better" means beating it by 10%. The very first
        // sample can never qualify (ratio == avg), so best_algorithm may stay
        // `None` for a while — confirm this heuristic is intended.
        if ratio > self.avg_ratio * 1.1 {
            self.best_algorithm = algorithm;
        }
    }
}

Monitoring Endpoints

//! Monitoring and metrics endpoints
//! Location: src/storage/compression/monitoring.rs

use super::stats::{CompressionStats, StatsCollector};
use std::sync::Arc;

/// Compression monitoring interface
pub struct CompressionMonitor {
    stats: Arc<StatsCollector>,
}

impl CompressionMonitor {
    /// Wrap a shared stats collector.
    pub fn new(stats: Arc<StatsCollector>) -> Self {
        Self { stats }
    }

    /// Get current metrics snapshot
    ///
    /// Takes one snapshot of the collector and converts it into the
    /// serializable `CompressionMetrics` shape.
    pub fn metrics(&self) -> CompressionMetrics {
        let stats = self.stats.snapshot();

        // Output bytes are estimated from the running average ratio. Guard
        // against a zero ratio (nothing recorded yet), which previously
        // produced inf/NaN in the exported metric.
        let total_bytes_out = if stats.avg_compression_ratio > 0.0 {
            stats.total_bytes_compressed as f64 / stats.avg_compression_ratio
        } else {
            0.0
        };

        CompressionMetrics {
            total_compressions: stats.total_compressions,
            total_decompressions: stats.total_decompressions,
            total_bytes_in: stats.total_bytes_compressed,
            total_bytes_out,
            avg_ratio: stats.avg_compression_ratio,
            // max(1) keeps the averages finite when no operations have run.
            avg_compress_time_us: stats.total_compress_time.as_micros() as f64
                / stats.total_compressions.max(1) as f64,
            avg_decompress_time_us: stats.total_decompress_time.as_micros() as f64
                / stats.total_decompressions.max(1) as f64,
            algorithms: stats.by_algorithm.iter()
                .map(|(algo, stat)| AlgorithmMetric {
                    algorithm: format!("{:?}", algo),
                    use_count: stat.use_count,
                    avg_ratio: stat.avg_ratio,
                    avg_time_us: stat.avg_compress_time_us,
                })
                .collect(),
            patterns: stats.by_pattern.iter()
                .map(|(pattern, stat)| PatternMetric {
                    pattern: format!("{:?}", pattern),
                    detection_count: stat.detection_count,
                    best_algorithm: format!("{:?}", stat.best_algorithm),
                    avg_ratio: stat.avg_ratio,
                })
                .collect(),
        }
    }

    /// Get formatted text report
    pub fn report(&self) -> String {
        self.stats.report()
    }

    /// Get JSON metrics
    ///
    /// # Panics
    /// Panics if serialization fails; `CompressionMetrics` contains only
    /// numbers, strings and vectors of those, so a failure indicates a bug.
    pub fn json_metrics(&self) -> serde_json::Value {
        let metrics = self.metrics();
        serde_json::to_value(&metrics)
            .expect("CompressionMetrics is plain data and must serialize")
    }
}

/// Serializable metrics snapshot produced by `CompressionMonitor::metrics`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CompressionMetrics {
    /// Total compression operations recorded.
    pub total_compressions: u64,
    /// Total decompression operations recorded.
    pub total_decompressions: u64,
    /// Total uncompressed input bytes.
    pub total_bytes_in: u64,
    /// Estimated output bytes (derived from the average ratio, hence `f64`).
    pub total_bytes_out: f64,
    /// Running average compression ratio.
    pub avg_ratio: f64,
    /// Average compression time in microseconds.
    pub avg_compress_time_us: f64,
    /// Average decompression time in microseconds.
    pub avg_decompress_time_us: f64,
    /// One entry per algorithm seen.
    pub algorithms: Vec<AlgorithmMetric>,
    /// One entry per data pattern seen.
    pub patterns: Vec<PatternMetric>,
}

/// Per-algorithm entry in `CompressionMetrics`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct AlgorithmMetric {
    /// Algorithm name (Debug-formatted enum variant).
    pub algorithm: String,
    /// Number of compressions performed with this algorithm.
    pub use_count: u64,
    /// Average compression ratio for this algorithm.
    pub avg_ratio: f64,
    /// Average compression time in microseconds.
    pub avg_time_us: f64,
}

/// Per-pattern entry in `CompressionMetrics`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PatternMetric {
    /// Pattern name (Debug-formatted enum variant).
    pub pattern: String,
    /// How many times the pattern was detected.
    pub detection_count: u64,
    /// Best algorithm for this pattern (Debug-formatted enum variant).
    pub best_algorithm: String,
    /// Average compression ratio for this pattern.
    pub avg_ratio: f64,
}

SQL Interface for Monitoring

-- View compression statistics
SELECT * FROM heliosdb_compression_stats;

-- Output:
-- algorithm | uses  | avg_ratio | avg_compress_us | avg_decompress_us | total_bytes_in | total_bytes_out
-- ----------|-------|-----------|-----------------|-------------------|----------------|----------------
-- FSST      | 15023 | 6.2       | 45.3            | 12.1              | 1.2 GB         | 195 MB
-- ALP       | 8891  | 3.8       | 52.1            | 18.7              | 800 MB         | 211 MB
-- Dictionary| 4532  | 12.3      | 28.4            | 8.9               | 450 MB         | 37 MB
-- RLE       | 1203  | 25.1      | 15.2            | 5.3               | 120 MB         | 4.8 MB
-- Delta     | 9874  | 8.9       | 22.7            | 9.1               | 950 MB         | 107 MB
-- LZ4       | 3421  | 2.1       | 18.3            | 6.2               | 340 MB         | 162 MB

-- View pattern detection statistics
SELECT * FROM heliosdb_pattern_stats;

-- Output:
-- pattern          | detections | best_algorithm | avg_ratio
-- -----------------|------------|----------------|----------
-- StringData       | 15023      | FSST           | 6.2
-- FloatingPointData| 8891       | ALP            | 3.8
-- LowCardinality   | 4532       | Dictionary     | 12.3
-- Sequential       | 9874       | Delta          | 8.9
-- Random           | 3421       | LZ4            | 2.1

-- View recent compression events
SELECT * FROM heliosdb_compression_events LIMIT 10;

-- Output:
-- timestamp           | algorithm  | pattern          | ratio | input_kb | output_kb | duration_us
-- --------------------|------------|------------------|-------|----------|-----------|------------
-- 2025-11-18 10:23:45 | FSST       | StringData       | 7.2   | 124      | 17        | 52
-- 2025-11-18 10:23:44 | ALP        | FloatingPointData| 4.1   | 256      | 62        | 68
-- 2025-11-18 10:23:43 | Dictionary | LowCardinality   | 15.3  | 64       | 4         | 31
-- ...

-- Get current compression configuration
SELECT * FROM heliosdb_config WHERE key LIKE 'compression.%';

-- Output:
-- key                                  | value
-- -------------------------------------|-------
-- compression.enabled                  | true
-- compression.algorithm                | auto
-- compression.auto.auto_switch         | true
-- compression.auto.min_compress_size   | 256
-- compression.auto.target_ratio        | 2.0
-- compression.auto.max_compress_time_us| 1000