Skip to content

Branch Storage Architecture - Part 3 of 3

Operations & Testing

Navigation: Index | ← Part 2 | Part 3


impl BranchMetadataCache {
    /// Create a metadata cache holding at most `capacity` entries,
    /// evicting least-recently-used entries beyond that.
    pub fn new(capacity: usize) -> Self {
        Self {
            cache: Arc::new(RwLock::new(LruCache::new(capacity))),
            capacity,
        }
    }

    /// Look up cached metadata for `branch_id`, returning a clone.
    ///
    /// Takes the write lock (not the read lock) because `LruCache::get`
    /// mutates the recency order even on a lookup.
    pub fn get(&self, branch_id: BranchId) -> Option<BranchMetadata> {
        self.cache.write().get(&branch_id).cloned()
    }

    /// Insert (or refresh) metadata for its branch, possibly evicting
    /// the least-recently-used entry.
    pub fn put(&self, metadata: BranchMetadata) {
        self.cache.write().put(metadata.branch_id, metadata);
    }
}

11.2.2 Parent Chain Caching

rust impl BranchTransaction { /// Build and cache parent chain fn build_parent_chain( branch: &BranchMetadata, storage: &StorageEngine, ) -> Result<Vec<(BranchId, SnapshotId)>> { let mut chain = Vec::new(); let mut current_id = branch.parent_id; while let Some(parent_id) = current_id { let parent = get_branch_metadata(storage, parent_id)?; chain.push((parent_id, parent.created_from_snapshot)); current_id = parent.parent_id; } Ok(chain) } }

11.2.3 Bloom Filters for Modified Keys

/// Track modified keys per branch with bloom filter
///
/// A negative answer from the filter is definitive (the key was never
/// modified in this branch); a positive answer may be a false positive
/// and requires a real lookup.
pub struct BranchBloomFilter {
    /// Bloom filter for modified keys
    filter: BloomFilter,

    /// False positive rate
    /// NOTE(review): stored but not read anywhere in this document —
    /// presumably used when sizing/rebuilding the filter; confirm.
    fpr: f64,
}

impl BranchTransaction {
    /// Check if key might be modified in this branch
    ///
    /// Returns `false` only when the bloom filter proves the key was
    /// never modified here (bloom filters have no false negatives).
    /// With no filter attached, answers `true` so callers fall back to
    /// a real lookup.
    fn key_might_exist_in_branch(&self, key: &str) -> bool {
        if let Some(filter) = &self.bloom_filter {
            filter.contains(key)
        } else {
            true // Conservative: assume might exist
        }
    }

    /// Read `key` through this branch, skipping the local lookup when
    /// the bloom filter proves the key was never modified here.
    pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
        // Fast path: check bloom filter
        // (lossy UTF-8 conversion: the filter is keyed on strings)
        let key_str = String::from_utf8_lossy(key);

        if !self.key_might_exist_in_branch(&key_str) {
            // Definitely not in this branch - go to parent
            return self.get_from_parent_chain(key);
        }

        // Slow path: actual lookup
        // ... rest of implementation
        // (elided in this document — see the full read path in Part 2)
    }
}

11.2.4 Lazy Parent Reads

/// Lazy iterator over parent chain
/// Lazy iterator over parent chain
///
/// Each `next()` loads at most one ancestor's metadata from storage,
/// so a deep hierarchy is only traversed as far as the caller walks.
struct ParentChainIterator<'a> {
    // Storage used to load ancestor metadata on demand.
    storage: &'a StorageEngine,
    // Branch to yield next; `None` terminates the iteration.
    current_branch: Option<BranchId>,
    // Snapshot paired with `current_branch` in the next item.
    current_snapshot: SnapshotId,
}

impl<'a> Iterator for ParentChainIterator<'a> {
    type Item = (BranchId, SnapshotId);

