diff --git a/src/pdata/btree_builder.rs b/src/pdata/btree_builder.rs index 7c509ba..f6c62f2 100644 --- a/src/pdata/btree_builder.rs +++ b/src/pdata/btree_builder.rs @@ -141,16 +141,12 @@ pub struct WriteResult { loc: u64, } -/// Write a node to a free metadata block, and mark the block as reserved, -/// without increasing its reference count. -fn write_reserved_node_( - w: &mut WriteBatcher, - mut node: Node, -) -> Result { +/// Write a node to a free metadata block. +fn write_node_(w: &mut WriteBatcher, mut node: Node) -> Result { let keys = node.get_keys(); let first_key = *keys.first().unwrap_or(&0u64); - let b = w.reserve()?; + let b = w.alloc()?; node.set_block(b.loc); let mut cursor = Cursor::new(b.get_data()); @@ -187,7 +183,7 @@ impl NodeIO for LeafIO { values, }; - write_reserved_node_(w, node) + write_node_(w, node) } fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec, Vec)> { @@ -220,7 +216,7 @@ impl NodeIO for InternalIO { values, }; - write_reserved_node_(w, node) + write_node_(w, node) } fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec, Vec)> { @@ -246,6 +242,7 @@ pub struct NodeBuilder { max_entries_per_node: usize, values: VecDeque<(u64, V)>, nodes: Vec, + shared: bool, } /// When the builder is including pre-built nodes it has to decide whether @@ -265,13 +262,14 @@ pub struct NodeSummary { impl<'a, V: Pack + Unpack + Clone> NodeBuilder { /// Create a new NodeBuilder - pub fn new(nio: Box>, value_rc: Box>) -> Self { + pub fn new(nio: Box>, value_rc: Box>, shared: bool) -> Self { NodeBuilder { nio, value_rc, max_entries_per_node: calc_max_entries::(), values: VecDeque::new(), nodes: Vec::new(), + shared, } } /// Push a single value. This may emit a new node, hence the Result @@ -324,6 +322,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder { // Add the remaining nodes. for i in 1..nodes.len() { let n = nodes.get(i).unwrap(); + w.sm.lock().unwrap().inc(n.block, 1)?; self.nodes.push(n.clone()); } } else { @@ -332,6 +331,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder { // add the nodes for n in nodes { + w.sm.lock().unwrap().inc(n.block, 1)?; self.nodes.push(n.clone()); } } @@ -388,7 +388,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder { block: wresult.loc, key: wresult.first_key, nr_entries, - shared: false, + shared: self.shared, }); Ok(()) } @@ -433,6 +433,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder { fn unshift_node(&mut self, w: &mut WriteBatcher) -> Result<()> { let ls = self.nodes.pop().unwrap(); let (keys, values) = self.read_node(w, ls.block)?; + w.sm.lock().unwrap().dec(ls.block)?; let mut vals = VecDeque::new(); @@ -460,7 +461,7 @@ pub struct BTreeBuilder { impl BTreeBuilder { pub fn new(value_rc: Box>) -> BTreeBuilder { BTreeBuilder { - leaf_builder: NodeBuilder::new(Box::new(LeafIO {}), value_rc), + leaf_builder: NodeBuilder::new(Box::new(LeafIO {}), value_rc, false), } } @@ -486,10 +487,7 @@ pub fn build_btree(w: &mut WriteBatcher, leaves: Vec) -> Result 1 { - let mut builder = NodeBuilder::new( - Box::new(InternalIO {}), - Box::new(SMRefCounter::new(w.sm.clone())), - ); + let mut builder = NodeBuilder::new(Box::new(InternalIO {}), Box::new(NoopRC {}), false); for n in nodes { builder.push_value(w, n.key, n.block)?; @@ -500,13 +498,36 @@ pub fn build_btree(w: &mut WriteBatcher, leaves: Vec) -> Result( + w: &mut WriteBatcher, + leaves: &[NodeSummary], + value_rc: &mut dyn RefCounter, +) -> Result<()> { + let nio = LeafIO {}; + + for n in leaves { + let deleted = w.sm.lock().unwrap().dec(n.block)?; + if deleted { + let (_, values) = nio.read(w, n.block)?; + for v in values { + value_rc.dec(&v)?; + } + } + } + + Ok(()) +} + +//------------------------------------------ diff --git a/src/thin/restore.rs b/src/thin/restore.rs index e6a756f..a6738bb 100644 --- a/src/thin/restore.rs +++ b/src/thin/restore.rs @@ -101,7 +101,8 @@ impl<'a> Restorer<'a> { let value_rc = Box::new(MappingRC { sm: self.data_sm.as_ref().unwrap().clone(), }); - let leaf_builder = NodeBuilder::new(Box::new(LeafIO {}), value_rc); + let shared = matches!(section, MappedSection::Def(_)); + let leaf_builder = NodeBuilder::new(Box::new(LeafIO {}), value_rc, shared); self.current_map = Some((section, leaf_builder)); Ok(Visit::Continue) @@ -134,9 +135,26 @@ impl<'a> Restorer<'a> { Ok((details_root, mapping_root)) } + // Release the temporary references to the leaves of pre-built subtrees. + // The contained child values will also be decreased if the leaf is + // no longer referenced. + fn release_subtrees(&mut self) -> Result<()> { + let mut value_rc = MappingRC { + sm: self.data_sm.as_ref().unwrap().clone(), + }; + + for (_, leaves) in self.sub_trees.iter() { + release_leaves(self.w, &leaves, &mut value_rc)?; + } + + Ok(()) + } + fn finalize(&mut self) -> Result<()> { let (details_root, mapping_root) = self.build_device_details()?; + self.release_subtrees()?; + // Build data space map let data_sm = self.data_sm.as_ref().unwrap(); let data_sm_root = build_data_sm(self.w, data_sm.lock().unwrap().deref())?; diff --git a/src/write_batcher.rs b/src/write_batcher.rs index 2300c87..9e64577 100644 --- a/src/write_batcher.rs +++ b/src/write_batcher.rs @@ -18,9 +18,11 @@ pub struct WriteBatcher { batch_size: usize, queue: Vec, - // The reserved range covers all the blocks allocated or reserved by this - // WriteBatcher, and the blocks already occupied. No blocks in this range - // are expected to be freed, hence a single range is used for the representation. + // The reserved range keeps track of all the blocks allocated. + // An allocated block won't be reused even though it was freed. + // In other words, the WriteBatcher performs allocation in + // transactional fashion, that simplifies block allocationas + // as well as tracking. reserved: std::ops::Range, } @@ -83,22 +85,6 @@ impl WriteBatcher { Ok(Block::zeroed(b)) } - pub fn reserve(&mut self) -> Result { - let mut sm = self.sm.lock().unwrap(); - let b = find_free(sm.deref_mut(), &self.reserved)?; - self.reserved.end = b + 1; - - Ok(Block::new(b)) - } - - pub fn reserve_zeroed(&mut self) -> Result { - let mut sm = self.sm.lock().unwrap(); - let b = find_free(sm.deref_mut(), &self.reserved)?; - self.reserved.end = b + 1; - - Ok(Block::zeroed(b)) - } - pub fn get_reserved_range(&self) -> std::ops::Range { std::ops::Range { start: self.reserved.start,