[btree_builder] Fix reference counts of unshifted child values

The ref counts now are handled in a straightforward method:
The pre-built subtrees serve as snapshots of potentially shared nodes.
They hold an initial ref count on the pre-built nodes and their children.
The temporary ref counts are later dropped at the end of device building.

This approach fixes the ref counts of unshifted nodes; thus the 'reserve'
operation introduced by commit 6d16c58 is reverted.
This commit is contained in:
Ming-Hung Tsai 2021-07-30 14:26:29 +08:00
parent 052c9f90ea
commit bd39b570ef
3 changed files with 66 additions and 41 deletions

View File

@ -141,16 +141,12 @@ pub struct WriteResult {
loc: u64, loc: u64,
} }
/// Write a node to a free metadata block, and mark the block as reserved, /// Write a node to a free metadata block.
/// without increasing its reference count. fn write_node_<V: Unpack + Pack>(w: &mut WriteBatcher, mut node: Node<V>) -> Result<WriteResult> {
fn write_reserved_node_<V: Unpack + Pack>(
w: &mut WriteBatcher,
mut node: Node<V>,
) -> Result<WriteResult> {
let keys = node.get_keys(); let keys = node.get_keys();
let first_key = *keys.first().unwrap_or(&0u64); let first_key = *keys.first().unwrap_or(&0u64);
let b = w.reserve()?; let b = w.alloc()?;
node.set_block(b.loc); node.set_block(b.loc);
let mut cursor = Cursor::new(b.get_data()); let mut cursor = Cursor::new(b.get_data());
@ -187,7 +183,7 @@ impl<V: Unpack + Pack> NodeIO<V> for LeafIO {
values, values,
}; };
write_reserved_node_(w, node) write_node_(w, node)
} }
fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec<u64>, Vec<V>)> { fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec<u64>, Vec<V>)> {
@ -220,7 +216,7 @@ impl NodeIO<u64> for InternalIO {
values, values,
}; };
write_reserved_node_(w, node) write_node_(w, node)
} }
fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec<u64>, Vec<u64>)> { fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec<u64>, Vec<u64>)> {
@ -246,6 +242,7 @@ pub struct NodeBuilder<V: Pack + Unpack> {
max_entries_per_node: usize, max_entries_per_node: usize,
values: VecDeque<(u64, V)>, values: VecDeque<(u64, V)>,
nodes: Vec<NodeSummary>, nodes: Vec<NodeSummary>,
shared: bool,
} }
/// When the builder is including pre-built nodes it has to decide whether /// When the builder is including pre-built nodes it has to decide whether
@ -265,13 +262,14 @@ pub struct NodeSummary {
impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> { impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
/// Create a new NodeBuilder /// Create a new NodeBuilder
pub fn new(nio: Box<dyn NodeIO<V>>, value_rc: Box<dyn RefCounter<V>>) -> Self { pub fn new(nio: Box<dyn NodeIO<V>>, value_rc: Box<dyn RefCounter<V>>, shared: bool) -> Self {
NodeBuilder { NodeBuilder {
nio, nio,
value_rc, value_rc,
max_entries_per_node: calc_max_entries::<V>(), max_entries_per_node: calc_max_entries::<V>(),
values: VecDeque::new(), values: VecDeque::new(),
nodes: Vec::new(), nodes: Vec::new(),
shared,
} }
} }
/// Push a single value. This may emit a new node, hence the Result /// Push a single value. This may emit a new node, hence the Result
@ -324,6 +322,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
// Add the remaining nodes. // Add the remaining nodes.
for i in 1..nodes.len() { for i in 1..nodes.len() {
let n = nodes.get(i).unwrap(); let n = nodes.get(i).unwrap();
w.sm.lock().unwrap().inc(n.block, 1)?;
self.nodes.push(n.clone()); self.nodes.push(n.clone());
} }
} else { } else {
@ -332,6 +331,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
// add the nodes // add the nodes
for n in nodes { for n in nodes {
w.sm.lock().unwrap().inc(n.block, 1)?;
self.nodes.push(n.clone()); self.nodes.push(n.clone());
} }
} }
@ -388,7 +388,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
block: wresult.loc, block: wresult.loc,
key: wresult.first_key, key: wresult.first_key,
nr_entries, nr_entries,
shared: false, shared: self.shared,
}); });
Ok(()) Ok(())
} }
@ -433,6 +433,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
fn unshift_node(&mut self, w: &mut WriteBatcher) -> Result<()> { fn unshift_node(&mut self, w: &mut WriteBatcher) -> Result<()> {
let ls = self.nodes.pop().unwrap(); let ls = self.nodes.pop().unwrap();
let (keys, values) = self.read_node(w, ls.block)?; let (keys, values) = self.read_node(w, ls.block)?;
w.sm.lock().unwrap().dec(ls.block)?;
let mut vals = VecDeque::new(); let mut vals = VecDeque::new();
@ -460,7 +461,7 @@ pub struct BTreeBuilder<V: Unpack + Pack> {
impl<V: Unpack + Pack + Clone> BTreeBuilder<V> { impl<V: Unpack + Pack + Clone> BTreeBuilder<V> {
pub fn new(value_rc: Box<dyn RefCounter<V>>) -> BTreeBuilder<V> { pub fn new(value_rc: Box<dyn RefCounter<V>>) -> BTreeBuilder<V> {
BTreeBuilder { BTreeBuilder {
leaf_builder: NodeBuilder::new(Box::new(LeafIO {}), value_rc), leaf_builder: NodeBuilder::new(Box::new(LeafIO {}), value_rc, false),
} }
} }
@ -486,10 +487,7 @@ pub fn build_btree(w: &mut WriteBatcher, leaves: Vec<NodeSummary>) -> Result<u64
// up with a single root. // up with a single root.
let mut nodes = leaves; let mut nodes = leaves;
while nodes.len() > 1 { while nodes.len() > 1 {
let mut builder = NodeBuilder::new( let mut builder = NodeBuilder::new(Box::new(InternalIO {}), Box::new(NoopRC {}), false);
Box::new(InternalIO {}),
Box::new(SMRefCounter::new(w.sm.clone())),
);
for n in nodes { for n in nodes {
builder.push_value(w, n.key, n.block)?; builder.push_value(w, n.key, n.block)?;
@ -500,13 +498,36 @@ pub fn build_btree(w: &mut WriteBatcher, leaves: Vec<NodeSummary>) -> Result<u64
assert!(nodes.len() == 1); assert!(nodes.len() == 1);
// The root is expected to be referenced by only one parent,
// hence the ref count is increased before the availability
// of its parent.
let root = nodes[0].block; let root = nodes[0].block;
w.sm.lock().unwrap().inc(root, 1)?;
Ok(root) Ok(root)
} }
//------------------------------------------ //------------------------------------------
// The pre-built nodes and the contained values were initialized with
// a ref count of 1, which is analogous to a "temporary snapshot" of
// potentially shared leaves. We have to drop those temporary references
// to pre-built nodes at the end of device building, and also decrease
// ref counts of the contained values if a pre-built leaf is no longer
// referenced.
pub fn release_leaves<V: Pack + Unpack>(
w: &mut WriteBatcher,
leaves: &[NodeSummary],
value_rc: &mut dyn RefCounter<V>,
) -> Result<()> {
let nio = LeafIO {};
for n in leaves {
let deleted = w.sm.lock().unwrap().dec(n.block)?;
if deleted {
let (_, values) = nio.read(w, n.block)?;
for v in values {
value_rc.dec(&v)?;
}
}
}
Ok(())
}
//------------------------------------------

View File

@ -101,7 +101,8 @@ impl<'a> Restorer<'a> {
let value_rc = Box::new(MappingRC { let value_rc = Box::new(MappingRC {
sm: self.data_sm.as_ref().unwrap().clone(), sm: self.data_sm.as_ref().unwrap().clone(),
}); });
let leaf_builder = NodeBuilder::new(Box::new(LeafIO {}), value_rc); let shared = matches!(section, MappedSection::Def(_));
let leaf_builder = NodeBuilder::new(Box::new(LeafIO {}), value_rc, shared);
self.current_map = Some((section, leaf_builder)); self.current_map = Some((section, leaf_builder));
Ok(Visit::Continue) Ok(Visit::Continue)
@ -134,9 +135,26 @@ impl<'a> Restorer<'a> {
Ok((details_root, mapping_root)) Ok((details_root, mapping_root))
} }
// Release the temporary references to the leaves of pre-built subtrees.
// The contained child values will also be decreased if the leaf is
// no longer referenced.
fn release_subtrees(&mut self) -> Result<()> {
let mut value_rc = MappingRC {
sm: self.data_sm.as_ref().unwrap().clone(),
};
for (_, leaves) in self.sub_trees.iter() {
release_leaves(self.w, &leaves, &mut value_rc)?;
}
Ok(())
}
fn finalize(&mut self) -> Result<()> { fn finalize(&mut self) -> Result<()> {
let (details_root, mapping_root) = self.build_device_details()?; let (details_root, mapping_root) = self.build_device_details()?;
self.release_subtrees()?;
// Build data space map // Build data space map
let data_sm = self.data_sm.as_ref().unwrap(); let data_sm = self.data_sm.as_ref().unwrap();
let data_sm_root = build_data_sm(self.w, data_sm.lock().unwrap().deref())?; let data_sm_root = build_data_sm(self.w, data_sm.lock().unwrap().deref())?;

View File

@ -18,9 +18,11 @@ pub struct WriteBatcher {
batch_size: usize, batch_size: usize,
queue: Vec<Block>, queue: Vec<Block>,
// The reserved range covers all the blocks allocated or reserved by this // The reserved range keeps track of all the blocks allocated.
// WriteBatcher, and the blocks already occupied. No blocks in this range // An allocated block won't be reused even though it was freed.
// are expected to be freed, hence a single range is used for the representation. // In other words, the WriteBatcher performs allocation in
// a transactional fashion, which simplifies block allocation
// as well as tracking.
reserved: std::ops::Range<u64>, reserved: std::ops::Range<u64>,
} }
@ -83,22 +85,6 @@ impl WriteBatcher {
Ok(Block::zeroed(b)) Ok(Block::zeroed(b))
} }
pub fn reserve(&mut self) -> Result<Block> {
let mut sm = self.sm.lock().unwrap();
let b = find_free(sm.deref_mut(), &self.reserved)?;
self.reserved.end = b + 1;
Ok(Block::new(b))
}
pub fn reserve_zeroed(&mut self) -> Result<Block> {
let mut sm = self.sm.lock().unwrap();
let b = find_free(sm.deref_mut(), &self.reserved)?;
self.reserved.end = b + 1;
Ok(Block::zeroed(b))
}
pub fn get_reserved_range(&self) -> std::ops::Range<u64> { pub fn get_reserved_range(&self) -> std::ops::Range<u64> {
std::ops::Range { std::ops::Range {
start: self.reserved.start, start: self.reserved.start,