    /// Yield the current (branch, snapshot) pair, then advance one
    /// level up the chain by loading that branch's metadata on demand.
    /// A metadata load failure ends the iteration after the current
    /// item is returned.
    fn next(&mut self) -> Option<Self::Item> {
        let branch = self.current_branch?;
        let item = (branch, self.current_snapshot);

        // Lazily resolve the next ancestor for the following call.
        match get_branch_metadata(self.storage, branch) {
            Ok(meta) => {
                self.current_branch = meta.parent_id;
                self.current_snapshot = meta.created_from_snapshot;
            }
            Err(_) => self.current_branch = None,
        }

        Some(item)
    }
}

11.3 Space Efficiency

Storage Breakdown (per branch):
────────────────────────────────────────────
Metadata:           ~500 bytes
Registry entry:     ~50 bytes
Parent/child index: ~100 bytes
Bloom filter:       ~10KB (optional)
────────────────────────────────────────────
Total overhead:     ~11KB per branch

Data storage:       Only modified data
                    (copy-on-write)

Example:
- 1GB database
- 5 branches
- 10% data modified per branch

Total storage:
- Original:   1GB
- Branches:   5 × (11KB + 100MB) ≈ 500MB
- Total:      1.5GB (50% overhead)
- Per branch: 100MB / 1GB = 10% (as expected)

11.4 Compaction Impact

/// Custom compaction filter for branch GC
/// Custom compaction filter for branch GC
///
/// During compaction, entries belonging to branches absent from
/// `active_branches` are dropped, reclaiming space from deleted
/// branches without a separate scan.
struct BranchCompactionFilter {
    // Snapshot of branch ids that must be preserved.
    active_branches: HashSet<BranchId>,
}

impl rocksdb::CompactionFilter for BranchCompactionFilter {
    /// Decide whether an entry is removed during compaction.
    /// Returning `true` drops the key; `false` keeps it.
    fn filter(&mut self, _level: u32, key: &[u8], _value: &[u8]) -> bool {
        match extract_branch_id(key) {
            // Branch-scoped key: drop it only when its branch is no
            // longer active (garbage from a deleted branch).
            Some(branch_id) => !self.active_branches.contains(&branch_id),
            // Non-branch keys are always retained.
            None => false,
        }
    }
}

12. Testing Strategy

12.1 Unit Tests

#[cfg(test)]
mod tests {
    use super::*;

    /// Branch creation writes metadata only: the new branch must be
    /// retrievable by id, carry its name, and start in Active state.
    #[test]
    fn test_create_branch_metadata_only() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Create branch
        let branch_id = create_branch(
            &storage,
            "dev",
            None,
            AsOfClause::Now,
            BranchOptions::default(),
        ).unwrap();

