Branch Storage Architecture - Part 2 of 3¶
API & Implementation¶
Navigation: Index | ← Part 1 | Part 2 | Part 3 →
// 2. Find merge base (common ancestor)
let merge_base = find_merge_base(storage, source.branch_id, target.branch_id)?;
// 3. Three-way merge: base -> source, base -> target
let conflicts = perform_three_way_merge(
storage,
merge_base,
source.branch_id,
target.branch_id,
&options,
)?;
// 4. Handle conflicts based on resolution strategy
if !conflicts.is_empty() {
match options.conflict_resolution {
ConflictResolution::Fail => {
return Err(Error::merge_conflicts(conflicts));
}
ConflictResolution::BranchWins => {
// Source wins - already applied
}
ConflictResolution::TargetWins => {
// Revert to target versions
revert_conflicts_to_target(storage, &conflicts)?;
}
}
}
// 5. Update target's merge_base
let mut target_meta = target.clone();
target_meta.merge_base = Some(source.created_from_snapshot);
save_branch_metadata(storage, &target_meta)?;
// 6. Mark source as merged
if options.delete_branch_after {
let mut source_meta = source.clone();
source_meta.state = BranchState::Merged {
into_branch: target.branch_id,
at_timestamp: storage.current_timestamp(),
};
save_branch_metadata(storage, &source_meta)?;
}
Ok(MergeResult {
conflicts_count: conflicts.len(),
keys_merged: 0, // TODO: track
resolution: options.conflict_resolution,
})
} ```
6.2 Branch-Aware Transaction API¶
rust
impl StorageEngine {
    /// Open a transaction bound to the branch called `branch_name`.
    ///
    /// The branch is resolved through a `BranchManager` built on this
    /// engine's database handle, a fresh timestamp from
    /// [`next_timestamp`](Self::next_timestamp) becomes the transaction's
    /// snapshot id, and the branch's parent chain is computed up front so
    /// reads can fall back to ancestors.
    ///
    /// # Errors
    /// Propagates any failure from creating the branch manager, looking up
    /// the branch, creating the underlying transaction, or building the
    /// parent chain.
    pub fn begin_branch_transaction(
        &self,
        branch_name: &str,
    ) -> Result<BranchTransaction> {
        let manager = BranchManager::new(Arc::clone(&self.db))?;
        let branch_meta = manager.get_branch(branch_name)?;

        // The snapshot id is allocated only after the branch lookup succeeded.
        let snapshot_id = self.next_timestamp();
        let tx = Transaction::new(Arc::clone(&self.db), snapshot_id)?;

        // Ancestors consulted by reads that miss in the branch itself.
        let parent_chain = build_parent_chain(&branch_meta)?;

        Ok(BranchTransaction {
            tx,
            // Read the id before `branch_meta` is moved into the struct.
            branch_id: branch_meta.branch_id,
            branch_meta,
            parent_chain,
        })
    }
}
impl BranchTransaction {
    /// Read `key`, falling back through the branch's ancestors.
    ///
    /// The key is first looked up under this transaction's own branch at the
    /// transaction's snapshot id. On a miss, every `(branch, snapshot)` entry
    /// of the parent chain is tried in order, and the first value found is
    /// returned. `Ok(None)` means no lookup produced a value.
    ///
    /// # Errors
    /// Propagates any error from the underlying transaction's `get`.
    pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
        // NOTE(review): the conversion is lossy, so keys that are not valid
        // UTF-8 may map to the same encoded key - confirm keys are always UTF-8.
        let key_text = String::from_utf8_lossy(key);

        // Own branch first.
        let own_key = encode_branch_data_key(
            self.branch_id,
            &key_text,
            self.tx.snapshot_id(),
        );
        if let Some(found) = self.tx.get(&own_key)? {
            return Ok(Some(found));
        }

        // Then each ancestor, nearest entry first.
        for (ancestor_id, ancestor_snapshot) in self.parent_chain.iter() {
            let ancestor_key = encode_branch_data_key(
                *ancestor_id,
                &key_text,
                *ancestor_snapshot,
            );
            if let Some(found) = self.tx.get(&ancestor_key)? {
                return Ok(Some(found));
            }
        }

        Ok(None)
    }

    /// Write `value` for `key` under this transaction's branch.
    ///
    /// The encoded key carries this branch's id and the transaction's
    /// snapshot id, so the write never touches keys of other branches.
    ///
    /// # Errors
    /// Propagates any error from the underlying transaction's `put`.
    pub fn put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
        let key_text = String::from_utf8_lossy(&key);
        let own_key = encode_branch_data_key(
            self.branch_id,
            &key_text,
            self.tx.snapshot_id(),
        );
        self.tx.put(own_key, value)
    }
}
7. Branch Isolation¶
7.1 Isolation Guarantees¶
Each branch provides snapshot isolation within the branch, combined with isolation between branches:
Isolation Properties:
1. Read Isolation: Reads in branch A never see uncommitted changes made in branch B
2. Write Isolation: Writes in branch A never modify the data visible in branch B
3. MVCC Snapshots: Each transaction sees one consistent snapshot of its branch
4. Parent Visibility: A child branch sees its parent's state as of the branch point; later writes to the parent are not visible in the child
5. No Phantoms: Creating a new branch does not change what already-running transactions see
7.2 Visibility Rules¶
/// Decide whether a stored version can be seen by a branch transaction.
///
/// * A version written on the transaction's own branch is visible when its
///   timestamp is not after `tx_snapshot`.
/// * A version written on a branch listed in `branch_hierarchy` is visible
///   when its timestamp is not after the snapshot recorded for that entry.
///   Only the first entry with a matching branch id is consulted.
/// * A version from any other branch is never visible.
fn is_version_visible(
    version_branch: BranchId,
    version_timestamp: Timestamp,
    tx_branch: BranchId,
    tx_snapshot: SnapshotId,
    branch_hierarchy: &[(BranchId, SnapshotId)],
) -> bool {
    // Own branch: plain MVCC cut-off at the transaction snapshot.
    if version_branch == tx_branch {
        return version_timestamp <= tx_snapshot;
    }

    // Ancestor branch: cut-off at the snapshot stored for that ancestor.
    // Unrelated branch: no entry matches, so the version stays hidden.
    branch_hierarchy
        .iter()
        .find(|(ancestor, _)| *ancestor == version_branch)
        .map_or(false, |(_, cutoff)| version_timestamp <= *cutoff)
}
7.3 Isolation Example¶
-- Time T0: Create branches
CREATE DATABASE BRANCH dev FROM main AS OF NOW;
CREATE DATABASE BRANCH staging FROM main AS OF NOW;
-- Time T1: Insert in main
INSERT INTO users (name) VALUES ('Alice'); -- main only
-- Time T2: Switch to dev
SET branch = dev;
SELECT * FROM users; -- Empty (dev branched before insert)
-- Time T3: Insert in dev
INSERT INTO users (name) VALUES ('Bob'); -- dev only
-- Time T4: Switch to staging
SET branch = staging;
SELECT * FROM users; -- Empty (staging branched before main's insert, and is isolated from dev)
-- Time T5: Switch to main
SET branch = main;
SELECT * FROM users; -- Returns 'Alice' (isolated from dev)
7.4 Concurrent Operations¶
Multiple transactions can operate on different branches concurrently:
Timeline:
T0: TxA begins on main
T1: TxB begins on dev
T2: TxA writes key1='v1' on main
T3: TxB writes key1='v2' on dev
T4: TxA commits (main:key1='v1')
T5: TxB commits (dev:key1='v2')
Result:
main: key1='v1' (TxA's write)
dev: key1='v2' (TxB's write)
No conflict - branches are isolated
8. Merge Strategies¶
8.1 Three-Way Merge Algorithm¶
/// Perform a three-way merge of `source_branch` into `target_branch`.
///
/// Changes made on each branch since `base` are compared key by key:
///
/// * changed only on the source: the source value is written to the target;
/// * changed only on the target: nothing to do, the target already has it;
/// * changed on both sides to the same value: nothing to do;
/// * changed on both sides to different values: recorded as a
///   [`MergeConflict`]. Under `ConflictResolution::BranchWins` the source
///   value is additionally written to the target; under `TargetWins` and
///   `Fail` the target value is left in place.
///
/// Writes are staged and applied only after every key has been examined.
/// With `ConflictResolution::Fail`, if any conflict was found **nothing** is
/// written, so a merge the caller is going to reject does not leave the
/// target partially merged.
///
/// # Returns
/// The detected conflicts, sorted by key so the report is deterministic.
/// The list is empty when the merge was clean.
///
/// # Errors
/// Propagates failures from collecting the per-branch changes, from reading
/// the base value of a conflicting key, and from writing to the target.
fn perform_three_way_merge(
    storage: &StorageEngine,
    base: SnapshotId,
    source_branch: BranchId,
    target_branch: BranchId,
    options: &MergeOptions,
) -> Result<Vec<MergeConflict>> {
    let source_changes = find_changes_since(storage, source_branch, base)?;
    let target_changes = find_changes_since(storage, target_branch, base)?;

    let mut conflicts: Vec<MergeConflict> = Vec::new();
    // Writes destined for the target, deferred until the scan is complete.
    let mut pending = Vec::new();

    // Only keys touched by the source can require a write or raise a
    // conflict; keys changed solely on the target need no action.
    for (key, sv) in &source_changes {
        match target_changes.get(key) {
            // Only source changed - apply source.
            None => pending.push((key, sv)),
            // Both sides made the same change - no conflict.
            Some(tv) if sv == tv => {}
            // Both sides changed the key differently.
            Some(tv) => {
                conflicts.push(MergeConflict {
                    key: key.clone(),
                    // The base value is only needed for the report, so it is
                    // read for conflicting keys only.
                    base_value: get_at_snapshot(storage, key, base)?,
                    source_value: sv.clone(),
                    target_value: tv.clone(),
                });
                match options.conflict_resolution {
                    ConflictResolution::BranchWins => pending.push((key, sv)),
                    // Keep the target value; the conflict is still reported.
                    ConflictResolution::TargetWins | ConflictResolution::Fail => {}
                }
            }
        }
    }

    // Map iteration order is unspecified; report conflicts in key order.
    conflicts.sort_by(|a, b| a.key.cmp(&b.key));

    let abort = matches!(options.conflict_resolution, ConflictResolution::Fail)
        && !conflicts.is_empty();
    if !abort {
        for (key, value) in pending {
            apply_to_target(storage, target_branch, key, value.clone())?;
        }
    }

    Ok(conflicts)
}
8.2 Merge Conflict Representation¶
/// Merge conflict information.
///
/// Describes one key that was changed to different values on the source and
/// the target branch since the merge base.
// NOTE(review): a `None` value presumably means the key is absent or deleted
// at that point - confirm against the change-tracking code.
#[derive(Debug, Clone)]
pub struct MergeConflict {
    /// Conflicting key
    pub key: String,
    /// Value at merge base
    pub base_value: Option<Vec<u8>>,
    /// Value in source branch
    pub source_value: Option<Vec<u8>>,
    /// Value in target branch
    pub target_value: Option<Vec<u8>>,
}
/// Merge result.
///
/// Summary returned to the caller once a merge has completed.
#[derive(Debug)]
pub struct MergeResult {
    /// Number of conflicts detected
    pub conflicts_count: usize,
    /// Number of keys merged
    // NOTE(review): the merge code in this document still fills this with a
    // hard-coded 0 (marked TODO), so it is not yet a real count.
    pub keys_merged: usize,
    /// Resolution strategy used
    pub resolution: ConflictResolution,
}
/// Merge options.
///
/// Caller-supplied settings that control how a merge treats conflicts and
/// what happens to the source branch afterwards.
#[derive(Debug, Clone)]
pub struct MergeOptions {
    /// Conflict resolution strategy
    pub conflict_resolution: ConflictResolution,
    /// Delete source branch after merge
    // NOTE(review): in the merge code shown in this document, setting this
    // flag marks the source branch as `BranchState::Merged` and saves its
    // metadata; it does not physically delete anything at merge time.
    pub delete_branch_after: bool,
}
8.3 Conflict Resolution Strategies¶
/// Strategy applied when a key was changed differently on both branches.
// NOTE(review): `BranchWins` is described as the default, but no `Default`
// impl is visible in this part of the document - confirm where the default
// is set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// Source branch wins (default)
    BranchWins,
    /// Target branch wins
    TargetWins,
    /// Fail on conflict (manual resolution required)
    Fail,
}
8.4 Finding Merge Base¶
/// Find the merge base (common fork point) of two branches.
///
/// Each ancestry chain entry is `(branch, snapshot the branch was created
/// from)`, ordered from the branch itself up to the root. The nearest branch
/// present in both chains is the common ancestor. The merge base is the
/// snapshot at which the two lineages forked away from that ancestor:
///
/// * on each side, the fork point is the creation snapshot of the chain
///   entry just *below* the common ancestor (the child that was forked from
///   it);
/// * if both sides forked from the ancestor, the earlier fork point is used;
/// * if one side *is* the common ancestor, the other side's fork point is
///   used;
/// * if both ids name the same branch, that branch's own creation snapshot
///   is returned.
///
/// Using the ancestor's own creation snapshot instead would place the base
/// before the fork, so changes the ancestor made before the fork would be
/// treated as diverging changes.
///
/// # Returns
/// The merge-base snapshot, or `0` (epoch) when the branches share no
/// ancestor.
///
/// # Errors
/// Propagates failures from building either ancestry chain.
fn find_merge_base(
    storage: &StorageEngine,
    branch_a: BranchId,
    branch_b: BranchId,
) -> Result<SnapshotId> {
    let ancestors_a = build_ancestry_chain(storage, branch_a)?;
    let ancestors_b = build_ancestry_chain(storage, branch_b)?;

    // Walk A's chain from the branch upwards; the first id that also occurs
    // in B's chain is the lowest common ancestor.
    for (idx_a, (id_a, created_a)) in ancestors_a.iter().enumerate() {
        let idx_b = match ancestors_b.iter().position(|(id_b, _)| id_b == id_a) {
            Some(idx) => idx,
            None => continue,
        };

        // The entry just below the ancestor is the child forked from it.
        // `checked_sub` yields `None` when the side is the ancestor itself,
        // and otherwise an index that is always in bounds.
        let fork_a = idx_a.checked_sub(1).map(|child| ancestors_a[child].1);
        let fork_b = idx_b.checked_sub(1).map(|child| ancestors_b[child].1);

        return Ok(match (fork_a, fork_b) {
            (Some(a), Some(b)) => std::cmp::min(a, b),
            (Some(fork), None) | (None, Some(fork)) => fork,
            // Same branch on both sides: nothing diverged.
            (None, None) => *created_a,
        });
    }

    // No common ancestor - use epoch (snapshot 0)
    Ok(0)
}
/// Collect the lineage of `branch_id`, from the branch itself up to the root.
///
/// Each element pairs a branch id with the snapshot that branch was created
/// from. The walk follows `parent_id` links and stops at the first branch
/// that has no parent.
///
/// # Errors
/// Propagates any failure to load a branch's metadata.
// NOTE(review): there is no guard against a cycle in the parent links; a
// corrupted chain would make this loop forever.
fn build_ancestry_chain(
    storage: &StorageEngine,
    branch_id: BranchId,
) -> Result<Vec<(BranchId, SnapshotId)>> {
    let mut lineage = Vec::new();
    let mut cursor = Some(branch_id);

    while let Some(id) = cursor {
        let meta = get_branch_metadata(storage, id)?;
        lineage.push((id, meta.created_from_snapshot));
        // `None` here means the root was reached and ends the walk.
        cursor = meta.parent_id;
    }

    Ok(lineage)
}
9. Garbage Collection¶
9.1 GC Strategy¶
Garbage collection removes unreachable versions:
GC Targets:
1. Dropped branches (soft-deleted)
2. Merged branches (optionally)
3. Old versions no longer visible to any branch
4. Compacted MVCC versions
9.2 Reference Counting¶
Track references to each version:
/// Version reference tracking.
///
/// Serialized record of who still refers to one stored version. The
/// reachability check in this document loads it from the storage key
/// `gc:refs:<version key>`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionRefs {
    /// Version key
    pub key: String,
    /// Branches referencing this version
    pub branches: HashSet<BranchId>,
    /// Active snapshots referencing this version
    // NOTE(review): the reachability check shown in this document does not
    // consult this set yet (tracking active snapshots is still a TODO).
    pub snapshots: HashSet<SnapshotId>,
}
/// Report whether the version stored under `version_key` is still in use.
///
/// The reference record is loaded from the storage key
/// `gc:refs:<version_key>`. A version with no record is treated as
/// unreachable. Otherwise it is reachable as soon as one referencing branch
/// is in the `Active` state.
///
/// # Errors
/// Propagates storage read failures, deserialization failures, and failures
/// to load the metadata of a referencing branch.
// NOTE(review): `refs.snapshots` is not consulted yet, so a version held
// only by a live snapshot is reported as unreachable.
fn is_version_reachable(
    storage: &StorageEngine,
    version_key: &str,
) -> Result<bool> {
    let refs_key = format!("gc:refs:{}", version_key);

    let raw = match storage.get(refs_key.as_bytes())? {
        Some(bytes) => bytes,
        // No refs = unreachable
        None => return Ok(false),
    };
    let refs: VersionRefs = bincode::deserialize(&raw)?;

    // One active referencing branch is enough to keep the version alive.
    for branch_id in refs.branches.iter() {
        let meta = get_branch_metadata(storage, *branch_id)?;
        if meta.state == BranchState::Active {
            return Ok(true);
        }
    }

    // TODO: Track active snapshots
    Ok(false)
}
9.3 GC Process¶
/// Run one garbage-collection pass over all dropped branches.
///
/// For every dropped branch, each version it owns is deleted unless the
/// reachability check says it is still in use. The branch's metadata is then
/// removed. Finally a full-range compaction is requested on the database.
///
/// # Returns
/// Counters for the pass. `bytes_freed` adds up only the lengths of the
/// deleted *keys*; value sizes are not measured here.
///
/// # Errors
/// Stops at, and propagates, the first failure from listing branches or
/// versions, checking reachability, or deleting. Deletions made before the
/// failure are not undone by this function.
pub fn run_gc(storage: &StorageEngine) -> Result<GcStats> {
    let mut stats = GcStats::default();

    for branch_id in find_dropped_branches(storage)? {
        for version_key in find_branch_versions(storage, branch_id)? {
            // Versions still referenced elsewhere survive the pass.
            if is_version_reachable(storage, &version_key)? {
                continue;
            }
            storage.delete(version_key.as_bytes())?;
            stats.versions_deleted += 1;
            stats.bytes_freed += version_key.len() as u64;
        }

        // The branch's metadata goes once its versions have been processed.
        delete_branch_metadata(storage, branch_id)?;
        stats.branches_deleted += 1;
    }

    // Compact RocksDB to reclaim space
    storage.db.compact_range(None::<&[u8]>, None::<&[u8]>);

    Ok(stats)
}
/// Counters reported by one garbage-collection pass.
#[derive(Debug, Default)]
pub struct GcStats {
    /// Number of dropped branches whose metadata was deleted.
    pub branches_deleted: usize,
    /// Number of versions deleted.
    pub versions_deleted: usize,
    /// Bytes accounted as freed.
    // NOTE(review): the GC pass in this document adds the length of each
    // deleted key, not the size of the stored value.
    pub bytes_freed: u64,
}
9.4 GC Scheduling¶
/// Settings for the background garbage-collection loop.
pub struct GcConfig {
    /// Whether automatic GC is enabled.
    pub enabled: bool,
    /// Time between GC runs, in seconds.
    pub interval_secs: u64,
    /// Minimum age before GC, in seconds.
    pub min_age_secs: u64,
}

