Compression Integration Architecture - Part 2 of 4¶
Configuration & Performance Monitoring¶
Navigation: Index | ← Part 1 | Part 2 | Part 3 →
TOML Configuration¶
# heliosdb-lite.toml
[storage]
path = "./data"
memory_only = false
cache_size = 536870912 # 512 MB
# Compression configuration (ENHANCED)
[storage.compression]
enabled = true
# Algorithm options:
# - "none" - no compression
# - "lz4" - fast general purpose (via RocksDB)
# - "zstd" - high ratio general purpose (via RocksDB)
# - "auto" - automatic selection based on data pattern (RECOMMENDED)
algorithm = "auto"
# Auto-selection configuration (only used when algorithm = "auto")
[storage.compression.auto]
# Enable automatic codec switching based on data patterns
auto_switch = true
# Minimum data size to compress (bytes)
# Data smaller than this will not be compressed
min_compress_size = 256
# Target compression ratio
# Codec selector aims for this minimum ratio
target_ratio = 2.0
# Maximum compression time (microseconds)
# Fall back to faster codec if exceeded
max_compress_time_us = 1000
# Enable statistics collection
collect_stats = true
# Enabled codecs for auto mode
# Codecs will be selected based on data pattern
enabled_codecs = ["fsst", "alp", "dictionary", "rle", "delta", "lz4", "zstd"]
# Per-codec configuration
[storage.compression.codecs.fsst]
enabled = true
symbol_table_size = 256
min_pattern_length = 3
[storage.compression.codecs.alp]
enabled = true
use_exceptions = true
[storage.compression.codecs.dictionary]
enabled = true
max_dictionary_size = 65536 # 64 KB
[storage.compression.codecs.rle]
enabled = true
min_run_length = 3
[storage.compression.codecs.delta]
enabled = true
encoding = "varint" # or "fixed"
# Performance tuning
[performance]
worker_threads = 8
query_timeout_secs = 300
simd_enabled = true
parallel_query = true
# Compression performance tuning
[performance.compression]
# Enable parallel compression for large data
parallel_threshold_bytes = 1048576 # 1 MB
# Number of threads for parallel compression
compression_threads = 4
Rust Configuration Structures¶
//! Configuration structures
//! Location: src/config.rs (ENHANCED)
use serde::{Deserialize, Serialize};
/// Storage configuration (ENHANCED)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
pub path: Option<PathBuf>,
pub memory_only: bool,
pub wal_enabled: bool,
pub cache_size: usize,
/// Compression configuration (ENHANCED)
pub compression: CompressionConfig,
}
/// Compression configuration
///
/// Deserialized from the `[storage.compression]` TOML table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionConfig {
/// Enable compression
pub enabled: bool,
/// Compression algorithm
/// ("none" | "lz4" | "zstd" | … | "auto"; see `CompressionAlgorithmConfig`)
pub algorithm: CompressionAlgorithmConfig,
/// Auto-selection configuration
/// Only consulted when `algorithm = "auto"`; defaults apply if the
/// `[storage.compression.auto]` table is omitted.
#[serde(default)]
pub auto: AutoCompressionConfig,
/// Per-codec configuration
/// Each codec falls back to its own `Default` when omitted from TOML.
#[serde(default)]
pub codecs: CodecConfigs,
}
impl Default for CompressionConfig {
fn default() -> Self {
Self {
enabled: true,
algorithm: CompressionAlgorithmConfig::Auto,
auto: AutoCompressionConfig::default(),
codecs: CodecConfigs::default(),
}
}
}
/// Compression algorithm configuration
///
/// Serialized lowercase so TOML values such as `algorithm = "auto"` map
/// directly onto variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionAlgorithmConfig {
/// No compression
None,
/// Fast general purpose (via RocksDB)
Lz4,
/// High ratio general purpose (via RocksDB)
Zstd,
/// String-oriented codec (best for string data per the pattern stats)
Fsst,
/// Floating-point codec (best for floating-point data per the pattern stats)
Alp,
/// Dictionary codec (best for low-cardinality data per the pattern stats)
Dictionary,
/// Run-length encoding
Rle,
/// Delta encoding (best for sequential data per the pattern stats)
Delta,
/// Automatic selection based on detected data pattern (RECOMMENDED)
Auto,
}
/// Auto-selection configuration
///
/// Mirrors the `[storage.compression.auto]` TOML table; only used when
/// `algorithm = "auto"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoCompressionConfig {
/// Enable automatic codec switching based on data patterns
pub auto_switch: bool,
/// Minimum data size to compress (bytes); smaller data is not compressed
pub min_compress_size: usize,
/// Target compression ratio the codec selector aims for
pub target_ratio: f64,
/// Maximum compression time (µs); fall back to a faster codec if exceeded
pub max_compress_time_us: u64,
/// Enable statistics collection
pub collect_stats: bool,
/// Codec names eligible for auto mode, selected by data pattern
pub enabled_codecs: Vec<String>,
}
impl Default for AutoCompressionConfig {
    /// Mirrors the documented TOML defaults: switching on, 256-byte minimum,
    /// 2.0x target ratio, 1 ms time budget, stats on, all seven codecs.
    fn default() -> Self {
        let enabled_codecs = ["fsst", "alp", "dictionary", "rle", "delta", "lz4", "zstd"]
            .iter()
            .map(|codec| codec.to_string())
            .collect();
        Self {
            auto_switch: true,
            min_compress_size: 256,
            target_ratio: 2.0,
            max_compress_time_us: 1000,
            collect_stats: true,
            enabled_codecs,
        }
    }
}
/// Per-codec configuration
///
/// Every field is `#[serde(default)]`, so any `[storage.compression.codecs.*]`
/// table may be omitted from the TOML.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CodecConfigs {
/// FSST (string) codec settings
#[serde(default)]
pub fsst: FsstConfig,
/// ALP (floating-point) codec settings
#[serde(default)]
pub alp: AlpConfig,
/// Dictionary codec settings
#[serde(default)]
pub dictionary: DictionaryConfig,
/// Run-length-encoding codec settings
#[serde(default)]
pub rle: RleConfig,
/// Delta codec settings
#[serde(default)]
pub delta: DeltaConfig,
}
/// FSST codec settings (`[storage.compression.codecs.fsst]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FsstConfig {
/// Enable this codec
pub enabled: bool,
/// Symbol table size (TOML default: 256)
pub symbol_table_size: usize,
/// Minimum pattern length worth a table entry (TOML default: 3)
pub min_pattern_length: usize,
}
impl Default for FsstConfig {
// Defaults match the documented TOML example.
fn default() -> Self {
Self {
enabled: true,
symbol_table_size: 256,
min_pattern_length: 3,
}
}
}
/// ALP codec settings (`[storage.compression.codecs.alp]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlpConfig {
/// Enable this codec
pub enabled: bool,
/// Use exception handling for values the codec cannot encode directly
pub use_exceptions: bool,
}
impl Default for AlpConfig {
// Defaults match the documented TOML example.
fn default() -> Self {
Self {
enabled: true,
use_exceptions: true,
}
}
}
/// Dictionary codec settings (`[storage.compression.codecs.dictionary]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DictionaryConfig {
/// Enable this codec
pub enabled: bool,
/// Maximum dictionary size in bytes (TOML default: 65536 = 64 KB)
pub max_dictionary_size: usize,
}
impl Default for DictionaryConfig {
// Defaults match the documented TOML example.
fn default() -> Self {
Self {
enabled: true,
max_dictionary_size: 65536,
}
}
}
/// Run-length-encoding codec settings (`[storage.compression.codecs.rle]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RleConfig {
/// Enable this codec
pub enabled: bool,
/// Shortest run worth encoding (TOML default: 3)
pub min_run_length: usize,
}
impl Default for RleConfig {
// Defaults match the documented TOML example.
fn default() -> Self {
Self {
enabled: true,
min_run_length: 3,
}
}
}
/// Delta codec settings (`[storage.compression.codecs.delta]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaConfig {
/// Enable this codec
pub enabled: bool,
/// Delta payload encoding (TOML: "varint" or "fixed")
pub encoding: DeltaEncoding,
}
impl Default for DeltaConfig {
// Defaults match the documented TOML example.
fn default() -> Self {
Self {
enabled: true,
encoding: DeltaEncoding::Varint,
}
}
}
/// Delta payload encoding; lowercase in TOML (`encoding = "varint"` / `"fixed"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DeltaEncoding {
/// Variable-length integer encoding of the deltas
Varint,
/// Fixed-width encoding of the deltas
Fixed,
}
Performance Monitoring Design¶
Metrics Collection¶
//! Statistics collector for compression operations
//! Location: src/storage/compression/stats.rs
// `CompressedBlock` (used by record()) — assumed to live in the parent
// compression module alongside CompressionAlgorithm/DataPattern; confirm.
use super::{CompressedBlock, CompressionAlgorithm, DataPattern};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Compression statistics
///
/// Plain aggregate snapshot of the collector's running totals. `Default`
/// (all zeros / empty maps) is derived because `StatsCollector::new` calls
/// `CompressionStats::default()`, and no manual impl exists in this module.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Total bytes compressed (sum of uncompressed input sizes)
    pub total_bytes_compressed: u64,
    /// Total bytes decompressed
    pub total_bytes_decompressed: u64,
    /// Total compression operations
    pub total_compressions: u64,
    /// Total decompression operations
    pub total_decompressions: u64,
    /// Total time spent compressing
    pub total_compress_time: Duration,
    /// Total time spent decompressing
    pub total_decompress_time: Duration,
    /// Running mean compression ratio over all recorded compressions
    pub avg_compression_ratio: f64,
    /// Per-algorithm statistics
    pub by_algorithm: HashMap<CompressionAlgorithm, AlgorithmStats>,
    /// Per-pattern statistics
    pub by_pattern: HashMap<DataPattern, PatternStats>,
}
/// Running aggregates for a single compression algorithm.
#[derive(Debug, Clone)]
pub struct AlgorithmStats {
/// Which algorithm these figures describe
pub algorithm: CompressionAlgorithm,
/// Number of compressions recorded
pub use_count: u64,
/// Sum of uncompressed input sizes
pub total_input_bytes: u64,
/// Sum of compressed output sizes
pub total_output_bytes: u64,
/// Running mean compression ratio
pub avg_ratio: f64,
/// Running mean compression time (µs)
pub avg_compress_time_us: f64,
/// Running mean decompression time (µs)
/// NOTE(review): never written by any code path visible in this module —
/// confirm a decompression recording path exists elsewhere.
pub avg_decompress_time_us: f64,
/// Smallest ratio seen (initialized to f64::MAX until the first record)
pub min_ratio: f64,
/// Largest ratio seen
pub max_ratio: f64,
}
/// Running aggregates for a single detected data pattern.
#[derive(Debug, Clone)]
pub struct PatternStats {
/// Which pattern these figures describe
pub pattern: DataPattern,
/// How many times this pattern has been detected
pub detection_count: u64,
/// Codec currently considered best for this pattern (heuristic, updated
/// whenever a recorded ratio beats the running average by 10%)
pub best_algorithm: CompressionAlgorithm,
/// Running mean compression ratio for this pattern
pub avg_ratio: f64,
}
/// Statistics collector
///
/// Designed to be shared behind an `Arc` (see `CompressionMonitor`); the
/// totals and the bounded event history each sit behind their own mutex.
pub struct StatsCollector {
// Live running totals; snapshot() clones these out under the lock.
stats: Arc<Mutex<CompressionStats>>,
// Maximum number of events retained in `history`.
history_size: usize,
// Most recent events, oldest at the front (trimmed in record()).
history: Arc<Mutex<VecDeque<CompressionEvent>>>,
}
impl StatsCollector {
    /// Create a collector that retains at most `history_size` recent events.
    pub fn new(history_size: usize) -> Self {
        Self {
            stats: Arc::new(Mutex::new(CompressionStats::default())),
            history_size,
            history: Arc::new(Mutex::new(VecDeque::new())),
        }
    }