        // Verify metadata exists
        let metadata = get_branch_metadata(&storage, branch_id).unwrap();
        assert_eq!(metadata.name, "dev");
        assert_eq!(metadata.state, BranchState::Active);
    }

    /// First write in a branch triggers copy-on-write: the branch sees
    /// its own value afterwards while main keeps the original.
    #[test]
    fn test_copy_on_write_first_write() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert in main
        storage.put(b"key1", b"value1").unwrap();

        // Create branch
        create_branch(&storage, "dev", None, AsOfClause::Now, Default::default()).unwrap();

        // Read from dev (should see main's value)
        let mut tx = storage.begin_branch_transaction("dev").unwrap();
        assert_eq!(tx.get(b"key1").unwrap(), Some(b"value1".to_vec()));

        // Write to dev (copy-on-write)
        tx.put(b"key1".to_vec(), b"value2".to_vec()).unwrap();
        tx.commit().unwrap();

        // Verify isolation
        assert_eq!(
            storage.get(b"key1").unwrap(),
            Some(b"value1".to_vec())
        ); // main unchanged

        let tx_dev = storage.begin_branch_transaction("dev").unwrap();
        assert_eq!(
            tx_dev.get(b"key1").unwrap(),
            Some(b"value2".to_vec())
        ); // dev has new value
    }

    /// Writes to the same key in two sibling branches must not leak
    /// into each other.
    #[test]
    fn test_branch_isolation() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Create two branches
        create_branch(&storage, "branch_a", None, AsOfClause::Now, Default::default()).unwrap();
        create_branch(&storage, "branch_b", None, AsOfClause::Now, Default::default()).unwrap();

        // Write to branch_a
        let mut tx_a = storage.begin_branch_transaction("branch_a").unwrap();
        tx_a.put(b"key1".to_vec(), b"value_a".to_vec()).unwrap();
        tx_a.commit().unwrap();

        // Write to branch_b
        let mut tx_b = storage.begin_branch_transaction("branch_b").unwrap();
        tx_b.put(b"key1".to_vec(), b"value_b".to_vec()).unwrap();
        tx_b.commit().unwrap();

        // Verify isolation
        let tx_a = storage.begin_branch_transaction("branch_a").unwrap();
        assert_eq!(tx_a.get(b"key1").unwrap(), Some(b"value_a".to_vec()));

        let tx_b = storage.begin_branch_transaction("branch_b").unwrap();
        assert_eq!(tx_b.get(b"key1").unwrap(), Some(b"value_b".to_vec()));
    }

    /// A grandchild branch with no local data must resolve reads
    /// through two levels of parent chain back to main.
    #[test]
    fn test_parent_chain_resolution() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert in main
        storage.put(b"key1", b"value1").unwrap();

        // Create child branch
        create_branch(&storage, "level1", None, AsOfClause::Now, Default::default()).unwrap();

        // Create grandchild branch
        create_branch(&storage, "level2", Some("level1"), AsOfClause::Now, Default::default()).unwrap();

        // Read from grandchild (should see main's value)
        let tx = storage.begin_branch_transaction("level2").unwrap();
        assert_eq!(tx.get(b"key1").unwrap(), Some(b"value1".to_vec()));
    }

    /// When only the branch modified a key, merging reports zero
    /// conflicts and main adopts the branch's value.
    #[test]
    fn test_three_way_merge_no_conflicts() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Setup: main has key1=v1
        storage.put(b"key1", b"v1").unwrap();

        // Create branch
        create_branch(&storage, "dev", None, AsOfClause::Now, Default::default()).unwrap();

        // Modify in dev: key1=v2
        let mut tx = storage.begin_branch_transaction("dev").unwrap();
        tx.put(b"key1".to_vec(), b"v2".to_vec()).unwrap();
        tx.commit().unwrap();

        // Merge dev into main
        let result = merge_branch(
            &storage,
            "dev",
            "main",
            MergeOptions {
                conflict_resolution: ConflictResolution::BranchWins,
                delete_branch_after: false,
            },
        ).unwrap();

        assert_eq!(result.conflicts_count, 0);
        assert_eq!(storage.get(b"key1").unwrap(), Some(b"v2".to_vec()));
    }

    /// When both sides modified the same key since the merge base,
    /// a merge with Fail resolution must return an error.
    #[test]
    fn test_merge_with_conflicts() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Setup: main has key1=v1
        storage.put(b"key1", b"v1").unwrap();

        // Create branch
        create_branch(&storage, "dev", None, AsOfClause::Now, Default::default()).unwrap();

        // Modify same key in both branches
        storage.put(b"key1", b"v2_main").unwrap();

        let mut tx = storage.begin_branch_transaction("dev").unwrap();
        tx.put(b"key1".to_vec(), b"v2_dev".to_vec()).unwrap();
        tx.commit().unwrap();

        // Merge with FAIL resolution
        let result = merge_branch(
            &storage,
            "dev",
            "main",
            MergeOptions {
                conflict_resolution: ConflictResolution::Fail,
                delete_branch_after: false,
            },
        );

        assert!(result.is_err()); // Should fail due to conflict
    }
}

12.2 Integration Tests

#[cfg(test)]
mod integration_tests {
    use super::*;