impl Default for GcConfig {
    /// GC enabled, one run per hour, 24-hour minimum age.
    fn default() -> Self {
        const ONE_HOUR_SECS: u64 = 60 * 60;
        const ONE_DAY_SECS: u64 = 24 * ONE_HOUR_SECS;

        GcConfig {
            enabled: true,
            interval_secs: ONE_HOUR_SECS,
            min_age_secs: ONE_DAY_SECS,
        }
    }
}
/// Start the background GC thread.
///
/// The spawned thread sleeps for `config.interval_secs`, runs one GC pass,
/// logs the outcome, and repeats. A failed pass is logged and does not stop
/// the loop.
///
/// If `config.enabled` is `false` the thread exits immediately. The
/// configuration is moved into the thread and can never change, so there is
/// nothing for a disabled thread to wait for.
///
/// An interval of `0` is treated as one second, so the thread cannot turn
/// into a busy loop of back-to-back GC passes.
///
/// # Returns
/// The handle of the spawned thread. When GC is enabled the thread never
/// finishes on its own, so joining the handle blocks indefinitely.
pub fn start_gc_thread(
    storage: Arc<StorageEngine>,
    config: GcConfig,
) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        if !config.enabled {
            return;
        }

        let interval = Duration::from_secs(config.interval_secs.max(1));
        loop {
            std::thread::sleep(interval);
            match run_gc(&storage) {
                Ok(stats) => {
                    log::info!("GC completed: {:?}", stats);
                }
                Err(e) => {
                    log::error!("GC failed: {}", e);
                }
            }
        }
    })
}
10. MVCC Integration¶
10.1 Branch-Aware MVCC¶
Branches integrate with existing MVCC by extending snapshot visibility:
/// MVCC snapshot extended with the branch it belongs to.
#[derive(Debug, Clone)]
pub struct BranchSnapshot {
    /// Underlying MVCC snapshot, used for versions on the branch itself.
    pub base_snapshot: Snapshot,
    /// Branch this snapshot reads from.
    pub branch_id: BranchId,
    /// `(ancestor branch, snapshot)` pairs consulted for inherited versions.
    pub parent_chain: Vec<(BranchId, SnapshotId)>,
}