    /// Record compression event.
    ///
    /// Updates the global totals, the per-algorithm and per-pattern
    /// aggregates, the running mean ratio, and appends the event to the
    /// bounded history (evicting the oldest entries past `history_size`).
    pub fn record(
        &self,
        algorithm: CompressionAlgorithm,
        pattern: DataPattern,
        block: &CompressedBlock,
        duration: Duration,
    ) {
        let ratio = block.compression_ratio();
        // Scope the stats lock so it is released before the history lock is
        // taken; the two mutexes are never held at the same time.
        {
            let mut stats = self.stats.lock().unwrap();
            // Update totals
            stats.total_bytes_compressed += block.original_size as u64;
            stats.total_compressions += 1;
            stats.total_compress_time += duration;
            // Update per-algorithm stats
            stats
                .by_algorithm
                .entry(algorithm)
                .or_insert_with(|| AlgorithmStats::new(algorithm))
                .record_compression(block, duration);
            // Update per-pattern stats
            stats
                .by_pattern
                .entry(pattern)
                .or_insert_with(|| PatternStats::new(pattern))
                .record(algorithm, ratio);
            // Incremental running mean of the compression ratio.
            let n = stats.total_compressions as f64;
            let prev_avg = stats.avg_compression_ratio;
            stats.avg_compression_ratio = (prev_avg * (n - 1.0) + ratio) / n;
        }
        // Append to the bounded history.
        let mut history = self.history.lock().unwrap();
        history.push_back(CompressionEvent {
            timestamp: std::time::SystemTime::now(),
            algorithm,
            pattern,
            ratio,
            input_size: block.original_size,
            output_size: block.data.len(),
            duration,
        });
        // Trim history down to the configured bound.
        while history.len() > self.history_size {
            history.pop_front();
        }
    }