    /// End-to-end through the SQL layer: create a branch, write in it,
    /// and verify main and the branch see different row counts.
    #[test]
    fn test_branching_sql_integration() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Execute SQL commands
        execute_sql(&storage, "CREATE TABLE users (id INT, name TEXT)").unwrap();
        execute_sql(&storage, "INSERT INTO users VALUES (1, 'Alice')").unwrap();

        // Create branch via SQL
        execute_sql(&storage, "CREATE DATABASE BRANCH dev FROM CURRENT AS OF NOW").unwrap();

        // Insert in dev
        execute_sql(&storage, "SET branch = dev").unwrap();
        execute_sql(&storage, "INSERT INTO users VALUES (2, 'Bob')").unwrap();

        // Verify isolation
        execute_sql(&storage, "SET branch = main").unwrap();
        let result = query_sql(&storage, "SELECT * FROM users").unwrap();
        assert_eq!(result.len(), 1); // Only Alice

        execute_sql(&storage, "SET branch = dev").unwrap();
        let result = query_sql(&storage, "SELECT * FROM users").unwrap();
        assert_eq!(result.len(), 2); // Alice and Bob
    }

    /// Two threads committing to different branches concurrently must
    /// not interfere: each branch ends up with only its own values.
    #[test]
    fn test_concurrent_branch_transactions() {
        use std::sync::Arc;
        use std::thread;

        let config = Config::in_memory();
        let storage = Arc::new(StorageEngine::open_in_memory(&config).unwrap());

        // Create branches
        create_branch(&storage, "branch1", None, AsOfClause::Now, Default::default()).unwrap();
        create_branch(&storage, "branch2", None, AsOfClause::Now, Default::default()).unwrap();

        let storage1 = Arc::clone(&storage);
        let storage2 = Arc::clone(&storage);

        // Concurrent writes to different branches
        let t1 = thread::spawn(move || {
            for i in 0..100 {
                let mut tx = storage1.begin_branch_transaction("branch1").unwrap();
                tx.put(format!("key{}", i).into_bytes(), b"value1".to_vec()).unwrap();
                tx.commit().unwrap();
            }
        });

        let t2 = thread::spawn(move || {
            for i in 0..100 {
                let mut tx = storage2.begin_branch_transaction("branch2").unwrap();
                tx.put(format!("key{}", i).into_bytes(), b"value2".to_vec()).unwrap();
                tx.commit().unwrap();
            }
        });

        t1.join().unwrap();
        t2.join().unwrap();

        // Verify isolation (spot-check one key from each branch)
        let tx1 = storage.begin_branch_transaction("branch1").unwrap();
        assert_eq!(tx1.get(b"key50").unwrap(), Some(b"value1".to_vec()));

        let tx2 = storage.begin_branch_transaction("branch2").unwrap();
        assert_eq!(tx2.get(b"key50").unwrap(), Some(b"value2".to_vec()));
    }
}

12.3 Performance Tests

#[cfg(test)]
mod perf_tests {
    use super::*;
    use std::time::Instant;

    /// Branch creation must be metadata-only (independent of database
    /// size): populate 1M rows, then assert creation stays under 10ms.
    #[test]
    fn bench_branch_creation() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert 1M rows so a copying implementation would clearly
        // blow the time budget below.
        for i in 0..1_000_000 {
            storage.put(format!("key{}", i).as_bytes(), b"value").unwrap();
        }

        // Benchmark branch creation
        let start = Instant::now();
        create_branch(&storage, "bench", None, AsOfClause::Now, Default::default()).unwrap();
        let elapsed = start.elapsed();

