Branch Storage Architecture - Part 3 of 3¶
Operations & Testing¶
Navigation: Index | ← Part 2 | Part 3
// Thread-safe, shared LRU cache of branch metadata. Wrapped in Arc so the
// cache handle can be cloned cheaply across readers/writers.
impl BranchMetadataCache { pub fn new(capacity: usize) -> Self { Self { cache: Arc::new(RwLock::new(LruCache::new(capacity))), capacity, } }
// Returns a clone of the cached metadata for `branch_id`, if present.
// NOTE: takes the WRITE lock on purpose — LruCache::get mutates the
// recency order, so a read lock would not compile / would be unsound.
pub fn get(&self, branch_id: BranchId) -> Option<BranchMetadata> {
self.cache.write().get(&branch_id).cloned()
}
// Inserts (or refreshes) metadata keyed by its own branch_id; may evict
// the least-recently-used entry once `capacity` is reached.
pub fn put(&self, metadata: BranchMetadata) {
self.cache.write().put(metadata.branch_id, metadata);
}
} ```
11.2.2 Parent Chain Caching¶
The following Rust snippet shows how the parent chain is built and cached:
impl BranchTransaction {
    /// Build and cache the ancestor chain for `branch`.
    ///
    /// Walks the `parent_id` links upward, recording each ancestor together
    /// with the snapshot the child was created from, until a root branch
    /// (no parent) is reached.
    ///
    /// # Errors
    /// Propagates any failure from `get_branch_metadata`.
    ///
    /// # Panics
    /// Panics if the parent links form a cycle — that indicates corrupted
    /// branch metadata, and an unguarded walk would otherwise loop forever.
    fn build_parent_chain(
        branch: &BranchMetadata,
        storage: &StorageEngine,
    ) -> Result<Vec<(BranchId, SnapshotId)>> {
        let mut chain = Vec::new();
        // Cycle guard: track every branch id already visited.
        let mut seen = std::collections::HashSet::new();
        let mut current_id = branch.parent_id;
        while let Some(parent_id) = current_id {
            // HashSet::insert returns false when the id was already present,
            // i.e. the parent links loop back on themselves.
            assert!(
                seen.insert(parent_id),
                "cycle detected in branch parent chain"
            );
            let parent = get_branch_metadata(storage, parent_id)?;
            chain.push((parent_id, parent.created_from_snapshot));
            current_id = parent.parent_id;
        }
        Ok(chain)
    }
}
11.2.3 Bloom Filters for Modified Keys¶
/// Track modified keys per branch with bloom filter.
///
/// A bloom filter gives a definitive "not modified here" answer (no false
/// negatives), letting reads skip straight to the parent chain; a positive
/// answer may be a false positive and still requires a real lookup.
pub struct BranchBloomFilter {
/// Bloom filter for modified keys
filter: BloomFilter,
/// False positive rate
/// NOTE(review): `fpr` looks informational in this excerpt — confirm it is
/// actually used to size `filter` at construction time.
fpr: f64,
}
impl BranchTransaction {
/// Check if key might be modified in this branch.
/// `false` is definitive (bloom filters have no false negatives);
/// `true` may be a false positive and needs a real lookup.
fn key_might_exist_in_branch(&self, key: &str) -> bool {
if let Some(filter) = &self.bloom_filter {
filter.contains(key)
} else {
true // Conservative: assume might exist
}
}
/// Branch-aware point read: consult the bloom filter first so keys never
/// modified in this branch go straight to the parent chain.
pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
// Fast path: check bloom filter
// NOTE(review): lossy UTF-8 conversion is only consistent if the insert
// side populates the filter with the same conversion — confirm.
let key_str = String::from_utf8_lossy(key);
if !self.key_might_exist_in_branch(&key_str) {
// Definitely not in this branch - go to parent
return self.get_from_parent_chain(key);
}
// Slow path: actual lookup
// ... rest of implementation
}
}
11.2.4 Lazy Parent Reads¶
/// Lazy iterator over parent chain.
///
/// Holds only the position (current branch + the snapshot it was forked
/// from); each ancestor's metadata is loaded from storage on demand in
/// `next()`, so deep hierarchies cost nothing until actually traversed.
struct ParentChainIterator<'a> {
// Storage used to load each ancestor's metadata lazily.
storage: &'a StorageEngine,
// Next branch to yield; None terminates the iterator.
current_branch: Option<BranchId>,
// Snapshot the current branch was created from.
current_snapshot: SnapshotId,
}
impl<'a> Iterator for ParentChainIterator<'a> {
    type Item = (BranchId, SnapshotId);

    /// Yields the current (branch, snapshot) pair, then lazily loads the
    /// next ancestor's metadata so the chain is only read from storage as
    /// far as the caller actually advances.
    fn next(&mut self) -> Option<Self::Item> {
        let branch_id = self.current_branch?;
        let item = (branch_id, self.current_snapshot);
        // Advance to the parent. A metadata load failure silently ends the
        // iteration (best-effort traversal — the error is not surfaced).
        match get_branch_metadata(self.storage, branch_id) {
            Ok(metadata) => {
                self.current_branch = metadata.parent_id;
                self.current_snapshot = metadata.created_from_snapshot;
            }
            Err(_) => self.current_branch = None,
        }
        Some(item)
    }
}
11.3 Space Efficiency¶
Storage Breakdown (per branch):
────────────────────────────────────────────
Metadata: ~500 bytes
Registry entry: ~50 bytes
Parent/child index: ~100 bytes
Bloom filter: ~10KB (optional)
────────────────────────────────────────────
Total overhead: ~11KB per branch
Data storage: Only modified data
(copy-on-write)
Example:
- 1GB database
- 5 branches
- 10% data modified per branch
Total storage:
- Original: 1GB
- Branches: 5 × (11KB + 100MB) ≈ 500MB
- Total: 1.5GB (50% overhead)
- Per branch: 100MB / 1GB = 10% (as expected)
11.4 Compaction Impact¶
/// Custom compaction filter for branch GC.
///
/// During RocksDB compaction, entries whose branch id is no longer in
/// `active_branches` are dropped, reclaiming space from deleted branches.
struct BranchCompactionFilter {
// Snapshot of the branch ids that must be kept alive.
active_branches: HashSet<BranchId>,
}
impl rocksdb::CompactionFilter for BranchCompactionFilter {
    /// Decide whether compaction should drop this entry.
    /// Convention here: returning `true` removes the entry, `false` keeps it.
    fn filter(&mut self, _level: u32, key: &[u8], _value: &[u8]) -> bool {
        match extract_branch_id(key) {
            // Branch-scoped key: drop it only when its branch is inactive.
            Some(branch_id) => !self.active_branches.contains(&branch_id),
            // Non-branch key: always keep.
            None => false,
        }
    }
}
12. Testing Strategy¶
12.1 Unit Tests¶
#[cfg(test)]
mod tests {
use super::*;
// Branch creation must write metadata only — no data is copied, so the
// created branch is immediately queryable.
#[test]
fn test_create_branch_metadata_only() {
let config = Config::in_memory();
let storage = StorageEngine::open_in_memory(&config).unwrap();
// Create branch
let branch_id = create_branch(
&storage,
"dev",
None,
AsOfClause::Now,
BranchOptions::default(),
).unwrap();
// Verify metadata exists
let metadata = get_branch_metadata(&storage, branch_id).unwrap();
assert_eq!(metadata.name, "dev");
assert_eq!(metadata.state, BranchState::Active);
}
// First write in a branch triggers copy-on-write: the branch gets its own
// version of the key while main keeps the original value.
#[test]
fn test_copy_on_write_first_write() {
let config = Config::in_memory();
let storage = StorageEngine::open_in_memory(&config).unwrap();
// Insert in main (BEFORE branching, so the branch snapshot includes it)
storage.put(b"key1", b"value1").unwrap();
// Create branch
create_branch(&storage, "dev", None, AsOfClause::Now, Default::default()).unwrap();
// Read from dev (should see main's value)
let mut tx = storage.begin_branch_transaction("dev").unwrap();
assert_eq!(tx.get(b"key1").unwrap(), Some(b"value1".to_vec()));
// Write to dev (copy-on-write)
tx.put(b"key1".to_vec(), b"value2".to_vec()).unwrap();
tx.commit().unwrap();
// Verify isolation
assert_eq!(
storage.get(b"key1").unwrap(),
Some(b"value1".to_vec())
); // main unchanged
let tx_dev = storage.begin_branch_transaction("dev").unwrap();
assert_eq!(
tx_dev.get(b"key1").unwrap(),
Some(b"value2".to_vec())
); // dev has new value
}
// Sibling branches must not observe each other's writes, even to the
// same key.
#[test]
fn test_branch_isolation() {
let config = Config::in_memory();
let storage = StorageEngine::open_in_memory(&config).unwrap();
// Create two branches
create_branch(&storage, "branch_a", None, AsOfClause::Now, Default::default()).unwrap();
create_branch(&storage, "branch_b", None, AsOfClause::Now, Default::default()).unwrap();
// Write to branch_a
let mut tx_a = storage.begin_branch_transaction("branch_a").unwrap();
tx_a.put(b"key1".to_vec(), b"value_a".to_vec()).unwrap();
tx_a.commit().unwrap();
// Write to branch_b
let mut tx_b = storage.begin_branch_transaction("branch_b").unwrap();
tx_b.put(b"key1".to_vec(), b"value_b".to_vec()).unwrap();
tx_b.commit().unwrap();
// Verify isolation
let tx_a = storage.begin_branch_transaction("branch_a").unwrap();
assert_eq!(tx_a.get(b"key1").unwrap(), Some(b"value_a".to_vec()));
let tx_b = storage.begin_branch_transaction("branch_b").unwrap();
assert_eq!(tx_b.get(b"key1").unwrap(), Some(b"value_b".to_vec()));
}
// A read in a grandchild branch must walk the parent chain up to main
// when neither the grandchild nor its parent modified the key.
#[test]
fn test_parent_chain_resolution() {
let config = Config::in_memory();
let storage = StorageEngine::open_in_memory(&config).unwrap();
// Insert in main
storage.put(b"key1", b"value1").unwrap();
// Create child branch
create_branch(&storage, "level1", None, AsOfClause::Now, Default::default()).unwrap();
// Create grandchild branch
create_branch(&storage, "level2", Some("level1"), AsOfClause::Now, Default::default()).unwrap();
// Read from grandchild (should see main's value)
let tx = storage.begin_branch_transaction("level2").unwrap();
assert_eq!(tx.get(b"key1").unwrap(), Some(b"value1".to_vec()));
}
// Only the branch modified key1 (main did not), so the three-way merge
// has no conflict and the branch value wins cleanly.
#[test]
fn test_three_way_merge_no_conflicts() {
let config = Config::in_memory();
let storage = StorageEngine::open_in_memory(&config).unwrap();
// Setup: main has key1=v1
storage.put(b"key1", b"v1").unwrap();
// Create branch
create_branch(&storage, "dev", None, AsOfClause::Now, Default::default()).unwrap();
// Modify in dev: key1=v2
let mut tx = storage.begin_branch_transaction("dev").unwrap();
tx.put(b"key1".to_vec(), b"v2".to_vec()).unwrap();
tx.commit().unwrap();
// Merge dev into main
let result = merge_branch(
&storage,
"dev",
"main",
MergeOptions {
conflict_resolution: ConflictResolution::BranchWins,
delete_branch_after: false,
},
).unwrap();
assert_eq!(result.conflicts_count, 0);
assert_eq!(storage.get(b"key1").unwrap(), Some(b"v2".to_vec()));
}
// Both sides modified key1 since the merge base, so with Fail resolution
// the merge must be rejected rather than silently picking a winner.
#[test]
fn test_merge_with_conflicts() {
let config = Config::in_memory();
let storage = StorageEngine::open_in_memory(&config).unwrap();
// Setup: main has key1=v1
storage.put(b"key1", b"v1").unwrap();
// Create branch
create_branch(&storage, "dev", None, AsOfClause::Now, Default::default()).unwrap();
// Modify same key in both branches
storage.put(b"key1", b"v2_main").unwrap();
let mut tx = storage.begin_branch_transaction("dev").unwrap();
tx.put(b"key1".to_vec(), b"v2_dev".to_vec()).unwrap();
tx.commit().unwrap();
// Merge with FAIL resolution
let result = merge_branch(
&storage,
"dev",
"main",
MergeOptions {
conflict_resolution: ConflictResolution::Fail,
delete_branch_after: false,
},
);
assert!(result.is_err()); // Should fail due to conflict
}
}
12.2 Integration Tests¶
#[cfg(test)]
mod integration_tests {
use super::*;
// End-to-end: branch creation and switching through the SQL surface,
// with per-branch visibility of inserted rows.
#[test]
fn test_branching_sql_integration() {
let config = Config::in_memory();
let storage = StorageEngine::open_in_memory(&config).unwrap();
// Execute SQL commands
execute_sql(&storage, "CREATE TABLE users (id INT, name TEXT)").unwrap();
execute_sql(&storage, "INSERT INTO users VALUES (1, 'Alice')").unwrap();
// Create branch via SQL
execute_sql(&storage, "CREATE DATABASE BRANCH dev FROM CURRENT AS OF NOW").unwrap();
// Insert in dev
execute_sql(&storage, "SET branch = dev").unwrap();
execute_sql(&storage, "INSERT INTO users VALUES (2, 'Bob')").unwrap();
// Verify isolation
execute_sql(&storage, "SET branch = main").unwrap();
let result = query_sql(&storage, "SELECT * FROM users").unwrap();
assert_eq!(result.len(), 1); // Only Alice
execute_sql(&storage, "SET branch = dev").unwrap();
let result = query_sql(&storage, "SELECT * FROM users").unwrap();
assert_eq!(result.len(), 2); // Alice and Bob
}
// Two threads committing to different branches concurrently must not
// interfere with each other's key space.
#[test]
fn test_concurrent_branch_transactions() {
use std::sync::Arc;
use std::thread;
let config = Config::in_memory();
let storage = Arc::new(StorageEngine::open_in_memory(&config).unwrap());
// Create branches
create_branch(&storage, "branch1", None, AsOfClause::Now, Default::default()).unwrap();
create_branch(&storage, "branch2", None, AsOfClause::Now, Default::default()).unwrap();
let storage1 = Arc::clone(&storage);
let storage2 = Arc::clone(&storage);
// Concurrent writes to different branches
let t1 = thread::spawn(move || {
for i in 0..100 {
let mut tx = storage1.begin_branch_transaction("branch1").unwrap();
tx.put(format!("key{}", i).into_bytes(), b"value1".to_vec()).unwrap();
tx.commit().unwrap();
}
});
let t2 = thread::spawn(move || {
for i in 0..100 {
let mut tx = storage2.begin_branch_transaction("branch2").unwrap();
tx.put(format!("key{}", i).into_bytes(), b"value2".to_vec()).unwrap();
tx.commit().unwrap();
}
});
t1.join().unwrap();
t2.join().unwrap();
// Verify isolation (spot-check a key written by both threads)
let tx1 = storage.begin_branch_transaction("branch1").unwrap();
assert_eq!(tx1.get(b"key50").unwrap(), Some(b"value1".to_vec()));
let tx2 = storage.begin_branch_transaction("branch2").unwrap();
assert_eq!(tx2.get(b"key50").unwrap(), Some(b"value2".to_vec()));
}
}
12.3 Performance Tests¶
#[cfg(test)]
mod perf_tests {
    use super::*;
    use std::time::Instant;

    /// Branch creation must be metadata-only (O(1)), independent of data size.
    #[test]
    fn bench_branch_creation() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();
        // Insert 1M rows so a copying implementation would blow the budget.
        for i in 0..1_000_000 {
            storage.put(format!("key{}", i).as_bytes(), b"value").unwrap();
        }
        // Benchmark branch creation
        let start = Instant::now();
        create_branch(&storage, "bench", None, AsOfClause::Now, Default::default()).unwrap();
        let elapsed = start.elapsed();
        println!("Branch creation: {:?}", elapsed);
        // NOTE: wall-clock assertions are inherently flaky on loaded CI
        // machines; the 10ms budget is deliberately generous for an O(1) op.
        assert!(elapsed.as_millis() < 10, "Branch creation too slow: {:?}", elapsed);
    }

    /// Reads through a branch should cost at most ~10% more than direct reads.
    #[test]
    fn bench_branch_read_overhead() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();
        // Insert 100K rows
        for i in 0..100_000 {
            storage.put(format!("key{}", i).as_bytes(), b"value").unwrap();
        }
        // Create branch
        create_branch(&storage, "bench", None, AsOfClause::Now, Default::default()).unwrap();
        // Benchmark main reads
        let start = Instant::now();
        for i in 0..10_000 {
            storage.get(format!("key{}", i).as_bytes()).unwrap();
        }
        let main_time = start.elapsed();
        // Benchmark branch reads. The transaction is created ONCE, outside
        // the timed loop: the main-read loop above has no per-iteration
        // setup, so beginning a transaction inside this loop would measure
        // transaction-setup cost, not per-read overhead, and skew the ratio.
        let tx = storage.begin_branch_transaction("bench").unwrap();
        let start = Instant::now();
        for i in 0..10_000 {
            tx.get(format!("key{}", i).as_bytes()).unwrap();
        }
        let branch_time = start.elapsed();
        // Guard against a 0µs main_time (would make the ratio infinite).
        let main_us = (main_time.as_micros() as f64).max(1.0);
        let overhead = (branch_time.as_micros() as f64 / main_us - 1.0) * 100.0;
        println!("Branch read overhead: {:.2}%", overhead);
        assert!(overhead < 10.0, "Branch read overhead too high: {:.2}%", overhead);
    }
}
12.4 Stress Tests¶
#[cfg(test)]
mod stress_tests {
    use super::*;

    /// Branch creation is metadata-only, so 100 branches must be cheap and
    /// every one of them independently accessible afterwards.
    #[test]
    fn stress_test_many_branches() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();
        // Create 100 branches
        for i in 0..100 {
            create_branch(
                &storage,
                &format!("branch{}", i),
                None,
                AsOfClause::Now,
                Default::default(),
            ).unwrap();
        }
        // Verify all branches are accessible (and empty: nothing was written).
        for i in 0..100 {
            let tx = storage.begin_branch_transaction(&format!("branch{}", i)).unwrap();
            assert!(tx.get(b"key").unwrap().is_none());
        }
    }

    /// A 50-deep parent chain must still resolve reads back to the root.
    #[test]
    fn stress_test_deep_hierarchy() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();
        // Insert at root BEFORE branching. Branches snapshot their parent at
        // creation time (AS OF NOW), so data written to main after the fork
        // would not be visible through the parent chain — inserting after
        // branch creation (as the original test did) made the final
        // assertion exercise nothing.
        storage.put(b"key", b"value").unwrap();
        // Create 50-level deep hierarchy: level0 forks main, levelN forks
        // level(N-1).
        let mut parent = None;
        for i in 0..50 {
            let branch_name = format!("level{}", i);
            create_branch(
                &storage,
                &branch_name,
                parent.as_deref(),
                AsOfClause::Now,
                Default::default(),
            ).unwrap();
            parent = Some(branch_name);
        }
        // Read from deepest branch (should traverse 50 parents)
        let tx = storage.begin_branch_transaction("level49").unwrap();
        assert_eq!(tx.get(b"key").unwrap(), Some(b"value".to_vec()));
    }
}
Appendix A: SQL Examples¶
-- Create a development branch from current state
CREATE DATABASE BRANCH dev FROM CURRENT AS OF NOW;
-- Create a branch from a specific timestamp (time-travel fork)
CREATE DATABASE BRANCH staging
FROM main
AS OF TIMESTAMP '2025-11-18 10:00:00';
-- Create a branch from a transaction ID
CREATE DATABASE BRANCH hotfix
FROM production
AS OF TRANSACTION 123456;
-- Create a branch with options
CREATE DATABASE BRANCH experiment
FROM main
AS OF NOW
WITH (
replication_factor = 3,
region = 'us-west'
);
-- Switch to a branch (session-level; affects subsequent statements only)
SET branch = dev;
-- Insert data in branch (copy-on-write: main is unaffected)
INSERT INTO users (name) VALUES ('Alice');
-- Query branch (sees main's snapshot plus branch-local writes)
SELECT * FROM users;
-- Merge branch back to main
MERGE DATABASE BRANCH dev INTO main
WITH (
conflict_resolution = 'branch_wins',
delete_branch_after = true
);
-- Drop a branch (IF EXISTS makes this idempotent)
DROP DATABASE BRANCH IF EXISTS experiment;
-- List all branches (system view)
SELECT * FROM pg_database_branches();
-- View branch hierarchy
SELECT
name,
parent_name,
created_at,
state,
modified_keys
FROM pg_database_branches()
ORDER BY created_at;
Appendix B: Architecture Decision Records¶
ADR-1: Use RocksDB Native MVCC¶
Decision: Leverage RocksDB's existing MVCC capabilities rather than implementing a separate versioning layer.
Rationale:
- RocksDB provides efficient multi-version storage
- Reduces implementation complexity
- Better integration with existing storage layer
Consequences:
- Branch versioning builds on RocksDB timestamps
- Natural alignment with existing transaction system
- Minor overhead for branch metadata
ADR-2: Branch ID in Physical Key¶
Decision: Encode branch ID as part of the physical key rather than using column families.
Rationale:
- Simpler key space management
- Efficient range scans within a branch
- Natural clustering for compaction
Alternatives Considered:
- Column families per branch: complex management, limited to ~10K branches
- Separate RocksDB instances: too much overhead
Consequences:
- Keys are slightly larger (8 bytes for branch ID)
- Efficient branch isolation and GC
ADR-3: Lazy Parent Resolution¶
Decision: Resolve parent values on-demand during reads rather than eagerly copying.
Rationale:
- Instant branch creation
- Minimal storage overhead
- Most data is never modified in branches
Consequences:
- Slight read overhead for unchanged data
- Complex read path with parent chain traversal
- Excellent storage efficiency
ADR-4: Three-Way Merge Algorithm¶
Decision: Use three-way merge with configurable conflict resolution.
Rationale:
- Standard approach in version control systems
- Handles complex merge scenarios
- Supports automatic conflict resolution
Alternatives Considered:
- Two-way merge: cannot detect concurrent modifications
- CRDTs: too complex for general key-value storage
Consequences:
- Requires merge base tracking
- Supports flexible conflict resolution strategies
Appendix C: Future Enhancements¶
Distributed Branching¶
Support branches across distributed HeliosDB clusters:
/// Options for creating a branch in a distributed HeliosDB cluster
/// (future enhancement — not wired into create_branch yet).
pub struct DistributedBranchOptions {
/// Replication factor
pub replication_factor: usize,
/// Preferred region
pub region: String,
/// Cross-region replication
pub cross_region: bool,
}
Branch Snapshots¶
Create lightweight snapshots within branches:
Branch Permissions¶
Fine-grained access control per branch:
Branch Triggers¶
Execute triggers on branch events:
Summary¶
This architecture provides:
- Efficient Copy-on-Write: Zero-copy branch creation, minimal storage overhead
- Strong Isolation: Branch-aware MVCC with snapshot isolation
- Flexible Merging: Three-way merge with conflict detection and resolution
- Automatic GC: Reference-counting garbage collection for dropped branches
- MVCC Integration: Seamless integration with existing transaction system
Next Steps:
1. Implement core data structures (BranchMetadata, BranchManager)
2. Extend StorageEngine with branch-aware transactions
3. Implement copy-on-write read/write paths
4. Implement the three-way merge algorithm
5. Add garbage collection
6. Integrate with the SQL parser and executor
7. Comprehensive testing (unit, integration, performance)
Implementation Effort: 6-8 weeks
- Week 1-2: Core structures and metadata management
- Week 3-4: Copy-on-write read/write paths
- Week 5: Merge algorithm
- Week 6: Garbage collection
- Week 7: SQL integration
- Week 8: Testing and optimization