    /// Get current statistics snapshot (a clone of the live totals).
    pub fn snapshot(&self) -> CompressionStats {
        self.stats.lock().unwrap().clone()
    }

    /// Get the `count` most recent compression events, newest first.
    pub fn recent_events(&self, count: usize) -> Vec<CompressionEvent> {
        let history = self.history.lock().unwrap();
        history.iter().rev().take(count).cloned().collect()
    }

    /// Generate a human-readable statistics report.
    pub fn report(&self) -> String {
        let stats = self.snapshot();
        // `.max(1)` guards against division by zero before the first
        // compression is recorded (previously printed NaN here; this matches
        // the guard used by CompressionMonitor::metrics).
        let avg_compress_us =
            stats.total_compress_time.as_micros() as f64 / stats.total_compressions.max(1) as f64;
        format!(
            "Compression Statistics:\n\
             Total Compressions: {}\n\
             Total Bytes Compressed: {:.2} MB\n\
             Average Compression Ratio: {:.2}x\n\
             Average Compress Time: {:.2} µs\n\
             \n\
             By Algorithm:\n{}\n\
             \n\
             By Pattern:\n{}",
            stats.total_compressions,
            stats.total_bytes_compressed as f64 / 1_000_000.0,
            stats.avg_compression_ratio,
            avg_compress_us,
            Self::format_algorithm_stats(&stats.by_algorithm),
            Self::format_pattern_stats(&stats.by_pattern),
        )
    }