        println!("Branch creation: {:?}", elapsed);
        // NOTE(review): wall-clock assertions are flaky on loaded CI
        // machines; keep the bound generous or move this to a
        // dedicated benchmark harness.
        assert!(elapsed.as_millis() < 10, "Branch creation too slow: {:?}", elapsed);
    }

    /// Reads through a branch transaction should cost at most ~10%
    /// more than direct reads from main.
    #[test]
    fn bench_branch_read_overhead() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert 100K rows into main.
        for i in 0..100_000 {
            storage.put(format!("key{}", i).as_bytes(), b"value").unwrap();
        }

        // Create branch (zero-copy; its reads fall through to main).
        create_branch(&storage, "bench", None, AsOfClause::Now, Default::default()).unwrap();

        // Benchmark direct reads from main.
        let start = Instant::now();
        for i in 0..10_000 {
            storage.get(format!("key{}", i).as_bytes()).unwrap();
        }
        let main_time = start.elapsed();

        // Benchmark branch reads. Begin the transaction ONCE, outside
        // the timed loop: the previous version opened a new transaction
        // per iteration, so the measurement was dominated by
        // transaction setup rather than the per-read parent-chain
        // overhead this test is meant to bound.
        let tx = storage.begin_branch_transaction("bench").unwrap();
        let start = Instant::now();
        for i in 0..10_000 {
            tx.get(format!("key{}", i).as_bytes()).unwrap();
        }
        let branch_time = start.elapsed();

        let overhead = (branch_time.as_micros() as f64 / main_time.as_micros() as f64 - 1.0) * 100.0;
        println!("Branch read overhead: {:.2}%", overhead);
        assert!(overhead < 10.0, "Branch read overhead too high: {:.2}%", overhead);
    }
}

12.4 Stress Tests

#[cfg(test)]
mod stress_tests {
    use super::*;

    /// Many sibling branches must all be creatable and independently
    /// readable.
    #[test]
    fn stress_test_many_branches() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Create 100 branches off main.
        for i in 0..100 {
            create_branch(
                &storage,
                &format!("branch{}", i),
                None,
                AsOfClause::Now,
                Default::default(),
            ).unwrap();
        }

        // Verify all branches are accessible; nothing was ever
        // written, so every read resolves to None.
        for i in 0..100 {
            let tx = storage.begin_branch_transaction(&format!("branch{}", i)).unwrap();
            assert!(tx.get(b"key").unwrap().is_none());
        }
    }

    /// A 50-level-deep branch chain must still resolve reads back to
    /// the root via parent-chain traversal.
    #[test]
    fn stress_test_deep_hierarchy() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert at the root BEFORE building the hierarchy. Branches
        // are created AS OF NOW, i.e. from a snapshot taken at creation
        // time; the previous version inserted the key after the
        // branches existed, so none of their snapshots could contain it
        // and the final read would not exercise chain traversal.
        // (Compare test_parent_chain_resolution, which inserts first.)
        storage.put(b"key", b"value").unwrap();

        // Create a 50-level deep hierarchy: level0 off main, each
        // subsequent level off its predecessor.
        let mut parent = None;
        for i in 0..50 {
            let branch_name = format!("level{}", i);
            create_branch(
                &storage,
                &branch_name,
                parent.as_deref(),
                AsOfClause::Now,
                Default::default(),
            ).unwrap();
            parent = Some(branch_name);
        }

        // Read from deepest branch (should traverse 50 parents)
        let tx = storage.begin_branch_transaction("level49").unwrap();
        assert_eq!(tx.get(b"key").unwrap(), Some(b"value".to_vec()));
    }
}

Appendix A: SQL Examples

-- Create a development branch from current state
CREATE DATABASE BRANCH dev FROM CURRENT AS OF NOW;

-- Create a branch from a specific timestamp
CREATE DATABASE BRANCH staging
FROM main
AS OF TIMESTAMP '2025-11-18 10:00:00';

-- Create a branch from a transaction ID
CREATE DATABASE BRANCH hotfix
FROM production
AS OF TRANSACTION 123456;

-- Create a branch with options
CREATE DATABASE BRANCH experiment
FROM main
AS OF NOW
WITH (
    replication_factor = 3,
    region = 'us-west'
);

-- Switch to a branch (session-level)
SET branch = dev;

-- Insert data in branch
INSERT INTO users (name) VALUES ('Alice');

-- Query branch
SELECT * FROM users;