impl BranchSnapshot {
    /// Decide whether a version written on `version_branch` at
    /// `version_timestamp` can be seen through this snapshot.
    ///
    /// Versions on the snapshot's own branch defer to the base MVCC
    /// snapshot. Versions on a branch listed in the parent chain are visible
    /// when their timestamp is not after the snapshot recorded for the first
    /// matching entry. Versions from any other branch are never visible.
    pub fn is_visible(
        &self,
        version_branch: BranchId,
        version_timestamp: u64,
    ) -> bool {
        if version_branch == self.branch_id {
            return self.base_snapshot.is_visible(version_timestamp);
        }

        self.parent_chain
            .iter()
            .find(|(ancestor, _)| *ancestor == version_branch)
            .map_or(false, |(_, cutoff)| version_timestamp <= *cutoff)
    }
}
10.2 Transaction Isolation Levels¶
Branches maintain standard isolation levels:
/// Transaction isolation level requested for a branch transaction.
///
/// Only `SnapshotIsolation` is accepted by the constructor shown in this
/// document; the other levels are rejected with an error.
#[derive(Debug, Clone, Copy)]
pub enum IsolationLevel {
    /// Read committed (not supported yet)
    ReadCommitted,
    /// Snapshot isolation (default)
    SnapshotIsolation,
    /// Serializable (not supported yet)
    Serializable,
}
impl BranchTransaction {
    /// Begin a transaction on `branch_name` with the requested isolation
    /// level.
    ///
    /// Only `IsolationLevel::SnapshotIsolation` is supported; it delegates
    /// to `StorageEngine::begin_branch_transaction`.
    ///
    /// # Errors
    /// Returns `Error::unsupported_isolation_level` for `ReadCommitted` and
    /// `Serializable`, and propagates any error from starting the
    /// transaction.
    pub fn with_isolation_level(
        storage: &StorageEngine,
        branch_name: &str,
        level: IsolationLevel,
    ) -> Result<Self> {
        match level {
            IsolationLevel::SnapshotIsolation => {
                storage.begin_branch_transaction(branch_name)
            }
            // Unsupported levels are listed explicitly, not caught by `_`,
            // so adding a new variant forces a decision here at compile time.
            IsolationLevel::ReadCommitted | IsolationLevel::Serializable => {
                Err(Error::unsupported_isolation_level(level))
            }
        }
    }
}
10.3 Timestamp Allocation¶
Branch transactions use global timestamp ordering:
impl StorageEngine {
    /// Advance the engine's timestamp by one and return the new value.
    ///
    /// The write guard is held across both the increment and the read, so
    /// the returned value is the one this call produced.
    pub fn next_timestamp(&self) -> Timestamp {
        let mut guard = self.timestamp.write();
        *guard += 1;
        *guard
    }

