Skip to content

Branch Storage Architecture - Part 2 of 3

API & Implementation

Navigation: Index | ← Part 1 | Part 2 | Part 3 →


// 2. Find merge base (common ancestor)
let merge_base = find_merge_base(storage, source.branch_id, target.branch_id)?;

// 3. Three-way merge: base -> source, base -> target
let conflicts = perform_three_way_merge(
    storage,
    merge_base,
    source.branch_id,
    target.branch_id,
    &options,
)?;

// 4. Handle conflicts based on resolution strategy
if !conflicts.is_empty() {
    match options.conflict_resolution {
        ConflictResolution::Fail => {
            return Err(Error::merge_conflicts(conflicts));
        }
        ConflictResolution::BranchWins => {
            // Source wins - already applied
        }
        ConflictResolution::TargetWins => {
            // Revert to target versions
            revert_conflicts_to_target(storage, &conflicts)?;
        }
    }
}

// 5. Update target's merge_base
let mut target_meta = target.clone();
target_meta.merge_base = Some(source.created_from_snapshot);
save_branch_metadata(storage, &target_meta)?;

// 6. Mark source as merged
if options.delete_branch_after {
    let mut source_meta = source.clone();
    source_meta.state = BranchState::Merged {
        into_branch: target.branch_id,
        at_timestamp: storage.current_timestamp(),
    };
    save_branch_metadata(storage, &source_meta)?;
}