-- Merge branch back to main
MERGE DATABASE BRANCH dev INTO main
WITH (
    conflict_resolution = 'branch_wins',
    delete_branch_after = true
);

-- Drop a branch
DROP DATABASE BRANCH IF EXISTS experiment;

-- List all branches (system view)
SELECT * FROM pg_database_branches();

-- View branch hierarchy
SELECT
    name,
    parent_name,
    created_at,
    state,
    modified_keys
FROM pg_database_branches()
ORDER BY created_at;

Appendix B: Architecture Decision Records

ADR-1: Use RocksDB Native MVCC

Decision: Leverage RocksDB's existing MVCC capabilities rather than implementing a separate versioning layer.

Rationale: - RocksDB provides efficient multi-version storage - Reduces implementation complexity - Better integration with existing storage layer

Consequences: - Branch versioning builds on RocksDB timestamps - Natural alignment with existing transaction system - Minor overhead for branch metadata


ADR-2: Branch ID in Physical Key

Decision: Encode branch ID as part of the physical key rather than using column families.

Rationale: - Simpler key space management - Efficient range scans within a branch - Natural clustering for compaction

Alternatives Considered: - Column families per branch: Complex management, limited to ~10K branches - Separate RocksDB instances: Too much overhead

Consequences: - Keys are slightly larger (8 bytes for branch ID) - Efficient branch isolation and GC


ADR-3: Lazy Parent Resolution

Decision: Resolve parent values on-demand during reads rather than eagerly copying.

Rationale: - Instant branch creation - Minimal storage overhead - Most data is never modified in branches

Consequences: - Slight read overhead for unchanged data - Complex read path with parent chain traversal - Excellent storage efficiency


ADR-4: Three-Way Merge Algorithm

Decision: Use three-way merge with configurable conflict resolution.

Rationale: - Standard approach in version control systems - Handles complex merge scenarios - Supports automatic conflict resolution

Alternatives Considered: - Two-way merge: Cannot detect concurrent modifications - CRDTs: Too complex for general key-value storage

Consequences: - Requires merge base tracking - Supports flexible conflict resolution strategies


Appendix C: Future Enhancements

Distributed Branching

Support branches across distributed HeliosDB clusters:

pub struct DistributedBranchOptions {
    /// Replication factor
    pub replication_factor: usize,

    /// Preferred region
    pub region: String,

    /// Cross-region replication
    pub cross_region: bool,
}

Branch Snapshots

Create lightweight snapshots within branches:

CREATE SNAPSHOT dev_snapshot_1 FROM BRANCH dev AS OF NOW;

Branch Permissions

Fine-grained access control per branch:

GRANT SELECT, INSERT ON BRANCH dev TO user_alice;
REVOKE DELETE ON BRANCH production FROM user_bob;

Branch Triggers

Execute triggers on branch events:

CREATE TRIGGER on_branch_merge
AFTER MERGE ON BRANCH dev
EXECUTE PROCEDURE notify_deployment();

Summary

This architecture provides:

  1. Efficient Copy-on-Write: Zero-copy branch creation, minimal storage overhead
  2. Strong Isolation: Branch-aware MVCC with snapshot isolation
  3. Flexible Merging: Three-way merge with conflict detection and resolution
  4. Automatic GC: Reference-counting garbage collection for dropped branches
  5. MVCC Integration: Seamless integration with existing transaction system

Next Steps: 1. Implement core data structures (BranchMetadata, BranchManager) 2. Extend StorageEngine with branch-aware transactions 3. Implement copy-on-write read/write paths 4. Implement three-way merge algorithm 5. Add garbage collection 6. Integration with SQL parser and executor 7. Comprehensive testing (unit, integration, performance)

Implementation Effort: 6-8 weeks - Week 1-2: Core structures and metadata management - Week 3-4: Copy-on-write read/write paths - Week 5: Merge algorithm - Week 6: Garbage collection - Week 7: SQL integration - Week 8: Testing and optimization