    /// One line per algorithm: uses, average ratio, average compress time.
    fn format_algorithm_stats(stats: &HashMap<CompressionAlgorithm, AlgorithmStats>) -> String {
        let mut lines = Vec::new();
        for (algo, stat) in stats {
            lines.push(format!(
                " {:?}: {} uses, {:.2}x ratio, {:.0} µs",
                algo, stat.use_count, stat.avg_ratio, stat.avg_compress_time_us,
            ));
        }
        lines.join("\n")
    }

    /// One line per detected pattern: detections, best codec, average ratio.
    fn format_pattern_stats(stats: &HashMap<DataPattern, PatternStats>) -> String {
        let mut lines = Vec::new();
        for (pattern, stat) in stats {
            lines.push(format!(
                " {:?}: {} detections, best: {:?}, {:.2}x ratio",
                pattern, stat.detection_count, stat.best_algorithm, stat.avg_ratio,
            ));
        }
        lines.join("\n")
    }
}
/// One recorded compression operation, retained in the bounded history.
#[derive(Debug, Clone)]
struct CompressionEvent {
/// Wall-clock time the event was recorded
timestamp: std::time::SystemTime,
/// Codec that performed the compression
algorithm: CompressionAlgorithm,
/// Data pattern detected for the input
pattern: DataPattern,
/// Compression ratio achieved (input / output)
ratio: f64,
/// Uncompressed input size in bytes
input_size: usize,
/// Compressed output size in bytes
output_size: usize,
/// Time spent compressing
duration: Duration,
}
impl AlgorithmStats {
fn new(algorithm: CompressionAlgorithm) -> Self {
Self {
algorithm,
use_count: 0,
total_input_bytes: 0,
total_output_bytes: 0,
avg_ratio: 0.0,
avg_compress_time_us: 0.0,
avg_decompress_time_us: 0.0,
min_ratio: f64::MAX,
max_ratio: 0.0,
}
}
fn record_compression(&mut self, block: &CompressedBlock, duration: Duration) {
self.use_count += 1;
self.total_input_bytes += block.original_size as u64;
self.total_output_bytes += block.data.len() as u64;
let ratio = block.compression_ratio();
self.min_ratio = self.min_ratio.min(ratio);
self.max_ratio = self.max_ratio.max(ratio);
// Update average ratio
let n = self.use_count as f64;
self.avg_ratio = (self.avg_ratio * (n - 1.0) + ratio) / n;
// Update average time
let time_us = duration.as_micros() as f64;
self.avg_compress_time_us = (self.avg_compress_time_us * (n - 1.0) + time_us) / n;
}
}
impl PatternStats {
/// Fresh, empty aggregate for `pattern`; `best_algorithm` starts as
/// `None` until a recorded ratio promotes a real codec.
fn new(pattern: DataPattern) -> Self {
Self {
pattern,
detection_count: 0,
best_algorithm: CompressionAlgorithm::None,
avg_ratio: 0.0,
}
}
/// Fold one compression result for this pattern into the aggregates.
fn record(&mut self, algorithm: CompressionAlgorithm, ratio: f64) {
self.detection_count += 1;
// Update average ratio (incremental running mean)
let n = self.detection_count as f64;
self.avg_ratio = (self.avg_ratio * (n - 1.0) + ratio) / n;
// Update best algorithm if this performed better
// NOTE(review): "best" is heuristic — any codec whose ratio beats the
// (already-updated) running average by 10% takes the slot; it does not
// track the highest ratio ever seen. Confirm this is intentional.
if ratio > self.avg_ratio * 1.1 {
self.best_algorithm = algorithm;
}
}
}
Monitoring Endpoints¶
//! Monitoring and metrics endpoints
//! Location: src/storage/compression/monitoring.rs
use super::stats::{CompressionStats, StatsCollector};
use std::sync::Arc;
/// Compression monitoring interface
///
/// Read-only facade over a shared `StatsCollector`; exposes snapshots as
/// structured metrics, a text report, or JSON.
pub struct CompressionMonitor {
// Shared with the compression pipeline that records into it.
stats: Arc<StatsCollector>,
}
impl CompressionMonitor {
    /// Wrap a shared statistics collector.
    pub fn new(stats: Arc<StatsCollector>) -> Self {
        Self { stats }
    }