    /// Return the current timestamp without advancing it.
    pub fn current_timestamp(&self) -> Timestamp {
        let guard = self.timestamp.read();
        *guard
    }
}
10.4 Commit Protocol¶
impl BranchTransaction {
    /// Commit the transaction.
    ///
    /// Steps, in order: run conflict validation, allocate a commit timestamp
    /// from the storage engine, write every buffered `(key, value)` pair
    /// under this branch at that timestamp, commit the underlying
    /// transaction, then update branch statistics.
    ///
    /// # Errors
    /// Propagates the first failure from any step.
    // NOTE(review): a statistics failure is returned as `Err` even though the
    // underlying commit has already succeeded by then - confirm callers
    // expect that.
    pub fn commit(mut self) -> Result<()> {
        self.validate_no_conflicts()?;

        let commit_ts = self.storage.next_timestamp();

        for (raw_key, payload) in self.write_set.iter() {
            let key_text = String::from_utf8_lossy(raw_key);
            let versioned_key = encode_branch_data_key(
                self.branch_id,
                &key_text,
                commit_ts,
            );
            self.tx.put(versioned_key, payload.clone())?;
        }

        self.tx.commit()?;

        self.update_branch_stats()?;
        Ok(())
    }

    /// Check for write-write conflicts with concurrent commits.
    ///
    /// Placeholder: it currently accepts every transaction. A full
    /// implementation would track write sets.
    fn validate_no_conflicts(&self) -> Result<()> {
        Ok(())
    }
}
11. Performance Considerations¶
11.1 Performance Targets¶
| Operation | Target Latency | Target Throughput |
|---|---|---|
| CREATE BRANCH | < 10ms | 1000 ops/sec |
| DROP BRANCH | < 50ms | 200 ops/sec |
| MERGE BRANCH (small) | < 100ms | 10 ops/sec |
| MERGE BRANCH (large) | < 10s | 0.1 ops/sec (1 op per 10 sec) |
| Branch Read (hit) | < 0.1ms overhead | Same as non-branch |
| Branch Read (parent) | < 0.5ms overhead | 90% of non-branch |
| Branch Write (CoW) | < 0.2ms overhead | 95% of non-branch |
| Storage Overhead | < 2% per branch | - |
| Memory Overhead | < 100KB per branch | - |
11.2 Optimization Techniques¶
11.2.1 Metadata Caching¶
```rust
/// Branch metadata cache
pub struct BranchMetadataCache {
    /// LRU cache of branch metadata
    // NOTE(review): the generic arguments of `Arc` and the trailing comma
    // appear to have been lost when this document was extracted; the field
    // does not compile as written. Restore the intended type from the
    // original source.
    cache: Arc
    /// Cache size
    capacity: usize,
}