Compression Integration Architecture - Part 1 of 4¶
Architecture Overview & API Design¶
HeliosDB Lite v2.1 - Compression Integration Architecture¶
Version: 2.1.0 | Created: November 18, 2025 | Status: Architecture Design | Target: HeliosDB Lite Phase 3 - Week 2
Executive Summary¶
This document defines the architecture for integrating advanced compression codecs (FSST and ALP) into HeliosDB Lite v2.1, providing automatic, transparent compression that achieves 5-15x storage reduction with minimal performance overhead.
Key Design Principles¶
- Zero Configuration - Automatic codec selection based on data patterns
- Non-Breaking - Fully backward compatible with existing storage
- Modular - Clean separation between codecs, storage, and Arrow integration
- Observable - Comprehensive metrics and performance monitoring
- Zero IP - Only open-source algorithms (FSST, ALP, standard codecs)
Success Metrics¶
| Metric | Target | Measurement |
|---|---|---|
| Storage Reduction | 5-15x | Compression ratio on real workloads |
| Read Overhead | <3% | Query latency with compression vs. without |
| Write Overhead | <5% | Insert throughput with compression vs. without |
| CPU Overhead | <15% | Background compression CPU usage |
| Migration Time | 0 seconds | Lazy migration on read/write |
Architecture Overview¶
System Context Diagram¶
┌─────────────────────────────────────────────────────────────────┐
│ HeliosDB Lite v2.1 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ SQL Query Interface │ │
│ │ (PostgreSQL Wire Protocol + REPL) │ │
│ └────────────────┬───────────────────────────────────────┘ │
│ │ │
│ ┌────────────────▼───────────────────────────────────────┐ │
│ │ Query Executor (Volcano Model) │ │
│ │ • Scan Operator │ │
│ │ • Filter/Project │ │
│ │ • Join/Aggregate │ │
│ └────────────────┬───────────────────────────────────────┘ │
│ │ │
│ ┌────────────────▼───────────────────────────────────────┐ │
│ │ Storage Engine (RocksDB-based) │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Compression Layer (NEW) │ │ │
│ │ │ ┌──────────┬──────────┬─────────┬────────┐ │ │ │
│ │ │ │ FSST │ ALP │ Dict │ RLE │ │ │ │
│ │ │ └──────────┴──────────┴─────────┴────────┘ │ │ │
│ │ │ ┌──────────────────────────────────────┐ │ │ │
│ │ │ │ Codec Selector (Pattern-based) │ │ │ │
│ │ │ └──────────────────────────────────────┘ │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Existing LZ4/Zstd (RocksDB native) │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ MVCC Layer (Versioned Values) │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ RocksDB (Persistent Storage) │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Apache Arrow Integration (Columnar) │ │
│ │ • Batch Processing │ │
│ │ • Column-specific Compression │ │
│ │ • Parquet Export (with compression metadata) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Compression Layer Architecture¶
┌──────────────────────────────────────────────────────────────────┐
│ Compression Layer │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ CompressionManager (Main API) │ │
│ │ • compress(data) -> CompressedBlock │ │
│ │ • decompress(block) -> Vec<u8> │ │
│ │ • compress_column(column) -> CompressedColumn │ │
│ └────┬───────────────────────────────────────┬─────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌────────────────────────┐ ┌────────────────────┐ │
│ │ Pattern Analyzer │ │ Codec Registry │ │
│ │ • SIMD-accelerated │ │ • FSST │ │
│ │ • Data type detection │ │ • ALP │ │
│ │ • Entropy calculation │ │ • Dictionary │ │
│ │ • Cardinality check │ │ • RLE │ │
│ └────┬───────────────────┘ │ • Delta │ │
│ │ │ • LZ4/Zstd │ │
│ ▼ └────────────────────┘ │
│ ┌────────────────────────┐ │
│ │ Codec Selector │◄────────────────────┐ │
│ │ • Rule-based │ │ │
│ │ • Cost model │ │ │
│ │ • Adaptive learning │ │ │
│ └────┬───────────────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌────────────────────────┐ ┌───────────┴──────────┐ │
│ │ Compression Pipeline │ │ Stats Collector │ │
│ │ 1. Analyze pattern │─────────▶│ • Ratio history │ │
│ │ 2. Select codec │ │ • Performance │ │
│ │ 3. Compress │ │ • Pattern freq │ │
│ │ 4. Validate checksum │ └──────────────────────┘ │
│ │ 5. Record stats │ │
│ └────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
Integration Points¶
┌─────────────────────────────────────────────────────────────────┐
│ Integration Points │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. RocksDB Storage Layer Integration │
│ ┌─────────────────────────────────────────────────┐ │
│ │ StorageEngine::put(key, value) │ │
│ │ │ │ │
│ │ ├──▶ serialize(value) │ │
│ │ ├──▶ compress(serialized) ◄── NEW │ │
│ │ ├──▶ encrypt(compressed) [if enabled] │ │
│ │ └──▶ db.put(key, encrypted) │ │
│ │ │ │
│ │ StorageEngine::get(key) │ │
│ │ │ │ │
│ │ ├──▶ db.get(key) │ │
│ │ ├──▶ decrypt(data) [if enabled] │ │
│ │ ├──▶ decompress(decrypted) ◄── NEW │ │
│ │ └──▶ deserialize(decompressed) │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ 2. Apache Arrow Integration │
│ ┌─────────────────────────────────────────────────┐ │
│ │ scan_table_columnar(table) -> RecordBatch │ │
│ │ │ │ │
│ │ ├──▶ collect tuples │ │
│ │ ├──▶ convert to Arrow columns │ │
│ │ ├──▶ compress_column(col) ◄── NEW │ │
│ │ │ • FSST for strings │ │
│ │ │ • ALP for floats │ │
│ │ │ • Dictionary for low cardinality │ │
│ │ └──▶ return RecordBatch │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ 3. Tuple Serialization Integration │
│ ┌─────────────────────────────────────────────────┐ │
│ │ insert_tuple(table, tuple) -> row_id │ │
│ │ │ │ │
│ │ ├──▶ bincode::serialize(tuple) │ │
│ │ ├──▶ compression.compress(bytes) ◄── NEW │ │
│ │ │ • Per-column analysis │ │
│ │ │ • Metadata stored in block │ │
│ │ └──▶ storage.put(key, compressed) │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ 4. Configuration Integration │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Config::storage::compression ◄── ENHANCED │ │
│ │ • None / Lz4 / Zstd (existing) │ │
│ │ • Auto (new - uses pattern-based selection) │ │
│ │ • Custom { codecs: [...], strategy: Auto } │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
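The ordering in the put path above is deliberate: compression must run before encryption, since good ciphertext is statistically indistinguishable from random bytes and defeats any codec applied afterwards. A toy demonstration, where run-length coding stands in for the compression layer and an XOR keystream for the cipher (both illustrative, not the real implementations):

```rust
// Toy run-length compressor: emits (byte, run-length) pairs.
fn rle_compress(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    let mut i = 0;
    while i < data.len() {
        let b = data[i];
        let mut run = 1usize;
        while i + run < data.len() && data[i + run] == b && run < 255 {
            run += 1;
        }
        out.push(b);
        out.push(run as u8);
        i += run;
    }
    out
}

// Toy "encryption": XOR with an LCG keystream (stand-in for a real cipher).
fn xor_cipher(data: &[u8], key: u64) -> Vec<u8> {
    let mut state = key;
    data.iter()
        .map(|&b| {
            state = state
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            b ^ ((state >> 33) as u8)
        })
        .collect()
}

fn main() {
    let plaintext = vec![b'a'; 1024]; // highly compressible

    // Correct order: compress, then encrypt.
    let compressed = rle_compress(&plaintext);
    assert!(compressed.len() < plaintext.len() / 10);

    // Wrong order: ciphertext has no runs, so RLE expands it.
    let ciphertext = xor_cipher(&plaintext, 42);
    assert!(rle_compress(&ciphertext).len() >= ciphertext.len());
}
```

The same reasoning applies to the get path in reverse: decrypt first, then decompress.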
API Design¶
Compression Traits¶
//! Compression codec trait definitions
//! Location: src/storage/compression/mod.rs
use crate::error::Result;
use serde::{Deserialize, Serialize};
/// Compression algorithm identifier
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
/// No compression
None,
/// LZ4 - fast general purpose (existing, via RocksDB)
Lz4,
/// Zstd - high ratio general purpose (existing, via RocksDB)
Zstd,
/// FSST - Fast Static Symbol Table for strings (NEW)
Fsst,
/// ALP - Adaptive Lossless floating-Point (NEW)
Alp,
/// Dictionary encoding - low cardinality data
Dictionary,
/// RLE - Run Length Encoding
Rle,
/// Delta encoding - sequential numeric data
Delta,
/// Automatic selection based on data pattern
Auto,
}
/// Data pattern detected by analyzer
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataPattern {
/// Random/incompressible data
Random,
/// String data with repeated patterns
StringData,
/// Numeric integer data
IntegerData,
/// Floating point data
FloatingPointData,
/// Low cardinality (few unique values)
LowCardinality,
/// Sequential/sorted data
Sequential,
/// Time-series data
TimeSeries,
/// JSON/structured text
StructuredText,
}
/// Codec performance characteristics
#[derive(Debug, Clone)]
pub struct CodecCharacteristics {
/// Compression speed (MB/s)
pub compress_speed_mbps: f64,
/// Decompression speed (MB/s)
pub decompress_speed_mbps: f64,
/// Typical compression ratio
pub typical_ratio: f64,
/// CPU overhead percentage (0.0-1.0)
pub cpu_overhead: f64,
/// Best for pattern
pub best_for: Vec<DataPattern>,
}
/// Main compression codec trait
pub trait CompressionCodec: Send + Sync {
/// Get codec algorithm identifier
fn algorithm(&self) -> CompressionAlgorithm;
/// Compress data
fn compress(&self, data: &[u8]) -> Result<Vec<u8>>;
/// Decompress data
fn decompress(&self, compressed: &[u8], original_size: Option<usize>) -> Result<Vec<u8>>;
/// Get codec characteristics
fn characteristics(&self) -> CodecCharacteristics;
/// Estimate compressed size without actually compressing
fn estimate_compressed_size(&self, data: &[u8]) -> usize {
let ratio = self.characteristics().typical_ratio;
((data.len() as f64 / ratio).ceil() as usize).max(128)
}
/// Check if codec can handle this data pattern efficiently
fn supports_pattern(&self, pattern: DataPattern) -> bool {
self.characteristics().best_for.contains(&pattern)
}
}
/// Compressed data block with metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedBlock {
/// Algorithm used for compression
pub algorithm: CompressionAlgorithm,
/// Version of the algorithm
pub version: u16,
/// Original uncompressed size
pub original_size: usize,
/// Compressed data
pub data: Vec<u8>,
/// CRC32 checksum of original data
pub checksum: u32,
/// Compression timestamp
pub compressed_at: u64,
/// Data pattern detected
pub pattern: DataPattern,
}
impl CompressedBlock {
/// Get compression ratio
pub fn compression_ratio(&self) -> f64 {
self.original_size as f64 / self.data.len() as f64
}
/// Get space savings percentage
pub fn space_savings_pct(&self) -> f64 {
(1.0 - (self.data.len() as f64 / self.original_size as f64)) * 100.0
}
}
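The two accessors are two views of the same number, related by savings = (1 - 1/ratio) * 100. A quick numeric check, using power-of-two sizes so the floating-point math is exact:

```rust
fn main() {
    let original_size = 8192_f64;
    let compressed_size = 1024_f64;

    // compression_ratio(): original / compressed
    let ratio = original_size / compressed_size;
    assert_eq!(ratio, 8.0);

    // space_savings_pct(): fraction of bytes eliminated
    let savings = (1.0 - compressed_size / original_size) * 100.0;
    assert_eq!(savings, 87.5);

    // The two views agree: savings = (1 - 1/ratio) * 100
    assert_eq!((1.0 - 1.0 / ratio) * 100.0, savings);
}
```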
Codec Implementations¶
//! FSST Codec Implementation
//! Location: src/storage/compression/fsst.rs
use super::{CompressionAlgorithm, CompressionCodec, CodecCharacteristics, DataPattern};
use crate::error::Result;
/// FSST (Fast Static Symbol Table) compression codec
///
/// FSST is a string compression algorithm that builds a symbol table
/// of frequent byte sequences and replaces them with single-byte codes.
/// Excellent for string data with repeated patterns (logs, URLs, JSON).
///
/// Reference: "FSST: Fast Random Access String Compression" (VLDB 2020; used in DuckDB)
/// License: open source (MIT)
pub struct FsstCodec {
// Configuration
symbol_table_size: usize,
min_pattern_length: usize,
}
impl FsstCodec {
pub fn new() -> Self {
Self {
symbol_table_size: 256, // Single-byte codes
min_pattern_length: 3, // Minimum pattern to compress
}
}
/// Build symbol table from sample data
fn build_symbol_table(&self, data: &[u8]) -> SymbolTable {
// Implementation:
// 1. Count n-gram frequencies (n=3..8)
// 2. Select top 256 most frequent patterns
// 3. Build encoding/decoding tables
todo!("Build FSST symbol table")
}
/// Compress data using symbol table
fn compress_with_table(&self, data: &[u8], table: &SymbolTable) -> Vec<u8> {
// Implementation:
// 1. Serialize symbol table (header)
// 2. Replace patterns with single-byte codes
// 3. Emit unmatched bytes literally
todo!("FSST compression")
}
}
impl CompressionCodec for FsstCodec {
fn algorithm(&self) -> CompressionAlgorithm {
CompressionAlgorithm::Fsst
}
fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
// Build symbol table from data
let table = self.build_symbol_table(data);
// Compress using table
Ok(self.compress_with_table(data, &table))
}
fn decompress(&self, compressed: &[u8], _original_size: Option<usize>) -> Result<Vec<u8>> {
// Implementation:
// 1. Deserialize symbol table from header
// 2. Decode byte-by-byte, expanding symbols
todo!("FSST decompression")
}
fn characteristics(&self) -> CodecCharacteristics {
CodecCharacteristics {
compress_speed_mbps: 500.0,
decompress_speed_mbps: 2000.0,
typical_ratio: 5.0, // typical on repetitive string data (logs, URLs, JSON)
cpu_overhead: 0.04,
best_for: vec![
DataPattern::StringData,
DataPattern::StructuredText,
],
}
}
}
struct SymbolTable {
// Symbol encoding table: pattern -> code
encoding: Vec<(Vec<u8>, u8)>,
// Symbol decoding table: code -> pattern
decoding: Vec<Vec<u8>>,
}
//! ALP Codec Implementation
//! Location: src/storage/compression/alp.rs
use super::{CompressionAlgorithm, CompressionCodec, CodecCharacteristics, DataPattern};
use crate::error::Result;
/// ALP (Adaptive Lossless floating-Point) compression codec
///
/// ALP is a floating-point compression algorithm that exploits the structure
/// of floating-point data to achieve better compression than general-purpose
/// algorithms.
///
/// Reference: "ALP: Adaptive Lossless floating-Point Compression" (SIGMOD 2023; used in DuckDB)
/// License: open source (MIT)
pub struct AlpCodec {
// Configuration
use_exceptions: bool,
}
impl AlpCodec {
pub fn new() -> Self {
Self {
use_exceptions: true,
}
}
/// Analyze floating point data and determine encoding parameters
fn analyze_floats(&self, data: &[u8]) -> AlpParameters {
// Implementation:
// 1. Parse as f32/f64 array
// 2. Detect common exponent
// 3. Determine precision requirements
// 4. Calculate optimal encoding
todo!("ALP analysis")
}
}
impl CompressionCodec for AlpCodec {
fn algorithm(&self) -> CompressionAlgorithm {
CompressionAlgorithm::Alp
}
fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
// Implementation:
// 1. Analyze data to get parameters
// 2. Encode parameters in header
// 3. Encode mantissas with common exponent
// 4. Handle exceptions (if enabled)
todo!("ALP compression")
}
fn decompress(&self, compressed: &[u8], _original_size: Option<usize>) -> Result<Vec<u8>> {
// Implementation:
// 1. Read parameters from header
// 2. Decode mantissas
// 3. Apply exceptions
// 4. Reconstruct floats
todo!("ALP decompression")
}
fn characteristics(&self) -> CodecCharacteristics {
CodecCharacteristics {
compress_speed_mbps: 600.0,
decompress_speed_mbps: 1800.0,
typical_ratio: 3.0, // typical on real-world double columns
cpu_overhead: 0.03,
best_for: vec![
DataPattern::FloatingPointData,
DataPattern::TimeSeries,
],
}
}
}
struct AlpParameters {
common_exponent: i32,
precision_bits: u8,
exception_count: usize,
}
Compression Manager API¶
//! Compression Manager - Main API
//! Location: src/storage/compression/manager.rs
use super::{CompressionAlgorithm, CompressionCodec, CompressedBlock, DataPattern};
use crate::error::{Error, Result};
use std::collections::HashMap;
use std::sync::Arc;
// PatternAnalyzer, CodecSelector, StatsCollector, and the codec
// implementations live in sibling modules of src/storage/compression/.
/// Seconds since the Unix epoch; used to stamp compressed blocks.
fn current_timestamp() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}
/// Compression manager configuration
#[derive(Debug, Clone)]
pub struct CompressionConfig {
/// Enable compression
pub enabled: bool,
/// Default algorithm (Auto for automatic selection)
pub default_algorithm: CompressionAlgorithm,
/// Enable automatic codec switching
pub auto_switch: bool,
/// Enable statistics collection
pub collect_stats: bool,
/// Minimum data size to compress (bytes)
pub min_compress_size: usize,
/// Target compression ratio (for auto mode)
pub target_ratio: f64,
/// Maximum compression time (microseconds)
pub max_compress_time_us: u64,
}
impl Default for CompressionConfig {
fn default() -> Self {
Self {
enabled: true,
default_algorithm: CompressionAlgorithm::Auto,
auto_switch: true,
collect_stats: true,
min_compress_size: 256, // Don't compress tiny blocks
target_ratio: 2.0, // Aim for 2x minimum
max_compress_time_us: 1000, // 1ms max
}
}
}
/// Main compression manager
pub struct CompressionManager {
config: CompressionConfig,
codecs: Arc<CodecRegistry>,
analyzer: Arc<PatternAnalyzer>,
selector: Arc<CodecSelector>,
stats: Arc<StatsCollector>,
}
impl CompressionManager {
pub fn new(config: CompressionConfig) -> Result<Self> {
Ok(Self {
config,
codecs: Arc::new(CodecRegistry::new()),
analyzer: Arc::new(PatternAnalyzer::new()),
selector: Arc::new(CodecSelector::new()),
stats: Arc::new(StatsCollector::new()),
})
}
/// Compress data with automatic codec selection
pub fn compress(&self, data: &[u8]) -> Result<CompressedBlock> {
// Skip compression for small data
if data.len() < self.config.min_compress_size {
return self.create_uncompressed_block(data);
}
// Analyze data pattern
let pattern = self.analyzer.analyze(data)?;
// Select best codec
let algorithm = if self.config.default_algorithm == CompressionAlgorithm::Auto {
self.selector.select_for_pattern(pattern, data)?
} else {
self.config.default_algorithm
};
// Get codec and compress. Algorithms not in the registry (e.g. LZ4,
// which RocksDB applies natively) fall back to an uncompressed block.
let codec = match self.codecs.get(algorithm) {
    Ok(c) => c,
    Err(_) => return self.create_uncompressed_block(data),
};
let start = std::time::Instant::now();
let compressed = codec.compress(data)?;
let duration = start.elapsed();
// Check if compression was beneficial
if compressed.len() >= data.len() {
return self.create_uncompressed_block(data);
}
// Create block with metadata
let block = CompressedBlock {
algorithm,
version: 1,
original_size: data.len(),
data: compressed,
checksum: crc32fast::hash(data),
compressed_at: current_timestamp(),
pattern,
};
// Record statistics
if self.config.collect_stats {
self.stats.record(algorithm, pattern, &block, duration);
}
Ok(block)
}
/// Compress with specific algorithm
pub fn compress_with(
&self,
data: &[u8],
algorithm: CompressionAlgorithm,
) -> Result<CompressedBlock> {
let codec = self.codecs.get(algorithm)?;
let compressed = codec.compress(data)?;
Ok(CompressedBlock {
algorithm,
version: 1,
original_size: data.len(),
data: compressed,
checksum: crc32fast::hash(data),
compressed_at: current_timestamp(),
pattern: self.analyzer.analyze(data)?,
})
}
/// Decompress data
pub fn decompress(&self, block: &CompressedBlock) -> Result<Vec<u8>> {
// Handle uncompressed blocks
if block.algorithm == CompressionAlgorithm::None {
return Ok(block.data.clone());
}
// Get codec and decompress
let codec = self.codecs.get(block.algorithm)?;
let decompressed = codec.decompress(&block.data, Some(block.original_size))?;
// Verify checksum
let checksum = crc32fast::hash(&decompressed);
if checksum != block.checksum {
return Err(Error::ChecksumMismatch {
expected: block.checksum,
actual: checksum,
});
}
Ok(decompressed)
}
/// Get compression statistics
pub fn stats(&self) -> CompressionStats {
self.stats.snapshot()
}
fn create_uncompressed_block(&self, data: &[u8]) -> Result<CompressedBlock> {
Ok(CompressedBlock {
algorithm: CompressionAlgorithm::None,
version: 1,
original_size: data.len(),
data: data.to_vec(),
checksum: crc32fast::hash(data),
compressed_at: current_timestamp(),
pattern: DataPattern::Random,
})
}
}
/// Codec registry
struct CodecRegistry {
codecs: HashMap<CompressionAlgorithm, Arc<dyn CompressionCodec>>,
}
impl CodecRegistry {
fn new() -> Self {
let mut codecs: HashMap<CompressionAlgorithm, Arc<dyn CompressionCodec>> = HashMap::new();
// Register all codecs
codecs.insert(CompressionAlgorithm::Fsst, Arc::new(FsstCodec::new()));
codecs.insert(CompressionAlgorithm::Alp, Arc::new(AlpCodec::new()));
codecs.insert(CompressionAlgorithm::Dictionary, Arc::new(DictionaryCodec::new()));
codecs.insert(CompressionAlgorithm::Rle, Arc::new(RleCodec::new()));
codecs.insert(CompressionAlgorithm::Delta, Arc::new(DeltaCodec::new()));
// Note: LZ4/Zstd handled by RocksDB layer
Self { codecs }
}
fn get(&self, algorithm: CompressionAlgorithm) -> Result<Arc<dyn CompressionCodec>> {
self.codecs
.get(&algorithm)
.cloned()
.ok_or(Error::UnsupportedAlgorithm(algorithm))
}
}
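The registry boils down to a HashMap of shared, thread-safe trait objects. A minimal standalone sketch of the same pattern (with illustrative codec names) shows how dispatch works and how lookup failures surface as errors rather than panics:

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Algorithm { Reverse, Double }

trait Codec: Send + Sync {
    fn transform(&self, data: &[u8]) -> Vec<u8>;
}

struct ReverseCodec;
impl Codec for ReverseCodec {
    fn transform(&self, data: &[u8]) -> Vec<u8> {
        data.iter().rev().copied().collect()
    }
}

struct Registry {
    codecs: HashMap<Algorithm, Arc<dyn Codec>>,
}

impl Registry {
    fn new() -> Self {
        let mut codecs: HashMap<Algorithm, Arc<dyn Codec>> = HashMap::new();
        // Arc<ReverseCodec> unsizes to Arc<dyn Codec> at the insert site.
        codecs.insert(Algorithm::Reverse, Arc::new(ReverseCodec));
        Self { codecs }
    }

    // Missing entries become errors, mirroring UnsupportedAlgorithm above.
    fn get(&self, algorithm: Algorithm) -> Result<Arc<dyn Codec>, String> {
        self.codecs
            .get(&algorithm)
            .cloned()
            .ok_or_else(|| format!("unsupported algorithm: {:?}", algorithm))
    }
}

fn main() {
    let registry = Registry::new();
    let codec = registry.get(Algorithm::Reverse).unwrap();
    assert_eq!(codec.transform(b"abc"), b"cba".to_vec());
    assert!(registry.get(Algorithm::Double).is_err());
}
```

Cloning the `Arc` is cheap (a refcount bump), so codecs can be handed to concurrent compression tasks without locking.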
Pattern Analyzer¶
//! Pattern Analyzer - Data pattern detection
//! Location: src/storage/compression/analyzer.rs
use super::DataPattern;
use crate::error::Result;
/// Pattern analyzer using SIMD acceleration
pub struct PatternAnalyzer {
sample_size: usize,
}
impl PatternAnalyzer {
pub fn new() -> Self {
Self {
sample_size: 4096, // Sample first 4KB
}
}
/// Analyze data and detect pattern
pub fn analyze(&self, data: &[u8]) -> Result<DataPattern> {
let sample = self.get_sample(data);
// Run multiple detection heuristics
let is_string = self.detect_string_data(sample);
let is_float = self.detect_float_data(sample);
let is_integer = self.detect_integer_data(sample);
let cardinality = self.calculate_cardinality(sample);
let entropy = self.calculate_entropy(sample);
let is_sequential = self.detect_sequential(sample);
// Pattern selection logic. Normalized entropy near 1.0 means the byte
// distribution is close to uniform, i.e. effectively incompressible.
if entropy > 0.95 {
    return Ok(DataPattern::Random);
}
// Very few distinct byte values: dictionary/RLE territory. (A plain
// `cardinality < 256` test would be true for almost any input, so a
// much tighter bound is used.)
if cardinality <= 16 {
    return Ok(DataPattern::LowCardinality);
}
if is_sequential {
return Ok(DataPattern::Sequential);
}
if is_float {
return Ok(DataPattern::FloatingPointData);
}
if is_integer {
return Ok(DataPattern::IntegerData);
}
if is_string {
// Check if it's JSON-like
if self.looks_like_json(sample) {
return Ok(DataPattern::StructuredText);
}
return Ok(DataPattern::StringData);
}
Ok(DataPattern::Random)
}
fn get_sample<'a>(&self, data: &'a [u8]) -> &'a [u8] {
if data.len() <= self.sample_size {
data
} else {
&data[..self.sample_size]
}
}
fn detect_string_data(&self, data: &[u8]) -> bool {
// Check for printable ASCII/UTF-8
let printable_count = data.iter()
.filter(|&&b| (b >= 32 && b <= 126) || b == b'\n' || b == b'\t')
.count();
printable_count as f64 / data.len() as f64 > 0.8
}
fn detect_float_data(&self, data: &[u8]) -> bool {
    // Interpret 8-byte chunks as f64 and check for plausible values.
    // chunks_exact avoids the undefined behavior of casting a possibly
    // unaligned pointer; this remains a heuristic - not perfect.
    if data.len() < 8 {
        return false;
    }
    let mut total = 0usize;
    let mut valid = 0usize;
    for chunk in data.chunks_exact(8).take(100) {
        let f = f64::from_le_bytes(chunk.try_into().unwrap());
        total += 1;
        if f.is_finite() && f.abs() < 1e10 {
            valid += 1;
        }
    }
    valid as f64 / total as f64 > 0.7
}
fn detect_integer_data(&self, data: &[u8]) -> bool {
// Similar heuristic for integers
if data.len() % 8 == 0 {
// Check for reasonable integer ranges
true // Simplified
} else {
false
}
}
fn calculate_cardinality(&self, data: &[u8]) -> usize {
let mut seen = std::collections::HashSet::new();
for &byte in data.iter().take(1024) {
seen.insert(byte);
}
seen.len()
}
fn calculate_entropy(&self, data: &[u8]) -> f64 {
let mut counts = [0u32; 256];
for &byte in data {
counts[byte as usize] += 1;
}
let len = data.len() as f64;
let mut entropy = 0.0;
for &count in &counts {
if count > 0 {
let p = count as f64 / len;
entropy -= p * p.log2();
}
}
entropy / 8.0 // Normalize to 0-1
}
fn detect_sequential(&self, data: &[u8]) -> bool {
// Check for monotonic sequences
if data.len() < 16 {
return false;
}
let diffs: Vec<i16> = data.windows(2)
.map(|w| w[1] as i16 - w[0] as i16)
.collect();
// Check if most differences are the same
let mut diff_counts = std::collections::HashMap::new();
for &diff in &diffs {
*diff_counts.entry(diff).or_insert(0) += 1;
}
let max_count = diff_counts.values().max().unwrap_or(&0);
*max_count as f64 / diffs.len() as f64 > 0.7
}
fn looks_like_json(&self, data: &[u8]) -> bool {
// Simple heuristic: check for { } [ ] : , "
let json_chars = data.iter()
.filter(|&&b| matches!(b, b'{' | b'}' | b'[' | b']' | b':' | b',' | b'"'))
.count();
json_chars as f64 / data.len() as f64 > 0.1
}
}
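The entropy thresholds are easier to trust with a few known inputs: a constant run scores 0.0, a 50/50 two-symbol stream scores 1 bit/byte = 0.125 after normalization, and a uniform byte distribution scores 1.0 (incompressible). Reusing the same calculation standalone:

```rust
// Shannon entropy over byte frequencies, normalized by the 8-bit maximum,
// mirroring calculate_entropy above.
fn normalized_entropy(data: &[u8]) -> f64 {
    let mut counts = [0u32; 256];
    for &b in data {
        counts[b as usize] += 1;
    }
    let len = data.len() as f64;
    let mut entropy = 0.0;
    for &c in &counts {
        if c > 0 {
            let p = c as f64 / len;
            entropy -= p * p.log2();
        }
    }
    entropy / 8.0
}

fn main() {
    // A constant run has zero entropy: maximally compressible.
    assert_eq!(normalized_entropy(&vec![0u8; 1024]), 0.0);

    // All 256 byte values equally often: entropy is exactly 1.0.
    let uniform: Vec<u8> = (0..=255u8).cycle().take(4096).collect();
    assert!((normalized_entropy(&uniform) - 1.0).abs() < 1e-12);

    // Two symbols split 50/50: 1 bit per byte, 0.125 after normalization.
    let two: Vec<u8> = (0..1024).map(|i| (i % 2) as u8).collect();
    assert!((normalized_entropy(&two) - 0.125).abs() < 1e-12);
}
```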
Codec Selector¶
//! Codec Selector - Automatic codec selection
//! Location: src/storage/compression/selector.rs
use super::{CompressionAlgorithm, DataPattern};
use crate::error::Result;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Codec selector with rule-based and adaptive logic
pub struct CodecSelector {
// Historical performance data
history: Arc<Mutex<SelectionHistory>>,
}
impl CodecSelector {
pub fn new() -> Self {
Self {
history: Arc::new(Mutex::new(SelectionHistory::new())),
}
}
/// Select best codec for data pattern
pub fn select_for_pattern(&self, pattern: DataPattern, data: &[u8]) -> Result<CompressionAlgorithm> {
// Rule-based selection
let algorithm = match pattern {
DataPattern::StringData => CompressionAlgorithm::Fsst,
DataPattern::StructuredText => CompressionAlgorithm::Fsst,
DataPattern::FloatingPointData => CompressionAlgorithm::Alp,
DataPattern::TimeSeries => CompressionAlgorithm::Alp,
DataPattern::LowCardinality => CompressionAlgorithm::Dictionary,
DataPattern::Sequential => CompressionAlgorithm::Delta,
DataPattern::IntegerData => {
// Check if sequential
if self.is_sequential_integers(data) {
CompressionAlgorithm::Delta
} else {
CompressionAlgorithm::Dictionary
}
}
DataPattern::Random => CompressionAlgorithm::Lz4,
};
// Check historical performance and potentially override
if let Some(better) = self.check_history(pattern, algorithm) {
return Ok(better);
}
Ok(algorithm)
}
fn is_sequential_integers(&self, data: &[u8]) -> bool {
// Heuristic for detecting sequential integer patterns
if data.len() < 32 {
return false;
}
// Sample a few integers and check differences
let samples = data.len() / 8;
if samples < 4 {
return false;
}
let integers: Vec<i64> = (0..samples.min(100))
.map(|i| {
let offset = i * 8;
i64::from_le_bytes([
data[offset], data[offset+1], data[offset+2], data[offset+3],
data[offset+4], data[offset+5], data[offset+6], data[offset+7],
])
})
.collect();
// Check if differences are consistent
let diffs: Vec<i64> = integers.windows(2)
.map(|w| w[1] - w[0])
.collect();
let avg_diff = diffs.iter().sum::<i64>() as f64 / diffs.len() as f64;
let variance = diffs.iter()
.map(|&d| (d as f64 - avg_diff).powi(2))
.sum::<f64>() / diffs.len() as f64;
// Low variance indicates sequential
variance < 100.0
}
fn check_history(&self, pattern: DataPattern, current: CompressionAlgorithm) -> Option<CompressionAlgorithm> {
let history = self.history.lock().unwrap();
history.get_better_algorithm(pattern, current)
}
}
struct SelectionHistory {
// Pattern -> Algorithm -> Performance metrics
performance: HashMap<DataPattern, HashMap<CompressionAlgorithm, PerformanceMetrics>>,
}
impl SelectionHistory {
fn new() -> Self {
Self {
performance: HashMap::new(),
}
}
fn get_better_algorithm(&self, pattern: DataPattern, current: CompressionAlgorithm) -> Option<CompressionAlgorithm> {
if let Some(algorithms) = self.performance.get(&pattern) {
// Find algorithm with best ratio and acceptable speed
let current_perf = algorithms.get(&current)?;
for (algo, perf) in algorithms {
if perf.avg_ratio > current_perf.avg_ratio * 1.2 &&
perf.avg_compress_us < current_perf.avg_compress_us * 2.0 {
return Some(*algo);
}
}
}
None
}
}
#[derive(Debug, Clone)]
struct PerformanceMetrics {
avg_ratio: f64,
avg_compress_us: f64,
avg_decompress_us: f64,
sample_count: usize,
}