Ok(MergeResult {
    conflicts_count: conflicts.len(),
    keys_merged: 0, // TODO: track
    resolution: options.conflict_resolution,
})

} ```

6.2 Branch-Aware Transaction API

rust impl StorageEngine { /// Begin transaction on a specific branch pub fn begin_branch_transaction( &self, branch_name: &str, ) -> Result<BranchTransaction> { let branch_mgr = BranchManager::new(Arc::clone(&self.db))?; let branch = branch_mgr.get_branch(branch_name)?; // Create MVCC snapshot let snapshot_id = self.next_timestamp(); let tx = Transaction::new(Arc::clone(&self.db), snapshot_id)?; // Build parent chain for reads let parent_chain = build_parent_chain(&branch)?; Ok(BranchTransaction { tx, branch_id: branch.branch_id, branch_meta: branch, parent_chain, }) } } impl BranchTransaction { /// Get value with branch hierarchy resolution pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> { // Try current branch let branch_key = encode_branch_data_key( self.branch_id, &String::from_utf8_lossy(key), self.tx.snapshot_id(), ); if let Some(value) = self.tx.get(&branch_key)? { return Ok(Some(value)); } // Walk parent chain for (parent_id, parent_snapshot) in &self.parent_chain { let parent_key = encode_branch_data_key( *parent_id, &String::from_utf8_lossy(key), *parent_snapshot, ); if let Some(value) = self.tx.get(&parent_key)? { return Ok(Some(value)); } } Ok(None) } /// Put value with copy-on-write pub fn put(&mut self, key: Key, value: Vec<u8>) -> Result<()> { let branch_key = encode_branch_data_key( self.branch_id, &String::from_utf8_lossy(&key), self.tx.snapshot_id(), ); self.tx.put(branch_key, value) } }


7. Branch Isolation

7.1 Isolation Guarantees

Each branch provides snapshot isolation combined with branch isolation:

Isolation Properties:
1. Read Isolation:    Reads in branch A don't see uncommitted changes in branch B
2. Write Isolation:   Writes in branch A don't affect branch B
3. MVCC Snapshots:    Transactions see consistent snapshot within a branch
4. Parent Visibility: Child branches see parent state as of the branch point; later parent writes are not visible
5. No Phantoms:       New branches don't affect existing transactions

7.2 Visibility Rules

/// Decides whether a stored version may be read by a branch transaction.
///
/// * A version written on the transaction's own branch is visible when its
///   timestamp is at or before `tx_snapshot`.
/// * A version written on a branch listed in `branch_hierarchy` is visible
///   when its timestamp is at or before the snapshot recorded for that entry
///   (the first matching entry decides).
/// * A version from any other branch is never visible.
fn is_version_visible(
    version_branch: BranchId,
    version_timestamp: Timestamp,
    tx_branch: BranchId,
    tx_snapshot: SnapshotId,
    branch_hierarchy: &[(BranchId, SnapshotId)],
) -> bool {
    // Work out the newest timestamp this transaction may observe on the
    // version's branch, if it may observe that branch at all.
    let cutoff = if version_branch == tx_branch {
        Some(tx_snapshot)
    } else {
        branch_hierarchy
            .iter()
            .find(|(ancestor, _)| *ancestor == version_branch)
            .map(|(_, snapshot)| *snapshot)
    };

    match cutoff {
        Some(limit) => version_timestamp <= limit,
        // Unrelated branch.
        None => false,
    }
}

7.3 Isolation Example

-- Time T0: Create branches
CREATE DATABASE BRANCH dev FROM main AS OF NOW;
CREATE DATABASE BRANCH staging FROM main AS OF NOW;

-- Time T1: Insert in main
INSERT INTO users (name) VALUES ('Alice');  -- main only

-- Time T2: Switch to dev
SET branch = dev;
SELECT * FROM users;  -- Empty (dev branched before insert)

-- Time T3: Insert in dev
INSERT INTO users (name) VALUES ('Bob');    -- dev only

-- Time T4: Switch to staging
SET branch = staging;
SELECT * FROM users;  -- Empty (staging branched before main's insert and is isolated from dev)

-- Time T5: Switch to main
SET branch = main;
SELECT * FROM users;  -- Returns only 'Alice' ('Bob' exists only in dev)

7.4 Concurrent Operations

Multiple transactions can operate on different branches concurrently:

Timeline:
T0: TxA begins on main
T1: TxB begins on dev
T2: TxA writes key1='v1' on main
T3: TxB writes key1='v2' on dev
T4: TxA commits (main:key1='v1')
T5: TxB commits (dev:key1='v2')

Result:
main:  key1='v1'  (TxA's write)
dev:   key1='v2'  (TxB's write)

No conflict - branches are isolated

8. Merge Strategies

8.1 Three-Way Merge Algorithm

/// Performs a three-way merge of `source_branch` into `target_branch`
/// relative to the snapshot `base`.
///
/// Every key changed on the source side since `base` is classified first:
///
/// * changed only in source: scheduled to be written to the target;
/// * changed identically on both sides: nothing to do;
/// * changed differently on both sides: recorded as a [`MergeConflict`], and
///   scheduled to be written only under `ConflictResolution::BranchWins`.
///
/// Keys changed only on the target side need no action and are not visited.
///
/// Writes are applied only after classification is complete. When the
/// strategy is `ConflictResolution::Fail` and at least one conflict was
/// found, no write is applied at all, so a merge the caller is going to
/// reject does not leave the target half-merged.
///
/// # Returns
/// The conflicts that were detected (possibly empty), whatever the strategy.
///
/// # Errors
/// Propagates errors from `find_changes_since`, `get_at_snapshot` and
/// `apply_to_target`. An error from `apply_to_target` can still leave earlier
/// writes of the same merge in place; this function is not atomic by itself.
fn perform_three_way_merge(
    storage: &StorageEngine,
    base: SnapshotId,
    source_branch: BranchId,
    target_branch: BranchId,
    options: &MergeOptions,
) -> Result<Vec<MergeConflict>> {
    let source_changes = find_changes_since(storage, source_branch, base)?;
    let target_changes = find_changes_since(storage, target_branch, base)?;

    let mut conflicts = Vec::new();
    // Source values that should end up in the target, applied in phase 2.
    let mut pending = Vec::new();

    // Phase 1: classify. Nothing is written to the target yet.
    for (key, source_value) in &source_changes {
        match target_changes.get(key) {
            // Only source changed.
            None => pending.push((key, source_value)),

            // Both sides made the same change.
            Some(target_value) if source_value == target_value => {}

            // Both sides changed the key differently.
            Some(target_value) => {
                conflicts.push(MergeConflict {
                    key: key.clone(),
                    // The base value is only needed for conflict reports, so
                    // it is looked up here rather than for every key.
                    base_value: get_at_snapshot(storage, key, base)?,
                    source_value: source_value.clone(),
                    target_value: target_value.clone(),
                });

                match options.conflict_resolution {
                    ConflictResolution::BranchWins => pending.push((key, source_value)),
                    // Target keeps its value; Fail is handled below.
                    ConflictResolution::TargetWins | ConflictResolution::Fail => {}
                }
            }
        }
    }

    // Phase 2: apply, unless the merge is going to be rejected.
    let rejected = !conflicts.is_empty()
        && matches!(options.conflict_resolution, ConflictResolution::Fail);
    if !rejected {
        for (key, value) in pending {
            apply_to_target(storage, target_branch, key, value.clone())?;
        }
    }

    Ok(conflicts)
}

8.2 Merge Conflict Representation

/// Describes one key whose source-branch and target-branch changes disagree
/// during a merge, together with the value found at the merge base.
///
/// NOTE(review): all three values are `Option`s; presumably `None` stands for
/// "key absent or deleted" on that side - confirm against the code that
/// produces the change sets.
#[derive(Debug, Clone)]
pub struct MergeConflict {
    /// Conflicting key
    pub key: String,

    /// Value at merge base
    pub base_value: Option<Vec<u8>>,

    /// Value in source branch
    pub source_value: Option<Vec<u8>>,

    /// Value in target branch
    pub target_value: Option<Vec<u8>>,
}

/// Summary returned to the caller after a merge has completed.
#[derive(Debug)]
pub struct MergeResult {
    /// Number of conflicts detected
    pub conflicts_count: usize,

    /// Number of keys merged.
    ///
    /// NOTE(review): the merge snippet in this document still fills this
    /// field with a constant `0` (marked `TODO: track`).
    pub keys_merged: usize,

    /// Resolution strategy used
    pub resolution: ConflictResolution,
}

/// Caller-supplied settings that control how a merge behaves.
#[derive(Debug, Clone)]
pub struct MergeOptions {
    /// Conflict resolution strategy
    pub conflict_resolution: ConflictResolution,

    /// Delete source branch after merge.
    ///
    /// NOTE(review): in the merge snippet in this document this flag gates
    /// marking the source branch as `BranchState::Merged`; no physical
    /// deletion is visible there - confirm the intended meaning.
    pub delete_branch_after: bool,
}

8.3 Conflict Resolution Strategies

/// Strategy applied when the source and target branch changed the same key
/// in different ways.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// Source branch wins (default)
    BranchWins,

    /// Target branch wins
    TargetWins,

    /// Fail on conflict (manual resolution required)
    Fail,
}

/// Makes the documented default available through `Default::default()`.
impl Default for ConflictResolution {
    fn default() -> Self {
        ConflictResolution::BranchWins
    }
}

8.4 Finding Merge Base

/// Locates the merge base of `branch_a` and `branch_b`.
///
/// Both ancestry chains are built with `build_ancestry_chain`. The chain of
/// `branch_a` is scanned in order, and the first entry whose branch id also
/// occurs in the chain of `branch_b` is taken as the common ancestor. The
/// result is the smaller of the two snapshots recorded for that branch id.
/// When the chains share no branch, snapshot `0` is returned.
///
/// NOTE(review): both chains record a branch's own `created_from_snapshot`,
/// so for a shared branch id the two snapshots look identical and `min` is a
/// no-op - confirm this is the intended base rather than the fork point.
///
/// # Errors
/// Propagates any error from `build_ancestry_chain`.
fn find_merge_base(
    storage: &StorageEngine,
    branch_a: BranchId,
    branch_b: BranchId,
) -> Result<SnapshotId> {
    let chain_a = build_ancestry_chain(storage, branch_a)?;
    let chain_b = build_ancestry_chain(storage, branch_b)?;

    // First entry of chain A (nearest first) that also appears in chain B.
    let shared = chain_a.iter().find_map(|(id_a, snap_a)| {
        chain_b
            .iter()
            .find(|(id_b, _)| id_b == id_a)
            .map(|(_, snap_b)| std::cmp::min(*snap_a, *snap_b))
    });

    // No common ancestor: fall back to the epoch snapshot.
    Ok(shared.unwrap_or(0))
}

/// Collects `(branch id, created_from_snapshot)` pairs starting at
/// `branch_id` and following `parent_id` links until a branch without a
/// parent is reached. The starting branch is the first element.
///
/// NOTE(review): there is no cycle guard; the walk ends only when some
/// branch reports `parent_id == None`.
///
/// # Errors
/// Propagates any error from `get_branch_metadata`.
fn build_ancestry_chain(
    storage: &StorageEngine,
    branch_id: BranchId,
) -> Result<Vec<(BranchId, SnapshotId)>> {
    let mut chain = Vec::new();
    let mut cursor = Some(branch_id);

    while let Some(id) = cursor {
        let metadata = get_branch_metadata(storage, id)?;
        chain.push((id, metadata.created_from_snapshot));
        // `None` here means the root was just recorded.
        cursor = metadata.parent_id;
    }

    Ok(chain)
}

9. Garbage Collection

9.1 GC Strategy

Garbage collection removes unreachable versions:

GC Targets:
1. Dropped branches (soft-deleted)
2. Merged branches (optionally)
3. Old versions no longer visible to any branch
4. Compacted MVCC versions

9.2 Reference Counting

Track references to each version:

/// Reference-tracking record for a single stored version.
///
/// `is_version_reachable` loads this record from the key
/// `gc:refs:<version key>` and decodes it with `bincode`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionRefs {
    /// Version key
    pub key: String,

    /// Branches referencing this version
    pub branches: HashSet<BranchId>,

    /// Active snapshots referencing this version.
    ///
    /// NOTE(review): `is_version_reachable` does not consult this set yet
    /// (it carries a TODO for snapshot tracking).
    pub snapshots: HashSet<SnapshotId>,
}

/// Reports whether the version stored under `version_key` is still
/// referenced by at least one active branch.
///
/// The reference record is read from `gc:refs:<version_key>`. A missing
/// record means the version is unreachable. Otherwise the version is
/// reachable as soon as one referencing branch has state
/// `BranchState::Active`.
///
/// # Errors
/// Propagates errors from the storage read, from decoding the record, and
/// from `get_branch_metadata`.
fn is_version_reachable(
    storage: &StorageEngine,
    version_key: &str,
) -> Result<bool> {
    let refs_key = format!("gc:refs:{}", version_key);

    let data = match storage.get(refs_key.as_bytes())? {
        Some(bytes) => bytes,
        // No reference record at all: nothing points at this version.
        None => return Ok(false),
    };
    let refs: VersionRefs = bincode::deserialize(&data)?;

    // One active referencing branch is enough.
    for &branch_id in &refs.branches {
        if get_branch_metadata(storage, branch_id)?.state == BranchState::Active {
            return Ok(true);
        }
    }

    // Check if any snapshot is still active
    // TODO: Track active snapshots

    Ok(false)
}

9.3 GC Process

/// Runs one garbage-collection pass over all dropped branches.
///
/// For every branch returned by `find_dropped_branches`, each version the
/// branch owns is deleted unless `is_version_reachable` reports it as still
/// referenced. The branch's metadata is deleted only once none of its
/// versions had to be kept. A dropped branch that still owns reachable
/// versions therefore keeps its metadata, stays discoverable for a later
/// pass, and stays resolvable for branches that still refer to it. Finally
/// the whole key range of the underlying database is compacted.
///
/// # Returns
/// Counters describing what was removed. `bytes_freed` accumulates the byte
/// length of the deleted version *keys*, not of their values.
///
/// # Errors
/// Propagates the first error from any lookup or delete; work done before
/// the error is not rolled back.
pub fn run_gc(storage: &StorageEngine) -> Result<GcStats> {
    let mut stats = GcStats::default();

    // 1. Walk all dropped branches.
    for branch_id in find_dropped_branches(storage)? {
        let mut has_live_versions = false;

        // 2. Inspect every version owned by this branch.
        for version_key in find_branch_versions(storage, branch_id)? {
            // 3. Versions still referenced elsewhere must survive.
            if is_version_reachable(storage, &version_key)? {
                has_live_versions = true;
                continue;
            }

            // 4. Delete the unreachable version.
            storage.delete(version_key.as_bytes())?;
            stats.versions_deleted += 1;
            // usize -> u64 is lossless on every supported target.
            stats.bytes_freed += version_key.len() as u64;
        }

        // 5. Only forget the branch once nothing it owns is left behind;
        //    otherwise the surviving versions could never be collected.
        if !has_live_versions {
            delete_branch_metadata(storage, branch_id)?;
            stats.branches_deleted += 1;
        }
    }

    // 6. Compact RocksDB to reclaim space
    storage.db.compact_range(None::<&[u8]>, None::<&[u8]>);

    Ok(stats)
}

/// Counters reported by one `run_gc` pass; all start at zero via `Default`.
#[derive(Debug, Default)]
pub struct GcStats {
    /// Number of branches whose metadata was deleted.
    pub branches_deleted: usize,
    /// Number of versions deleted.
    pub versions_deleted: usize,
    /// Bytes counted as freed. `run_gc` adds the byte length of each deleted
    /// version key, so value sizes are not included.
    pub bytes_freed: u64,
}

9.4 GC Scheduling

/// Settings for the background garbage-collection thread.
///
/// Derives `Debug` and `Clone` like the other public types in this file, so
/// a configuration can be logged and handed to more than one consumer.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Enable automatic GC
    pub enabled: bool,

    /// GC interval (seconds)
    pub interval_secs: u64,

    /// Minimum age before GC (seconds)
    pub min_age_secs: u64,
}

impl Default for GcConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            interval_secs: 3600, // 1 hour
            min_age_secs: 86400, // 24 hours
        }
    }
}

/// Spawns the background garbage-collection thread.
///
/// When `config.enabled` is `false` the spawned thread returns immediately.
/// The configuration is moved into the thread and never changes, so there is
/// nothing for a disabled thread to wait for. Otherwise the thread sleeps
/// `config.interval_secs` seconds, runs `run_gc`, logs the outcome, and
/// repeats indefinitely; a failed pass is logged and does not stop the loop.
///
/// # Returns
/// The handle of the spawned thread. For an enabled configuration the thread
/// never finishes, so joining the handle blocks forever.
pub fn start_gc_thread(
    storage: Arc<StorageEngine>,
    config: GcConfig,
) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        // `enabled` cannot change after the move, so decide once up front
        // instead of waking up every interval to do nothing.
        if !config.enabled {
            return;
        }

        let interval = Duration::from_secs(config.interval_secs);
        loop {
            std::thread::sleep(interval);

            match run_gc(&storage) {
                Ok(stats) => {
                    log::info!("GC completed: {:?}", stats);
                }
                Err(e) => {
                    log::error!("GC failed: {}", e);
                }
            }
        }
    })
}

10. MVCC Integration

10.1 Branch-Aware MVCC

Branches integrate with existing MVCC by extending snapshot visibility:

/// Snapshot used by branch-aware transactions: a base MVCC snapshot plus the
/// branch it belongs to and the ancestors it may read from.
#[derive(Debug, Clone)]
pub struct BranchSnapshot {
    /// Base MVCC snapshot; decides visibility of versions written on
    /// `branch_id` itself.
    pub base_snapshot: Snapshot,

    /// Branch context
    pub branch_id: BranchId,

    /// Parent chain (for visibility checks): each entry pairs an ancestor
    /// branch with the newest snapshot of that ancestor that is visible.
    pub parent_chain: Vec<(BranchId, SnapshotId)>,
}

impl BranchSnapshot {
    /// Tells whether a version written on `version_branch` at
    /// `version_timestamp` can be seen through this snapshot.
    ///
    /// Versions on the snapshot's own branch are judged by the base MVCC
    /// snapshot. Versions on an ancestor are visible when their timestamp is
    /// at or before the snapshot recorded for the first matching
    /// `parent_chain` entry. Versions on any other branch are invisible.
    pub fn is_visible(
        &self,
        version_branch: BranchId,
        version_timestamp: u64,
    ) -> bool {
        if version_branch == self.branch_id {
            return self.base_snapshot.is_visible(version_timestamp);
        }

        self.parent_chain
            .iter()
            .find(|(ancestor, _)| *ancestor == version_branch)
            .map_or(false, |(_, cutoff)| version_timestamp <= *cutoff)
    }
}

10.2 Transaction Isolation Levels

Branches maintain standard isolation levels:

/// Transaction isolation level requested for a branch transaction.
///
/// Only `SnapshotIsolation` is accepted by
/// `BranchTransaction::with_isolation_level`; the other levels are rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Read committed (not supported yet)
    ReadCommitted,

    /// Snapshot isolation (default)
    SnapshotIsolation,

    /// Serializable (not supported yet)
    Serializable,
}

/// Makes the documented default available through `Default::default()`.
impl Default for IsolationLevel {
    fn default() -> Self {
        IsolationLevel::SnapshotIsolation
    }
}

impl BranchTransaction {
    /// Begins a transaction on `branch_name` at the requested isolation
    /// level.
    ///
    /// `SnapshotIsolation` delegates to
    /// `StorageEngine::begin_branch_transaction`; every other level yields
    /// `Error::unsupported_isolation_level`.
    pub fn with_isolation_level(
        storage: &StorageEngine,
        branch_name: &str,
        level: IsolationLevel,
    ) -> Result<Self> {
        match level {
            IsolationLevel::ReadCommitted | IsolationLevel::Serializable => {
                Err(Error::unsupported_isolation_level(level))
            }
            IsolationLevel::SnapshotIsolation => {
                storage.begin_branch_transaction(branch_name)
            }
        }
    }
}

10.3 Timestamp Allocation

Branch transactions use global timestamp ordering:

impl StorageEngine {
    /// Advances the shared timestamp by one under its write lock and returns
    /// the new value.
    pub fn next_timestamp(&self) -> Timestamp {
        let mut guard = self.timestamp.write();
        let next = *guard + 1;
        *guard = next;
        next
    }

    /// Returns the most recently issued timestamp without advancing it.
    pub fn current_timestamp(&self) -> Timestamp {
        let guard = self.timestamp.read();
        *guard
    }
}

10.4 Commit Protocol

impl BranchTransaction {
    /// Commit with validation.
    ///
    /// Consumes the transaction and runs these steps in order: conflict
    /// validation, allocation of a commit timestamp, re-keying every entry of
    /// `write_set` under that timestamp, commit of the underlying
    /// transaction, and finally a branch-statistics update.
    ///
    /// # Errors
    /// Returns the first error raised by any step. Because statistics are
    /// updated after the underlying commit, an error from
    /// `update_branch_stats` is reported even though `tx.commit()` has
    /// already succeeded.
    pub fn commit(mut self) -> Result<()> {
        // 1. Validate no conflicts with concurrent commits
        self.validate_no_conflicts()?;

        // 2. Assign commit timestamp
        let commit_ts = self.storage.next_timestamp();

        // 3. Write all buffered changes with commit timestamp
        for (key, value) in &self.write_set {
            // Keys are converted lossily: invalid UTF-8 is replaced rather
            // than rejected before the branch key is encoded.
            let versioned_key = encode_branch_data_key(
                self.branch_id,
                &String::from_utf8_lossy(key),
                commit_ts,
            );

            self.tx.put(versioned_key, value.clone())?;
        }

        // 4. Commit underlying transaction
        self.tx.commit()?;

        // 5. Update branch statistics
        self.update_branch_stats()?;

        Ok(())
    }

    /// Placeholder for write-write conflict detection.
    ///
    /// NOTE(review): this currently always returns `Ok(())`, so `commit`
    /// never rejects a transaction because of a conflicting concurrent
    /// commit.
    fn validate_no_conflicts(&self) -> Result<()> {
        // Check for write-write conflicts
        // (Simplified - full implementation would track write sets)
        Ok(())
    }
}

11. Performance Considerations

11.1 Performance Targets

Operation                    Target Latency      Target Throughput
────────────────────────────────────────────────────────────────
CREATE BRANCH               < 10ms              1000 ops/sec
DROP BRANCH                 < 50ms              200 ops/sec
MERGE BRANCH (small)        < 100ms             10 ops/sec
MERGE BRANCH (large)        < 10s               0.1 ops/sec

Branch Read (hit)           < 0.1ms overhead    Same as non-branch
Branch Read (parent)        < 0.5ms overhead    90% of non-branch
Branch Write (CoW)          < 0.2ms overhead    95% of non-branch

Storage Overhead            < 2% per branch     -
Memory Overhead             < 100KB per branch  -

11.2 Optimization Techniques

11.2.1 Metadata Caching

```rust /// Branch metadata cache pub struct BranchMetadataCache { /// LRU cache of branch metadata cache: Arc>>,

/// Cache size
capacity: usize,

}