    /// Get current metrics snapshot.
    ///
    /// All averages are guarded so an empty collector yields 0.0 rather
    /// than NaN/inf.
    pub fn metrics(&self) -> CompressionMetrics {
        let stats = self.stats.snapshot();
        // Output bytes are not tracked globally, so they are estimated from
        // the input total and the average ratio. Guard the division:
        // avg_compression_ratio is 0.0 until the first compression is
        // recorded, and the unguarded form produced NaN.
        let total_bytes_out = if stats.avg_compression_ratio > 0.0 {
            stats.total_bytes_compressed as f64 / stats.avg_compression_ratio
        } else {
            0.0
        };
        CompressionMetrics {
            total_compressions: stats.total_compressions,
            total_decompressions: stats.total_decompressions,
            total_bytes_in: stats.total_bytes_compressed,
            total_bytes_out,
            avg_ratio: stats.avg_compression_ratio,
            // `.max(1)` avoids division by zero before any activity.
            avg_compress_time_us: stats.total_compress_time.as_micros() as f64
                / stats.total_compressions.max(1) as f64,
            avg_decompress_time_us: stats.total_decompress_time.as_micros() as f64
                / stats.total_decompressions.max(1) as f64,
            algorithms: stats
                .by_algorithm
                .iter()
                .map(|(algo, stat)| AlgorithmMetric {
                    algorithm: format!("{:?}", algo),
                    use_count: stat.use_count,
                    avg_ratio: stat.avg_ratio,
                    avg_time_us: stat.avg_compress_time_us,
                })
                .collect(),
            patterns: stats
                .by_pattern
                .iter()
                .map(|(pattern, stat)| PatternMetric {
                    pattern: format!("{:?}", pattern),
                    detection_count: stat.detection_count,
                    best_algorithm: format!("{:?}", stat.best_algorithm),
                    avg_ratio: stat.avg_ratio,
                })
                .collect(),
        }
    }

    /// Get formatted text report (delegates to `StatsCollector::report`).
    pub fn report(&self) -> String {
        self.stats.report()
    }

    /// Get JSON metrics.
    ///
    /// Serializing this plain struct of numbers, strings and vectors cannot
    /// fail, so a failure here is a programming bug — state that invariant.
    pub fn json_metrics(&self) -> serde_json::Value {
        let metrics = self.metrics();
        serde_json::to_value(&metrics)
            .expect("CompressionMetrics serialization is infallible")
    }
}
/// Serializable metrics snapshot produced by `CompressionMonitor::metrics`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CompressionMetrics {
/// Total compression operations recorded
pub total_compressions: u64,
/// Total decompression operations recorded
pub total_decompressions: u64,
/// Total uncompressed input bytes
pub total_bytes_in: u64,
/// Estimated compressed output bytes (derived: bytes_in / avg_ratio)
pub total_bytes_out: f64,
/// Average compression ratio
pub avg_ratio: f64,
/// Average compression time (µs)
pub avg_compress_time_us: f64,
/// Average decompression time (µs)
pub avg_decompress_time_us: f64,
/// One entry per algorithm used
pub algorithms: Vec<AlgorithmMetric>,
/// One entry per data pattern detected
pub patterns: Vec<PatternMetric>,
}
/// Per-algorithm entry in `CompressionMetrics` (names via `{:?}` formatting).
#[derive(Debug, Clone, serde::Serialize)]
pub struct AlgorithmMetric {
/// Algorithm name (Debug-formatted)
pub algorithm: String,
/// Number of compressions performed by this algorithm
pub use_count: u64,
/// Average compression ratio for this algorithm
pub avg_ratio: f64,
/// Average compression time (µs) for this algorithm
pub avg_time_us: f64,
}
/// Per-pattern entry in `CompressionMetrics` (names via `{:?}` formatting).
#[derive(Debug, Clone, serde::Serialize)]
pub struct PatternMetric {
/// Pattern name (Debug-formatted)
pub pattern: String,
/// Number of times this pattern was detected
pub detection_count: u64,
/// Codec currently considered best for this pattern (Debug-formatted)
pub best_algorithm: String,
/// Average compression ratio for this pattern
pub avg_ratio: f64,
}
SQL Interface for Monitoring¶
-- View compression statistics
SELECT * FROM heliosdb_compression_stats;
-- Output:
-- algorithm | uses | avg_ratio | avg_compress_us | avg_decompress_us | total_bytes_in | total_bytes_out
-- ----------|-------|-----------|-----------------|-------------------|----------------|----------------
-- FSST | 15023 | 6.2 | 45.3 | 12.1 | 1.2 GB | 195 MB
-- ALP | 8891 | 3.8 | 52.1 | 18.7 | 800 MB | 211 MB
-- Dictionary| 4532 | 12.3 | 28.4 | 8.9 | 450 MB | 37 MB
-- RLE | 1203 | 25.1 | 15.2 | 5.3 | 120 MB | 4.8 MB
-- Delta | 9874 | 8.9 | 22.7 | 9.1 | 950 MB | 107 MB
-- LZ4 | 3421 | 2.1 | 18.3 | 6.2 | 340 MB | 162 MB
-- View pattern detection statistics
SELECT * FROM heliosdb_pattern_stats;
-- Output:
-- pattern | detections | best_algorithm | avg_ratio
-- -----------------|------------|----------------|----------
-- StringData | 15023 | FSST | 6.2
-- FloatingPointData| 8891 | ALP | 3.8
-- LowCardinality | 4532 | Dictionary | 12.3
-- Sequential | 9874 | Delta | 8.9
-- Random | 3421 | LZ4 | 2.1
-- View recent compression events
SELECT * FROM heliosdb_compression_events LIMIT 10;
-- Output:
-- timestamp | algorithm | pattern | ratio | input_kb | output_kb | duration_us
-- --------------------|------------|------------------|-------|----------|-----------|------------
-- 2025-11-18 10:23:45 | FSST | StringData | 7.2 | 124 | 17 | 52
-- 2025-11-18 10:23:44 | ALP | FloatingPointData| 4.1 | 256 | 62 | 68
-- 2025-11-18 10:23:43 | Dictionary | LowCardinality | 15.3 | 64 | 4 | 31
-- ...
-- Get current compression configuration
SELECT * FROM heliosdb_config WHERE key LIKE 'compression.%';
-- Output:
-- key | value
-- -------------------------------------|-------
-- compression.enabled | true
-- compression.algorithm | auto
-- compression.auto.auto_switch | true
-- compression.auto.min_compress_size | 256
-- compression.auto.target_ratio | 2.0
-- compression.auto.max_compress_time_us| 1000