diff --git a/src/io_engine.rs b/src/io_engine.rs index c36c46c..45f0d77 100644 --- a/src/io_engine.rs +++ b/src/io_engine.rs @@ -50,6 +50,7 @@ unsafe impl Send for Block {} pub trait IoEngine { fn get_nr_blocks(&self) -> u64; + fn get_batch_size(&self) -> usize; fn read(&self, b: u64) -> Result; // The whole io could fail, or individual blocks @@ -167,6 +168,10 @@ impl IoEngine for SyncIoEngine { self.nr_blocks } + fn get_batch_size(&self) -> usize { + 1 + } + fn read(&self, loc: u64) -> Result { SyncIoEngine::read_(&mut self.get(), loc) } @@ -346,6 +351,10 @@ impl IoEngine for AsyncIoEngine { inner.nr_blocks } + fn get_batch_size(&self) -> usize { + self.inner.lock().unwrap().queue_len as usize + } + fn read(&self, b: u64) -> Result { let mut inner = self.inner.lock().unwrap(); let fd = types::Target::Fd(inner.input.as_raw_fd()); diff --git a/src/lib.rs b/src/lib.rs index 0dbdfab..451257f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,3 +25,4 @@ pub mod report; pub mod shrink; pub mod thin; pub mod version; +pub mod write_batcher; diff --git a/src/pdata/btree.rs b/src/pdata/btree.rs index bee9f91..68cb6b3 100644 --- a/src/pdata/btree.rs +++ b/src/pdata/btree.rs @@ -2,24 +2,28 @@ use anyhow::anyhow; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use data_encoding::BASE64; use nom::{number::complete::*, IResult}; -use std::collections::BTreeMap; use std::fmt; -use std::sync::{Arc, Mutex}; use thiserror::Error; -use threadpool::ThreadPool; -use crate::checksum; use crate::io_engine::*; use crate::pack::vm; -use crate::pdata::space_map::*; use crate::pdata::unpack::*; //------------------------------------------ #[derive(Clone, Debug, PartialEq)] pub struct KeyRange { - start: Option, - end: Option, // This is the one-past-the-end value + pub start: Option, + pub end: Option, // This is the one-past-the-end value +} + +impl KeyRange { + pub fn new() -> KeyRange { + KeyRange { + start: None, + end: None, + } + } } impl fmt::Display for KeyRange { @@ -190,7 +194,7 @@ fn split_one(path: &Vec, kr: &KeyRange, k: u64) -> Result<(KeyRange, KeyRan } } -fn split_key_ranges(path: &Vec, kr: &KeyRange, keys: &[u64]) -> Result> { +pub fn split_key_ranges(path: &Vec, kr: &KeyRange, keys: &[u64]) -> Result> { let mut krs = Vec::with_capacity(keys.len()); if keys.len() == 0 { @@ -352,7 +356,7 @@ pub fn node_err(path: &Vec, msg: &str) -> BTreeError { ) } -fn node_err_s(path: &Vec, msg: String) -> BTreeError { +pub fn node_err_s(path: &Vec, msg: String) -> BTreeError { BTreeError::Path(path.clone(), Box::new(BTreeError::NodeError(msg))) } @@ -579,565 +583,3 @@ pub fn unpack_node( } //------------------------------------------ - -pub trait NodeVisitor { - // &self is deliberately non mut to allow the walker to use multiple threads. - fn visit( - &self, - path: &Vec, - keys: &KeyRange, - header: &NodeHeader, - keys: &[u64], - values: &[V], - ) -> Result<()>; - - // Nodes may be shared and thus visited multiple times. The walker avoids - // doing repeated IO, but it does call this method to keep the visitor up to - // date. - fn visit_again(&self, path: &Vec, b: u64) -> Result<()>; - - fn end_walk(&self) -> Result<()>; -} - -#[derive(Clone)] -pub struct BTreeWalker { - engine: Arc, - sm: Arc>, - fails: Arc>>, - ignore_non_fatal: bool, -} - -impl BTreeWalker { - pub fn new(engine: Arc, ignore_non_fatal: bool) -> BTreeWalker { - let nr_blocks = engine.get_nr_blocks() as usize; - let r: BTreeWalker = BTreeWalker { - engine, - sm: Arc::new(Mutex::new(RestrictedSpaceMap::new(nr_blocks as u64))), - fails: Arc::new(Mutex::new(BTreeMap::new())), - ignore_non_fatal, - }; - r - } - - pub fn new_with_sm( - engine: Arc, - sm: Arc>, - ignore_non_fatal: bool, - ) -> Result { - { - let sm = sm.lock().unwrap(); - assert_eq!(sm.get_nr_blocks().unwrap(), engine.get_nr_blocks()); - } - - Ok(BTreeWalker { - engine, - sm, - fails: Arc::new(Mutex::new(BTreeMap::new())), - ignore_non_fatal, - }) - } - - fn failed(&self, b: u64) -> Option { - let fails = self.fails.lock().unwrap(); - match fails.get(&b) { - None => None, - Some(e) => Some(e.clone()), - } - } - - fn set_fail(&self, b: u64, err: BTreeError) { - // FIXME: should we monitor the size of fails, and abort if too many errors? - let mut fails = self.fails.lock().unwrap(); - fails.insert(b, err); - } - - // Atomically increments the ref count, and returns the _old_ count. - fn sm_inc(&self, b: u64) -> u32 { - let mut sm = self.sm.lock().unwrap(); - let count = sm.get(b).unwrap(); - sm.inc(b, 1).unwrap(); - count - } - - fn build_aggregate(&self, b: u64, errs: Vec) -> Result<()> { - match errs.len() { - 0 => Ok(()), - 1 => { - let e = errs[0].clone(); - self.set_fail(b, e.clone()); - Err(e) - } - _ => { - let e = aggregate_error(errs); - self.set_fail(b, e.clone()); - Err(e) - } - } - } - - fn walk_nodes( - &self, - path: &mut Vec, - visitor: &NV, - krs: &[KeyRange], - bs: &[u64], - ) -> Vec - where - NV: NodeVisitor, - V: Unpack, - { - assert_eq!(krs.len(), bs.len()); - let mut errs: Vec = Vec::new(); - - let mut blocks = Vec::with_capacity(bs.len()); - let mut filtered_krs = Vec::with_capacity(krs.len()); - for i in 0..bs.len() { - if self.sm_inc(bs[i]) == 0 { - // Node not yet seen - blocks.push(bs[i]); - filtered_krs.push(krs[i].clone()); - } else { - // This node has already been checked ... - match self.failed(bs[i]) { - None => { - // ... it was clean. - if let Err(e) = visitor.visit_again(path, bs[i]) { - // ... but the visitor isn't happy - errs.push(e.clone()); - } - } - Some(e) => { - // ... there was an error - errs.push(e.clone()); - } - } - } - } - - match self.engine.read_many(&blocks[0..]) { - Err(_) => { - // IO completely failed, error every block - for (i, b) in blocks.iter().enumerate() { - let e = io_err(path).keys_context(&filtered_krs[i]); - errs.push(e.clone()); - self.set_fail(*b, e); - } - } - Ok(rblocks) => { - let mut i = 0; - for rb in rblocks { - match rb { - Err(_) => { - let e = io_err(path).keys_context(&filtered_krs[i]); - errs.push(e.clone()); - self.set_fail(blocks[i], e); - } - Ok(b) => match self.walk_node(path, visitor, &filtered_krs[i], &b, false) { - Err(e) => { - errs.push(e); - } - Ok(()) => {} - }, - } - - i += 1; - } - } - } - - errs - } - - fn walk_node_( - &self, - path: &mut Vec, - visitor: &NV, - kr: &KeyRange, - b: &Block, - is_root: bool, - ) -> Result<()> - where - NV: NodeVisitor, - V: Unpack, - { - use Node::*; - - let bt = checksum::metadata_block_type(b.get_data()); - if bt != checksum::BT::NODE { - return Err(node_err_s( - path, - format!("checksum failed for node {}, {:?}", b.loc, bt), - ) - .keys_context(kr)); - } - - let node = unpack_node::(path, &b.get_data(), self.ignore_non_fatal, is_root)?; - - match node { - Internal { keys, values, .. } => { - let krs = split_key_ranges(path, &kr, &keys)?; - let errs = self.walk_nodes(path, visitor, &krs, &values); - return self.build_aggregate(b.loc, errs); - } - Leaf { - header, - keys, - values, - } => { - if let Err(e) = visitor.visit(path, &kr, &header, &keys, &values) { - let e = BTreeError::Path(path.clone(), Box::new(e.clone())); - self.set_fail(b.loc, e.clone()); - return Err(e); - } - } - } - - Ok(()) - } - - fn walk_node( - &self, - path: &mut Vec, - visitor: &NV, - kr: &KeyRange, - b: &Block, - is_root: bool, - ) -> Result<()> - where - NV: NodeVisitor, - V: Unpack, - { - path.push(b.loc); - let r = self.walk_node_(path, visitor, kr, b, is_root); - path.pop(); - visitor.end_walk()?; - r - } - - pub fn walk(&self, path: &mut Vec, visitor: &NV, root: u64) -> Result<()> - where - NV: NodeVisitor, - V: Unpack, - { - if self.sm_inc(root) > 0 { - if let Some(e) = self.failed(root) { - Err(e.clone()) - } else { - visitor.visit_again(path, root) - } - } else { - let root = self.engine.read(root).map_err(|_| io_err(path))?; - let kr = KeyRange { - start: None, - end: None, - }; - self.walk_node(path, visitor, &kr, &root, true) - } - } -} - -//-------------------------------- - -fn walk_node_threaded_( - w: Arc, - path: &mut Vec, - pool: &ThreadPool, - visitor: Arc, - kr: &KeyRange, - b: &Block, - is_root: bool, -) -> Result<()> -where - NV: NodeVisitor + Send + Sync + 'static, - V: Unpack, -{ - use Node::*; - - let bt = checksum::metadata_block_type(b.get_data()); - if bt != checksum::BT::NODE { - return Err(node_err_s( - path, - format!("checksum failed for node {}, {:?}", b.loc, bt), - ) - .keys_context(kr)); - } - - let node = unpack_node::(path, &b.get_data(), w.ignore_non_fatal, is_root)?; - - match node { - Internal { keys, values, .. } => { - let krs = split_key_ranges(path, &kr, &keys)?; - let errs = walk_nodes_threaded(w.clone(), path, pool, visitor, &krs, &values); - return w.build_aggregate(b.loc, errs); - } - Leaf { - header, - keys, - values, - } => { - visitor.visit(path, kr, &header, &keys, &values)?; - } - } - - Ok(()) -} - -fn walk_node_threaded( - w: Arc, - path: &mut Vec, - pool: &ThreadPool, - visitor: Arc, - kr: &KeyRange, - b: &Block, - is_root: bool, -) -> Result<()> -where - NV: NodeVisitor + Send + Sync + 'static, - V: Unpack, -{ - path.push(b.loc); - let r = walk_node_threaded_(w, path, pool, visitor.clone(), kr, b, is_root); - path.pop(); - visitor.end_walk()?; - r -} - -fn walk_nodes_threaded( - w: Arc, - path: &mut Vec, - pool: &ThreadPool, - visitor: Arc, - krs: &[KeyRange], - bs: &[u64], -) -> Vec -where - NV: NodeVisitor + Send + Sync + 'static, - V: Unpack, -{ - assert_eq!(krs.len(), bs.len()); - let mut errs: Vec = Vec::new(); - - let mut blocks = Vec::with_capacity(bs.len()); - let mut filtered_krs = Vec::with_capacity(krs.len()); - for i in 0..bs.len() { - if w.sm_inc(bs[i]) == 0 { - // Node not yet seen - blocks.push(bs[i]); - filtered_krs.push(krs[i].clone()); - } else { - // This node has already been checked ... - match w.failed(bs[i]) { - None => { - // ... it was clean. - if let Err(e) = visitor.visit_again(path, bs[i]) { - // ... but the visitor isn't happy - errs.push(e.clone()); - } - } - Some(e) => { - // ... there was an error - errs.push(e.clone()); - } - } - } - } - - match w.engine.read_many(&blocks[0..]) { - Err(_) => { - // IO completely failed error every block - for (i, b) in blocks.iter().enumerate() { - let e = io_err(path).keys_context(&filtered_krs[i]); - errs.push(e.clone()); - w.set_fail(*b, e); - } - } - Ok(rblocks) => { - let mut i = 0; - let errs = Arc::new(Mutex::new(Vec::new())); - - for rb in rblocks { - match rb { - Err(_) => { - let e = io_err(path).keys_context(&filtered_krs[i]); - let mut errs = errs.lock().unwrap(); - errs.push(e.clone()); - w.set_fail(blocks[i], e); - } - Ok(b) => { - let w = w.clone(); - let visitor = visitor.clone(); - let kr = filtered_krs[i].clone(); - let errs = errs.clone(); - let mut path = path.clone(); - - pool.execute(move || { - match w.walk_node(&mut path, visitor.as_ref(), &kr, &b, false) { - Err(e) => { - let mut errs = errs.lock().unwrap(); - errs.push(e); - } - Ok(()) => {} - } - }); - } - } - - i += 1; - } - - pool.join(); - } - } - - errs -} - -pub fn walk_threaded( - path: &mut Vec, - w: Arc, - pool: &ThreadPool, - visitor: Arc, - root: u64, -) -> Result<()> -where - NV: NodeVisitor + Send + Sync + 'static, - V: Unpack, -{ - if w.sm_inc(root) > 0 { - if let Some(e) = w.failed(root) { - Err(e.clone()) - } else { - visitor.visit_again(path, root) - } - } else { - let root = w.engine.read(root).map_err(|_| io_err(path))?; - let kr = KeyRange { - start: None, - end: None, - }; - walk_node_threaded(w, path, pool, visitor, &kr, &root, true) - } -} - -//------------------------------------------ - -struct ValueCollector { - values: Mutex>, -} - -impl ValueCollector { - fn new() -> ValueCollector { - ValueCollector { - values: Mutex::new(BTreeMap::new()), - } - } -} - -// FIXME: should we be using Copy rather than clone? (Yes) -impl NodeVisitor for ValueCollector { - fn visit( - &self, - _path: &Vec, - _kr: &KeyRange, - _h: &NodeHeader, - keys: &[u64], - values: &[V], - ) -> Result<()> { - let mut vals = self.values.lock().unwrap(); - for n in 0..keys.len() { - vals.insert(keys[n], values[n].clone()); - } - - Ok(()) - } - - fn visit_again(&self, _path: &Vec, _b: u64) -> Result<()> { - Ok(()) - } - - fn end_walk(&self) -> Result<()> { - Ok(()) - } -} - -pub fn btree_to_map( - path: &mut Vec, - engine: Arc, - ignore_non_fatal: bool, - root: u64, -) -> Result> { - let walker = BTreeWalker::new(engine, ignore_non_fatal); - let visitor = ValueCollector::::new(); - walker.walk(path, &visitor, root)?; - Ok(visitor.values.into_inner().unwrap()) -} - -pub fn btree_to_map_with_sm( - path: &mut Vec, - engine: Arc, - sm: Arc>, - ignore_non_fatal: bool, - root: u64, -) -> Result> { - let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?; - let visitor = ValueCollector::::new(); - - walker.walk(path, &visitor, root)?; - Ok(visitor.values.into_inner().unwrap()) -} - -//------------------------------------------ - -struct ValuePathCollector { - values: Mutex, V)>>, -} - -impl ValuePathCollector { - fn new() -> ValuePathCollector { - ValuePathCollector { - values: Mutex::new(BTreeMap::new()), - } - } -} - -impl NodeVisitor for ValuePathCollector { - fn visit( - &self, - path: &Vec, - _kr: &KeyRange, - _h: &NodeHeader, - keys: &[u64], - values: &[V], - ) -> Result<()> { - let mut vals = self.values.lock().unwrap(); - for n in 0..keys.len() { - vals.insert(keys[n], (path.clone(), values[n].clone())); - } - - Ok(()) - } - - fn visit_again(&self, _path: &Vec, _b: u64) -> Result<()> { - Ok(()) - } - - fn end_walk(&self) -> Result<()> { - Ok(()) - } -} - -pub fn btree_to_map_with_path( - path: &mut Vec, - engine: Arc, - sm: Arc>, - ignore_non_fatal: bool, - root: u64, -) -> Result, V)>> { - let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?; - let visitor = ValuePathCollector::::new(); - - walker.walk(path, &visitor, root)?; - Ok(visitor.values.into_inner().unwrap()) -} - -//------------------------------------------ diff --git a/src/pdata/btree_builder.rs b/src/pdata/btree_builder.rs index 389b685..cbc619c 100644 --- a/src/pdata/btree_builder.rs +++ b/src/pdata/btree_builder.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use byteorder::{LittleEndian, WriteBytesExt}; use std::collections::VecDeque; use std::io::Cursor; @@ -9,6 +9,7 @@ use crate::io_engine::*; use crate::pdata::btree::*; use crate::pdata::space_map::*; use crate::pdata::unpack::*; +use crate::write_batcher::*; //------------------------------------------ @@ -67,24 +68,16 @@ fn calc_max_entries() -> usize { //------------------------------------------ struct Entries { - max_entries: usize, + pub max_entries: usize, entries: VecDeque<(u64, V)>, } enum Action { - Noop, - WriteSingle { - keys: Vec, - values: Vec, - }, - WritePair { - keys1: Vec, - values1: Vec, - keys2: Vec, - values2: Vec, - }, + EmitNode(Vec, Vec), // keys, values } +use Action::*; + impl Entries { pub fn new(max_entries: usize) -> Entries { Entries { @@ -93,20 +86,19 @@ impl Entries { } } - pub fn add_entry(&mut self, k: u64, v: V) -> Action { - let result = if self.full() { + pub fn add_entry(&mut self, k: u64, v: V) -> Vec> { + let mut result = Vec::new(); + + if self.full() { let (keys, values) = self.pop(self.max_entries); - Action::WriteSingle { keys, values } - } else { - Action::Noop - }; + result.push(EmitNode(keys, values)); + } self.entries.push_back((k, v)); - result } - pub fn complete(&mut self) -> Action { + fn complete_(&mut self, result: &mut Vec>) { let n = self.entries.len(); if n >= self.max_entries { @@ -115,20 +107,20 @@ impl Entries { let (keys1, values1) = self.pop(n1); let (keys2, values2) = self.pop(n2); - Action::WritePair { - keys1, - values1, - keys2, - values2, - } + result.push(EmitNode(keys1, values1)); + result.push(EmitNode(keys2, values2)); } else if n > 0 { let (keys, values) = self.pop(n); - Action::WriteSingle { keys, values } - } else { - Action::Noop + result.push(EmitNode(keys, values)); } } + pub fn complete(&mut self) -> Vec> { + let mut result = Vec::new(); + self.complete_(&mut result); + result + } + fn full(&self) -> bool { self.entries.len() >= 2 * self.max_entries } @@ -149,55 +141,11 @@ impl Entries { //------------------------------------------ -struct WriteBatcher { - engine: Arc>, - sm: Arc>, - - batch_size: usize, - queue: Vec, -} - -impl WriteBatcher { - fn new( - engine: Arc>, - sm: Arc>, - batch_size: usize, - ) -> WriteBatcher { - WriteBatcher { - engine, - sm, - batch_size, - queue: Vec::with_capacity(batch_size), - } - } - - fn alloc(&mut self) -> Result { - let mut sm = self.sm.lock().unwrap(); - let b = sm.alloc()?; - - if b.is_none() { - return Err(anyhow!("out of metadata space")); - } - - Ok(b.unwrap()) - } - - fn write(&mut self, b: Block) -> Result<()> { - checksum::write_checksum(&mut b.get_data(), checksum::BT::NODE)?; - - if self.queue.len() == self.batch_size { - self.flush()?; - } - - self.queue.push(b); - Ok(()) - } - - fn flush(&mut self) -> Result<()> { - self.engine.write_many(&self.queue)?; - self.queue.clear(); - Ok(()) - } +pub struct NodeSummary { + block: u64, + nr_entries: usize, + key_low: u64, + key_high: u64, // inclusive } //------------------------------------------ @@ -208,11 +156,11 @@ fn write_node_(w: &mut WriteBatcher, mut node: Node) -> Res let loc = w.alloc()?; node.set_block(loc); - + let b = Block::new(loc); let mut cursor = Cursor::new(b.get_data()); pack_node(&node, &mut cursor)?; - w.write(b)?; + w.write(b, checksum::BT::NODE)?; Ok((first_key, loc)) } @@ -268,7 +216,10 @@ pub struct Builder { } impl Builder { - pub fn new(engine: Arc>, sm: Arc>) -> Builder { + pub fn new( + engine: Arc, + sm: Arc>, + ) -> Builder { let max_entries = calc_max_entries::(); let max_internal_entries = calc_max_entries::(); @@ -282,13 +233,41 @@ impl Builder { } pub fn add_entry(&mut self, k: u64, v: V) -> Result<()> { - let action = self.entries.add_entry(k, v); - self.perform_action(action) + let actions = self.entries.add_entry(k, v); + for a in actions { + self.perform_action(a)?; + } + + Ok(()) + } + + pub fn add_leaf_node(&mut self, leaf: &NodeSummary) -> Result<()> { + match leaf.nr_entries { + n if n == 0 => { + // Do nothing + }, + n if n < (self.entries.max_entries / 2) => { + // FIXME: what if we've already queued a handful of entries for a node? + // Add the entries individually + todo!(); + }, + n => { + let actions = self.entries.complete(); + for a in actions { + self.perform_action(a)?; + } + self.add_internal_entry(0, leaf.key_low, leaf.block)?; + } + } + + Ok(()) } pub fn complete(mut self) -> Result { - let action = self.entries.complete(); - self.perform_action(action)?; + let actions = self.entries.complete(); + for a in actions { + self.perform_action(a)?; + } self.w.flush()?; Ok(self.root) } @@ -297,33 +276,26 @@ impl Builder { fn add_internal_entry(&mut self, level: usize, k: u64, v: u64) -> Result<()> { if self.internal_entries.len() == level { - self.internal_entries.push(Entries::new(self.max_internal_entries)); + self.internal_entries + .push(Entries::new(self.max_internal_entries)); } - let action = self.internal_entries[level].add_entry(k, v); - self.perform_internal_action(level, action) + let actions = self.internal_entries[level].add_entry(k, v); + + for a in actions { + self.perform_internal_action(level, a)?; + } + + Ok(()) } fn perform_internal_action(&mut self, level: usize, action: Action) -> Result<()> { match action { - Action::Noop => {} - Action::WriteSingle { keys, values } => { + EmitNode(keys, values) => { let (k, loc) = write_internal(&mut self.w, keys, values)?; self.add_internal_entry(level + 1, k, loc)?; self.root = loc; - } - Action::WritePair { - keys1, - values1, - keys2, - values2, - } => { - let (k, loc) = write_leaf(&mut self.w, keys1, values1)?; - self.add_internal_entry(level + 1, k, loc)?; - - let (k, loc) = write_leaf(&mut self.w, keys2, values2)?; - self.add_internal_entry(level + 1, k, loc)?; - } + }, } Ok(()) @@ -331,23 +303,10 @@ impl Builder { fn perform_action(&mut self, action: Action) -> Result<()> { match action { - Action::Noop => {} - Action::WriteSingle { keys, values } => { + EmitNode(keys, values) => { let (k, loc) = write_leaf(&mut self.w, keys, values)?; self.add_internal_entry(0, k, loc)?; - } - Action::WritePair { - keys1, - values1, - keys2, - values2, - } => { - let (k, loc) = write_leaf(&mut self.w, keys1, values1)?; - self.add_internal_entry(0, k, loc)?; - - let (k, loc) = write_leaf(&mut self.w, keys2, values2)?; - self.add_internal_entry(0, k, loc)?; - } + }, } Ok(()) @@ -355,10 +314,3 @@ impl Builder { } //------------------------------------------ - -#[test] -fn fail() { - assert!(false); -} - -//------------------------------------------ diff --git a/src/pdata/btree_leaf_walker.rs b/src/pdata/btree_leaf_walker.rs new file mode 100644 index 0000000..9070580 --- /dev/null +++ b/src/pdata/btree_leaf_walker.rs @@ -0,0 +1,245 @@ +use fixedbitset::FixedBitSet; +use std::sync::{Arc, Mutex}; + +use crate::checksum; +use crate::io_engine::*; +use crate::pdata::btree::*; +use crate::pdata::space_map::*; +use crate::pdata::unpack::*; + +//------------------------------------------ + +pub trait LeafVisitor { + fn visit(&mut self, kr: &KeyRange, b: u64) -> Result<()>; + + // Nodes may be shared and thus visited multiple times. The walker avoids + // doing repeated IO, but it does call this method to keep the visitor up to + // date. b may be an internal node obviously. + fn visit_again(&mut self, b: u64) -> Result<()>; + fn end_walk(&mut self) -> Result<()>; +} + +// This is useful if you just want to get the space map counts from the walk. +pub struct NoopLeafVisitor {} + +impl LeafVisitor for NoopLeafVisitor { + fn visit(&mut self, kr: &KeyRange, b: u64) -> Result<()> { + Ok(()) + } + + fn visit_again(&mut self, b: u64) -> Result<()> { + Ok(()) + } + + fn end_walk(&mut self) -> Result<()> { + Ok(()) + } +} + +pub struct LeafWalker<'a> { + engine: Arc, + sm: &'a mut dyn SpaceMap, + leaves: FixedBitSet, + ignore_non_fatal: bool, +} + +impl<'a> LeafWalker<'a> { + pub fn new( + engine: Arc, + sm: &'a mut dyn SpaceMap, + ignore_non_fatal: bool, + ) -> LeafWalker<'a> { + let nr_blocks = engine.get_nr_blocks() as usize; + LeafWalker { + engine, + sm, + leaves: FixedBitSet::with_capacity(nr_blocks), + ignore_non_fatal, + } + } + + // Atomically increments the ref count, and returns the _old_ count. + fn sm_inc(&mut self, b: u64) -> u32 { + let sm = &mut self.sm; + let count = sm.get(b).unwrap(); + sm.inc(b, 1).unwrap(); + count + } + + fn walk_nodes( + &mut self, + depth: usize, + path: &mut Vec, + visitor: &mut LV, + krs: &[KeyRange], + bs: &[u64], + ) -> Result<()> + where + LV: LeafVisitor, + V: Unpack, + { + assert_eq!(krs.len(), bs.len()); + let mut errs: Vec = Vec::new(); + + let mut blocks = Vec::with_capacity(bs.len()); + let mut filtered_krs = Vec::with_capacity(krs.len()); + for i in 0..bs.len() { + if self.sm_inc(bs[i]) == 0 { + // Node not yet seen + blocks.push(bs[i]); + filtered_krs.push(krs[i].clone()); + } else { + // This node has already been checked ... + if let Err(e) = visitor.visit_again(bs[i]) { + // ... but the visitor isn't happy + errs.push(e.clone()); + } + } + } + + let rblocks = self + .engine + .read_many(&blocks[0..]) + .map_err(|_e| io_err(path))?; + + let mut i = 0; + for rb in rblocks { + match rb { + Err(_) => { + return Err(io_err(path).keys_context(&filtered_krs[i])); + } + Ok(b) => { + self.walk_node(depth - 1, path, visitor, &filtered_krs[i], &b, false)?; + } + } + + i += 1; + } + + Ok(()) + } + + fn walk_node_( + &mut self, + depth: usize, + path: &mut Vec, + visitor: &mut LV, + kr: &KeyRange, + b: &Block, + is_root: bool, + ) -> Result<()> + where + LV: LeafVisitor, + V: Unpack, + { + use Node::*; + + let bt = checksum::metadata_block_type(b.get_data()); + if bt != checksum::BT::NODE { + return Err(node_err_s( + path, + format!("checksum failed for node {}, {:?}", b.loc, bt), + ) + .keys_context(kr)); + } + + let node = unpack_node::(path, &b.get_data(), self.ignore_non_fatal, is_root)?; + + if let Internal { keys, values, .. } = node { + let krs = split_key_ranges(path, &kr, &keys)?; + if depth == 0 { + for i in 0..krs.len() { + self.sm.inc(values[i], 1).expect("sm.inc() failed"); + for v in &values { + self.leaves.insert(*v as usize); + } + visitor.visit(&krs[i], values[i])?; + } + Ok(()) + } else { + self.walk_nodes(depth, path, visitor, &krs, &values) + } + } else { + Err(node_err(path, "btree nodes are not all at the same depth.")) + } + } + + fn walk_node( + &mut self, + depth: usize, + path: &mut Vec, + visitor: &mut LV, + kr: &KeyRange, + b: &Block, + is_root: bool, + ) -> Result<()> + where + LV: LeafVisitor, + V: Unpack, + { + path.push(b.loc); + let r = self.walk_node_(depth, path, visitor, kr, b, is_root); + path.pop(); + visitor.end_walk()?; + r + } + + fn get_depth(&self, path: &mut Vec, root: u64, is_root: bool) -> Result { + use Node::*; + + let b = self.engine.read(root).map_err(|_| io_err(path))?; + + let bt = checksum::metadata_block_type(b.get_data()); + if bt != checksum::BT::NODE { + return Err(node_err_s( + path, + format!("checksum failed for node {}, {:?}", root, bt), + )); + } + + let node = unpack_node::(path, &b.get_data(), self.ignore_non_fatal, is_root)?; + + match node { + Internal { values, .. } => { + let n = self.get_depth::(path, values[0], false)?; + Ok(n + 1) + } + Leaf { .. } => Ok(0), + } + } + + pub fn walk(&mut self, path: &mut Vec, visitor: &mut LV, root: u64) -> Result<()> + where + LV: LeafVisitor, + V: Unpack, + { + let kr = KeyRange { + start: None, + end: None, + }; + + let depth = self.get_depth::(path, root, true)?; + + if depth == 0 { + self.sm_inc(root); + self.leaves.insert(root as usize); + visitor.visit(&kr, root)?; + Ok(()) + } else { + if self.sm_inc(root) > 0 { + visitor.visit_again(root) + } else { + let root = self.engine.read(root).map_err(|_| io_err(path))?; + + self.walk_node(depth - 1, path, visitor, &kr, &root, true) + } + } + } + + // Call this to extract the leaves bitset after you've done your walking. + pub fn get_leaves(self) -> FixedBitSet { + self.leaves + } +} + +//------------------------------------------ diff --git a/src/pdata/btree_merge.rs b/src/pdata/btree_merge.rs new file mode 100644 index 0000000..874db2f --- /dev/null +++ b/src/pdata/btree_merge.rs @@ -0,0 +1,136 @@ +use anyhow::{anyhow, Result}; +use byteorder::{LittleEndian, WriteBytesExt}; +use std::collections::VecDeque; +use std::io::Cursor; +use std::sync::{Arc, Mutex}; + +use crate::checksum; +use crate::io_engine::*; +use crate::pdata::btree; +use crate::pdata::btree::*; +use crate::pdata::btree_walker::*; +use crate::pdata::space_map::*; +use crate::pdata::unpack::*; +use crate::write_batcher::*; + +//------------------------------------------ + +// The subtrees will often consist of a single under populated leaf node. Given this +// we're going to merge by: +// i) Building an ordered list of all leaf nodes across all subtrees. +// ii) Merge leaf nodes where they can be packed more efficiently (non destructively to original subtrees). +// iii) Build higher levels from scratch. There are very few of these internal nodes compared to leaves anyway. + +struct NodeSummary { + block: u64, + nr_entries: usize, + key_low: u64, + key_high: u64, // inclusive +} + +struct LVInner { + last_key: Option, + leaves: Vec, +} + +struct LeafVisitor { + inner: Mutex, +} + +impl LeafVisitor { + fn new() -> LeafVisitor { + LeafVisitor { + inner: Mutex::new(LVInner { + last_key: None, + leaves: Vec::new(), + }), + } + } +} + +impl NodeVisitor for LeafVisitor { + fn visit( + &self, + path: &Vec, + kr: &KeyRange, + header: &NodeHeader, + keys: &[u64], + values: &[V], + ) -> btree::Result<()> { + // ignore empty nodes + if keys.len() == 0 { + return Ok(()); + } + + let mut inner = self.inner.lock().unwrap(); + + // Check keys are ordered. + if inner.leaves.len() > 0 { + let last_key = inner.leaves.last().unwrap().key_high; + if keys[0] <= last_key { + return Err(BTreeError::NodeError( + "unable to merge btrees: sub trees out of order".to_string(), + )); + } + } + + let l = NodeSummary { + block: *path.last().unwrap(), + nr_entries: keys.len(), + key_low: keys[0], + key_high: *keys.last().unwrap(), + }; + + inner.leaves.push(l); + Ok(()) + } + + fn visit_again(&self, path: &Vec, b: u64) -> btree::Result<()> { + Ok(()) + } + + fn end_walk(&self) -> btree::Result<()> { + Ok(()) + } +} + +pub type AEngine = Arc; + +fn collect_leaves(engine: AEngine, roots: &[u64]) -> Result> { + let lv = LeafVisitor::new(); + let walker = BTreeWalker::new(engine, false); + + let mut path = Vec::new(); + for root in roots { + walker.walk::(&mut path, &lv, *root)?; + } + + Ok(lv.inner.into_inner().unwrap().leaves) +} + +//------------------------------------------ + +fn optimise_leaves( + batcher: &mut WriteBatcher, + lvs: Vec, +) -> Result> { + // FIXME: implement + Ok(lvs) +} + +//------------------------------------------ + +pub fn merge( + engine: AEngine, + sm: Arc>, + roots: &[u64], +) -> Result { + let lvs = collect_leaves::(engine.clone(), roots)?; + + let mut batcher = WriteBatcher::new(engine, sm, 256); + let lvs = optimise_leaves::(&mut batcher, lvs)?; + + todo!(); +} + +//------------------------------------------ diff --git a/src/pdata/btree_walker.rs b/src/pdata/btree_walker.rs new file mode 100644 index 0000000..6834ccf --- /dev/null +++ b/src/pdata/btree_walker.rs @@ -0,0 +1,573 @@ +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; +use threadpool::ThreadPool; + +use crate::checksum; +use crate::io_engine::*; +use crate::pdata::btree::*; +use crate::pdata::space_map::*; +use crate::pdata::unpack::*; + +//------------------------------------------ + +pub trait NodeVisitor { + // &self is deliberately non mut to allow the walker to use multiple threads. + fn visit( + &self, + path: &Vec, + kr: &KeyRange, + header: &NodeHeader, + keys: &[u64], + values: &[V], + ) -> Result<()>; + + // Nodes may be shared and thus visited multiple times. The walker avoids + // doing repeated IO, but it does call this method to keep the visitor up to + // date. + fn visit_again(&self, path: &Vec, b: u64) -> Result<()>; + + fn end_walk(&self) -> Result<()>; +} + +#[derive(Clone)] +pub struct BTreeWalker { + engine: Arc, + sm: Arc>, + fails: Arc>>, + ignore_non_fatal: bool, +} + +impl BTreeWalker { + pub fn new(engine: Arc, ignore_non_fatal: bool) -> BTreeWalker { + let nr_blocks = engine.get_nr_blocks() as usize; + let r: BTreeWalker = BTreeWalker { + engine, + sm: Arc::new(Mutex::new(RestrictedSpaceMap::new(nr_blocks as u64))), + fails: Arc::new(Mutex::new(BTreeMap::new())), + ignore_non_fatal, + }; + r + } + + pub fn new_with_sm( + engine: Arc, + sm: Arc>, + ignore_non_fatal: bool, + ) -> Result { + { + let sm = sm.lock().unwrap(); + assert_eq!(sm.get_nr_blocks().unwrap(), engine.get_nr_blocks()); + } + + Ok(BTreeWalker { + engine, + sm, + fails: Arc::new(Mutex::new(BTreeMap::new())), + ignore_non_fatal, + }) + } + + fn failed(&self, b: u64) -> Option { + let fails = self.fails.lock().unwrap(); + match fails.get(&b) { + None => None, + Some(e) => Some(e.clone()), + } + } + + fn set_fail(&self, b: u64, err: BTreeError) { + // FIXME: should we monitor the size of fails, and abort if too many errors? + let mut fails = self.fails.lock().unwrap(); + fails.insert(b, err); + } + + // Atomically increments the ref count, and returns the _old_ count. + fn sm_inc(&self, b: u64) -> u32 { + let mut sm = self.sm.lock().unwrap(); + let count = sm.get(b).unwrap(); + sm.inc(b, 1).unwrap(); + count + } + + fn build_aggregate(&self, b: u64, errs: Vec) -> Result<()> { + match errs.len() { + 0 => Ok(()), + 1 => { + let e = errs[0].clone(); + self.set_fail(b, e.clone()); + Err(e) + } + _ => { + let e = aggregate_error(errs); + self.set_fail(b, e.clone()); + Err(e) + } + } + } + + fn walk_nodes( + &self, + path: &mut Vec, + visitor: &NV, + krs: &[KeyRange], + bs: &[u64], + ) -> Vec + where + NV: NodeVisitor, + V: Unpack, + { + assert_eq!(krs.len(), bs.len()); + let mut errs: Vec = Vec::new(); + + let mut blocks = Vec::with_capacity(bs.len()); + let mut filtered_krs = Vec::with_capacity(krs.len()); + for i in 0..bs.len() { + if self.sm_inc(bs[i]) == 0 { + // Node not yet seen + blocks.push(bs[i]); + filtered_krs.push(krs[i].clone()); + } else { + // This node has already been checked ... + match self.failed(bs[i]) { + None => { + // ... it was clean. + if let Err(e) = visitor.visit_again(path, bs[i]) { + // ... but the visitor isn't happy + errs.push(e.clone()); + } + } + Some(e) => { + // ... there was an error + errs.push(e.clone()); + } + } + } + } + + match self.engine.read_many(&blocks[0..]) { + Err(_) => { + // IO completely failed, error every block + for (i, b) in blocks.iter().enumerate() { + let e = io_err(path).keys_context(&filtered_krs[i]); + errs.push(e.clone()); + self.set_fail(*b, e); + } + } + Ok(rblocks) => { + let mut i = 0; + for rb in rblocks { + match rb { + Err(_) => { + let e = io_err(path).keys_context(&filtered_krs[i]); + errs.push(e.clone()); + self.set_fail(blocks[i], e); + } + Ok(b) => match self.walk_node(path, visitor, &filtered_krs[i], &b, false) { + Err(e) => { + errs.push(e); + } + Ok(()) => {} + }, + } + + i += 1; + } + } + } + + errs + } + + fn walk_node_( + &self, + path: &mut Vec, + visitor: &NV, + kr: &KeyRange, + b: &Block, + is_root: bool, + ) -> Result<()> + where + NV: NodeVisitor, + V: Unpack, + { + use Node::*; + + let bt = checksum::metadata_block_type(b.get_data()); + if bt != checksum::BT::NODE { + return Err(node_err_s( + path, + format!("checksum failed for node {}, {:?}", b.loc, bt), + ) + .keys_context(kr)); + } + + let node = unpack_node::(path, &b.get_data(), self.ignore_non_fatal, is_root)?; + + match node { + Internal { keys, values, .. } => { + let krs = split_key_ranges(path, &kr, &keys)?; + let errs = self.walk_nodes(path, visitor, &krs, &values); + return self.build_aggregate(b.loc, errs); + } + Leaf { + header, + keys, + values, + } => { + if let Err(e) = visitor.visit(path, &kr, &header, &keys, &values) { + let e = BTreeError::Path(path.clone(), Box::new(e.clone())); + self.set_fail(b.loc, e.clone()); + return Err(e); + } + } + } + + Ok(()) + } + + fn walk_node( + &self, + path: &mut Vec, + visitor: &NV, + kr: &KeyRange, + b: &Block, + is_root: bool, + ) -> Result<()> + where + NV: NodeVisitor, + V: Unpack, + { + path.push(b.loc); + let r = self.walk_node_(path, visitor, kr, b, is_root); + path.pop(); + visitor.end_walk()?; + r + } + + pub fn walk(&self, path: &mut Vec, visitor: &NV, root: u64) -> Result<()> + where + NV: NodeVisitor, + V: Unpack, + { + if self.sm_inc(root) > 0 { + if let Some(e) = self.failed(root) { + Err(e.clone()) + } else { + visitor.visit_again(path, root) + } + } else { + let root = self.engine.read(root).map_err(|_| io_err(path))?; + let kr = KeyRange { + start: None, + end: None, + }; + self.walk_node(path, visitor, &kr, &root, true) + } + } +} + +//-------------------------------- + +fn walk_node_threaded_( + w: Arc, + path: &mut Vec, + pool: &ThreadPool, + visitor: Arc, + kr: &KeyRange, + b: &Block, + is_root: bool, +) -> Result<()> +where + NV: NodeVisitor + Send + Sync + 'static, + V: Unpack, +{ + use Node::*; + + let bt = checksum::metadata_block_type(b.get_data()); + if bt != checksum::BT::NODE { + return Err(node_err_s( + path, + format!("checksum failed for node {}, {:?}", b.loc, bt), + ) + .keys_context(kr)); + } + + let node = unpack_node::(path, &b.get_data(), w.ignore_non_fatal, is_root)?; + + match node { + Internal { keys, values, .. } => { + let krs = split_key_ranges(path, &kr, &keys)?; + let errs = walk_nodes_threaded(w.clone(), path, pool, visitor, &krs, &values); + return w.build_aggregate(b.loc, errs); + } + Leaf { + header, + keys, + values, + } => { + visitor.visit(path, kr, &header, &keys, &values)?; + } + } + + Ok(()) +} + +fn walk_node_threaded( + w: Arc, + path: &mut Vec, + pool: &ThreadPool, + visitor: Arc, + kr: &KeyRange, + b: &Block, + is_root: bool, +) -> Result<()> +where + NV: NodeVisitor + Send + Sync + 'static, + V: Unpack, +{ + path.push(b.loc); + let r = walk_node_threaded_(w, path, pool, visitor.clone(), kr, b, is_root); + path.pop(); + visitor.end_walk()?; + r +} + +fn walk_nodes_threaded( + w: Arc, + path: &mut Vec, + pool: &ThreadPool, + visitor: Arc, + krs: &[KeyRange], + bs: &[u64], +) -> Vec +where + NV: NodeVisitor + Send + Sync + 'static, + V: Unpack, +{ + assert_eq!(krs.len(), bs.len()); + let mut errs: Vec = Vec::new(); + + let mut blocks = Vec::with_capacity(bs.len()); + let mut filtered_krs = Vec::with_capacity(krs.len()); + for i in 0..bs.len() { + if w.sm_inc(bs[i]) == 0 { + // Node not yet seen + blocks.push(bs[i]); + filtered_krs.push(krs[i].clone()); + } else { + // This node has already been checked ... + match w.failed(bs[i]) { + None => { + // ... it was clean. + if let Err(e) = visitor.visit_again(path, bs[i]) { + // ... but the visitor isn't happy + errs.push(e.clone()); + } + } + Some(e) => { + // ... there was an error + errs.push(e.clone()); + } + } + } + } + + match w.engine.read_many(&blocks[0..]) { + Err(_) => { + // IO completely failed error every block + for (i, b) in blocks.iter().enumerate() { + let e = io_err(path).keys_context(&filtered_krs[i]); + errs.push(e.clone()); + w.set_fail(*b, e); + } + } + Ok(rblocks) => { + let mut i = 0; + let errs = Arc::new(Mutex::new(Vec::new())); + + for rb in rblocks { + match rb { + Err(_) => { + let e = io_err(path).keys_context(&filtered_krs[i]); + let mut errs = errs.lock().unwrap(); + errs.push(e.clone()); + w.set_fail(blocks[i], e); + } + Ok(b) => { + let w = w.clone(); + let visitor = visitor.clone(); + let kr = filtered_krs[i].clone(); + let errs = errs.clone(); + let mut path = path.clone(); + + pool.execute(move || { + match w.walk_node(&mut path, visitor.as_ref(), &kr, &b, false) { + Err(e) => { + let mut errs = errs.lock().unwrap(); + errs.push(e); + } + Ok(()) => {} + } + }); + } + } + + i += 1; + } + + pool.join(); + } + } + + errs +} + +pub fn walk_threaded( + path: &mut Vec, + w: Arc, + pool: &ThreadPool, + visitor: Arc, + root: u64, +) -> Result<()> +where + NV: NodeVisitor + Send + Sync + 'static, + V: Unpack, +{ + if w.sm_inc(root) > 0 { + if let Some(e) = w.failed(root) { + Err(e.clone()) + } else { + visitor.visit_again(path, root) + } + } else { + let root = w.engine.read(root).map_err(|_| io_err(path))?; + let kr = KeyRange { + start: None, + end: None, + }; + walk_node_threaded(w, path, pool, visitor, &kr, &root, true) + } +} + +//------------------------------------------ + +struct ValueCollector { + values: Mutex>, +} + +impl ValueCollector { + fn new() -> ValueCollector { + ValueCollector { + values: Mutex::new(BTreeMap::new()), + } + } +} + +// FIXME: should we be using Copy rather than clone? (Yes) +impl NodeVisitor for ValueCollector { + fn visit( + &self, + _path: &Vec, + _kr: &KeyRange, + _h: &NodeHeader, + keys: &[u64], + values: &[V], + ) -> Result<()> { + let mut vals = self.values.lock().unwrap(); + for n in 0..keys.len() { + vals.insert(keys[n], values[n].clone()); + } + + Ok(()) + } + + fn visit_again(&self, _path: &Vec, _b: u64) -> Result<()> { + Ok(()) + } + + fn end_walk(&self) -> Result<()> { + Ok(()) + } +} + +pub fn btree_to_map( + path: &mut Vec, + engine: Arc, + ignore_non_fatal: bool, + root: u64, +) -> Result> { + let walker = BTreeWalker::new(engine, ignore_non_fatal); + let visitor = ValueCollector::::new(); + walker.walk(path, &visitor, root)?; + Ok(visitor.values.into_inner().unwrap()) +} + +pub fn btree_to_map_with_sm( + path: &mut Vec, + engine: Arc, + sm: Arc>, + ignore_non_fatal: bool, + root: u64, +) -> Result> { + let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?; + let visitor = ValueCollector::::new(); + + walker.walk(path, &visitor, root)?; + Ok(visitor.values.into_inner().unwrap()) +} + +//------------------------------------------ + +struct ValuePathCollector { + values: Mutex, V)>>, +} + +impl ValuePathCollector { + fn new() -> ValuePathCollector { + ValuePathCollector { + values: Mutex::new(BTreeMap::new()), + } + } +} + +impl NodeVisitor for ValuePathCollector { + fn visit( + &self, + path: &Vec, + _kr: &KeyRange, + _h: &NodeHeader, + keys: &[u64], + values: &[V], + ) -> Result<()> { + let mut vals = self.values.lock().unwrap(); + for n in 0..keys.len() { + vals.insert(keys[n], (path.clone(), values[n].clone())); + } + + Ok(()) + } + + fn visit_again(&self, _path: &Vec, _b: u64) -> Result<()> { + Ok(()) + } + + fn end_walk(&self) -> Result<()> { + Ok(()) + } +} + +pub fn btree_to_map_with_path( + path: &mut Vec, + engine: Arc, + sm: Arc>, + ignore_non_fatal: bool, + root: u64, +) -> Result, V)>> { + let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?; + let visitor = ValuePathCollector::::new(); + + walker.walk(path, &visitor, root)?; + Ok(visitor.values.into_inner().unwrap()) +} + +//------------------------------------------ diff --git a/src/pdata/mod.rs b/src/pdata/mod.rs index e691b21..26a7318 100644 --- a/src/pdata/mod.rs +++ b/src/pdata/mod.rs @@ -1,5 +1,8 @@ pub mod btree; pub mod btree_builder; +pub mod btree_merge; +pub mod btree_leaf_walker; +pub mod btree_walker; pub mod space_map; pub mod unpack; diff --git a/src/pdata/space_map.rs b/src/pdata/space_map.rs index 34daa15..d060c7b 100644 --- a/src/pdata/space_map.rs +++ b/src/pdata/space_map.rs @@ -3,6 +3,7 @@ use byteorder::{LittleEndian, WriteBytesExt}; use fixedbitset::FixedBitSet; use nom::{multi::count, number::complete::*, IResult}; use std::sync::{Arc, Mutex}; +use std::boxed::Box; use crate::io_engine::*; use crate::pdata::unpack::{Pack, Unpack}; @@ -328,6 +329,16 @@ pub fn core_sm(nr_entries: u64, max_count: u32) -> Arc Box { + if max_count <= u8::MAX as u32 { + Box::new(CoreSpaceMap::::new(nr_entries)) + } else if max_count <= u16::MAX as u32 { + Box::new(CoreSpaceMap::::new(nr_entries)) + } else { + Box::new(CoreSpaceMap::::new(nr_entries)) + } +} + //------------------------------------------ // This in core space map can only count to one, useful when walking diff --git a/src/report.rs b/src/report.rs index dd87a48..0131f00 100644 --- a/src/report.rs +++ b/src/report.rs @@ -153,17 +153,17 @@ impl SimpleInner { impl ReportInner for SimpleInner { fn set_title(&mut self, txt: &str) { - println!("{}", txt); + eprintln!("{}", txt); } fn set_sub_title(&mut self, txt: &str) { - println!("{}", txt); + eprintln!("{}", txt); } fn progress(&mut self, percent: u8) { let elapsed = self.last_progress.elapsed().unwrap(); if elapsed > std::time::Duration::from_secs(5) { - println!("Progress: {}%", percent); + eprintln!("Progress: {}%", percent); self.last_progress = std::time::SystemTime::now(); } } diff --git a/src/thin/check.rs b/src/thin/check.rs index 08f7ff9..e5f3d46 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -9,6 +9,7 @@ use threadpool::ThreadPool; use crate::checksum; use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine}; use crate::pdata::btree::{self, *}; +use crate::pdata::btree_walker::*; use crate::pdata::space_map::*; use crate::pdata::unpack::*; use crate::report::*; diff --git a/src/thin/dump.rs b/src/thin/dump.rs index 2de6cd4..cb62330 100644 --- a/src/thin/dump.rs +++ b/src/thin/dump.rs @@ -1,15 +1,21 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use std::collections::{BTreeMap, BTreeSet}; +use std::io::Write; +use std::ops::DerefMut; use std::path::Path; use std::sync::{Arc, Mutex}; -use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine}; +use crate::checksum; +use crate::io_engine::{AsyncIoEngine, Block, IoEngine, SyncIoEngine}; use crate::pdata::btree::{self, *}; +use crate::pdata::btree_leaf_walker::*; +use crate::pdata::btree_walker::*; use crate::pdata::space_map::*; use crate::pdata::unpack::*; use crate::report::*; use crate::thin::block_time::*; use crate::thin::device_detail::*; +use crate::thin::runs::*; use crate::thin::superblock::*; use crate::thin::xml::{self, MetadataVisitor}; @@ -166,51 +172,124 @@ fn mk_context(opts: &ThinDumpOptions) -> Result { //------------------------------------------ -struct NoopVisitor {} +type DefId = u64; +type ThinId = u32; -impl btree::NodeVisitor for NoopVisitor { - fn visit( - &self, - _path: &Vec, - _kr: &btree::KeyRange, - _h: &btree::NodeHeader, - _k: &[u64], - _values: &[V], - ) -> btree::Result<()> { +#[derive(Clone)] +enum Entry { + Leaf(u64), + Ref(DefId), +} + +#[derive(Clone)] +struct Mapping { + kr: KeyRange, + entries: Vec, +} + +#[derive(Clone)] +struct Device { + thin_id: ThinId, + detail: DeviceDetail, + map: Mapping, +} + +#[derive(Clone)] +struct Def { + def_id: DefId, + map: Mapping, +} + +#[derive(Clone)] +struct Metadata { + defs: Vec, + devs: Vec, +} + +//------------------------------------------ + +struct CollectLeaves { + leaves: Vec, +} + +impl CollectLeaves { + fn new() -> CollectLeaves { + CollectLeaves { leaves: Vec::new() } + } +} + +impl LeafVisitor for CollectLeaves { + fn visit(&mut self, _kr: &KeyRange, b: u64) -> btree::Result<()> { + self.leaves.push(Entry::Leaf(b)); Ok(()) } - fn visit_again(&self, _path: &Vec, _b: u64) -> btree::Result<()> { + fn visit_again(&mut self, b: u64) -> btree::Result<()> { + self.leaves.push(Entry::Ref(b)); Ok(()) } - fn end_walk(&self) -> btree::Result<()> { + fn end_walk(&mut self) -> btree::Result<()> { Ok(()) } } -fn find_shared_nodes( +fn collect_leaves( ctx: &Context, - nr_metadata_blocks: u64, - roots: &BTreeMap, -) -> Result<(BTreeSet, Arc>)> { - // By default the walker uses a restricted space map that can only count to 1. So - // we explicitly create a full sm. - let sm = core_sm(nr_metadata_blocks, roots.len() as u32); - let w = BTreeWalker::new_with_sm(ctx.engine.clone(), sm.clone(), false)?; + shared: &mut BTreeSet, + mut sm: Box, +) -> Result>> { + let mut map: BTreeMap> = BTreeMap::new(); - let mut path = Vec::new(); - path.push(0); + ctx.report.set_title(&format!( + "Collecting leaves for {} shared nodes", + shared.len() + )); - for (thin_id, root) in roots { - ctx.report.info(&format!("scanning {}", thin_id)); - let v = NoopVisitor {}; - w.walk::(&mut path, &v, *root)?; + // FIXME: we don't want any leaves in shared. + for r in shared.iter() { + let old_count = sm.get(*r).expect("couldn't get count from space map."); + sm.set(*r, 0).expect("couldn't set count in space map."); + + let mut w = LeafWalker::new(ctx.engine.clone(), sm.deref_mut(), false); + let mut v = CollectLeaves::new(); + + let mut path = Vec::new(); + path.push(0); + + // ctx.report.set_title(&format!("collecting {}", *r)); + w.walk::(&mut path, &mut v, *r)?; + sm.set(*r, old_count) + .expect("couldn't set count in space map."); + + map.insert(*r, v.leaves); } + Ok(map) +} + +//------------------------------------------ + +fn find_shared_nodes( + ctx: &Context, + roots: &BTreeMap, u64)>, +) -> Result<(BTreeSet, Box)> { + let nr_metadata_blocks = ctx.engine.get_nr_blocks(); + let mut sm = core_sm_without_mutex(nr_metadata_blocks, roots.len() as u32); + let mut v = NoopLeafVisitor {}; + let mut w = LeafWalker::new(ctx.engine.clone(), sm.deref_mut(), false); + + for (thin_id, (path, root)) in roots { + let mut path = path.clone(); + ctx.report.info(&format!("scanning {}", thin_id)); + w.walk::(&mut path, &mut v, *root)?; + } + + // We have to get the leaves so w is consumed and the &mut on sm + // is dropped. + let leaves = w.get_leaves(); let mut shared = BTreeSet::new(); { - let sm = sm.lock().unwrap(); for i in 0..sm.get_nr_blocks().unwrap() { if sm.get(i).expect("couldn't get count from space map.") > 1 { shared.insert(i); @@ -218,69 +297,280 @@ fn find_shared_nodes( } } - return Ok((shared, sm)); + // we're not interested in leaves (roots will get re-added later). + { + for i in 0..leaves.len() { + if leaves.contains(i) { + shared.remove(&(i as u64)); + } + } + } + + Ok((shared, sm)) } //------------------------------------------ -fn dump_node( - ctx: &Context, - out: &mut dyn xml::MetadataVisitor, - root: u64, - sm: &Arc>, - force: bool, // sets the ref count for the root to zero to force output. -) -> Result<()> { - let w = BTreeWalker::new_with_sm(ctx.engine.clone(), sm.clone(), false)?; - let mut path = Vec::new(); - path.push(0); - - let v = MappingVisitor::new(out); - - // Temporarily set the ref count for the root to zero. - let mut old_count = 0; - if force { - let mut sm = sm.lock().unwrap(); - old_count = sm.get(root).unwrap(); - sm.set(root, 0)?; - } - - w.walk::(&mut path, &v, root)?; - - // Reset the ref count for root. - if force { - let mut sm = sm.lock().unwrap(); - sm.set(root, old_count)?; - } - - Ok(()) -} - -//------------------------------------------ - -pub fn dump(opts: ThinDumpOptions) -> Result<()> { - let ctx = mk_context(&opts)?; - +fn build_metadata(ctx: &Context, sb: &Superblock) -> Result { let report = &ctx.report; let engine = &ctx.engine; // superblock report.set_title("Reading superblock"); - let sb = read_superblock(engine.as_ref(), SUPERBLOCK_LOCATION)?; - let metadata_root = unpack::(&sb.metadata_sm_root[0..])?; - let data_root = unpack::(&sb.data_sm_root[0..])?; + //let metadata_root = unpack::(&sb.metadata_sm_root[0..])?; + //let data_root = unpack::(&sb.data_sm_root[0..])?; let mut path = Vec::new(); path.push(0); report.set_title("Reading device details"); - let devs = btree_to_map::(&mut path, engine.clone(), true, sb.details_root)?; + let details = btree_to_map::(&mut path, engine.clone(), true, sb.details_root)?; report.set_title("Reading mappings roots"); - let roots = btree_to_map::(&mut path, engine.clone(), true, sb.mapping_root)?; + let roots; + { + let sm = Arc::new(Mutex::new(RestrictedSpaceMap::new(engine.get_nr_blocks()))); + roots = + btree_to_map_with_path::(&mut path, engine.clone(), sm, true, sb.mapping_root)?; + } report.set_title("Finding shared mappings"); - let (shared, sm) = find_shared_nodes(&ctx, metadata_root.nr_blocks, &roots)?; - report.info(&format!("{} shared nodes found", shared.len())); + let (mut shared, sm) = find_shared_nodes(ctx, &roots)?; + // Add in the roots, because they may not be shared. + for (_thin_id, (_path, root)) in &roots { + shared.insert(*root); + } + + let entry_map = collect_leaves(&ctx, &mut shared, sm)?; + let mut defs = Vec::new(); + let mut devs = Vec::new(); + + let mut seen = BTreeSet::new(); + for (thin_id, (_path, root)) in roots { + let id = thin_id as u64; + let detail = details.get(&id).expect("couldn't find device details"); + seen.insert(root); + let es = entry_map.get(&root).unwrap(); + let kr = KeyRange::new(); // FIXME: finish + devs.push(Device { + thin_id: thin_id as u32, + detail: detail.clone(), + map: Mapping { + kr, + entries: es.to_vec(), + }, + }); + } + + for b in shared { + if !seen.contains(&b) { + let es = entry_map.get(&b).unwrap(); + let kr = KeyRange::new(); // FIXME: finish + defs.push(Def { + def_id: b, + map: Mapping { + kr, + entries: es.to_vec(), + }, + }); + } + } + + Ok(Metadata { defs, devs }) +} + +//------------------------------------------ + +fn gather_entries(g: &mut Gatherer, es: &Vec) { + g.new_seq(); + for e in es { + match e { + Entry::Leaf(b) => { + g.next(*b); + } + Entry::Ref(_id) => { + g.new_seq(); + } + } + } +} + +fn entries_to_runs(runs: &BTreeMap>, es: &Vec) -> Vec { + use Entry::*; + + let mut result = Vec::new(); + let mut entry_index = 0; + while entry_index < es.len() { + match es[entry_index] { + Ref(id) => { + result.push(Ref(id)); + entry_index += 1; + } + Leaf(b) => { + if let Some(run) = runs.get(&b) { + result.push(Ref(b)); + entry_index += run.len(); + } else { + result.push(Leaf(b)); + entry_index += 1; + } + } + } + } + + result +} + +// FIXME: do we really need to track kr? +// FIXME: I think this may be better done as part of restore. +fn optimise_metadata(md: Metadata) -> Result { + use Entry::*; + + let mut g = Gatherer::new(); + for d in &md.defs { + gather_entries(&mut g, &d.map.entries); + } + + for d in &md.devs { + gather_entries(&mut g, &d.map.entries); + } + + let mut defs = Vec::new(); + let mut devs = Vec::new(); + let mut runs = BTreeMap::new(); + for run in g.gather() { + runs.insert(run[0], run); + } + eprintln!("{} runs", runs.len()); + + // The runs become additional defs that just contain leaves. + for (head, run) in runs.iter() { + let kr = KeyRange::new(); + let entries: Vec = run.iter().map(|b| Leaf(*b)).collect(); + defs.push(Def { + def_id: *head, + map: Mapping { kr, entries }, + }); + } + + // Expand old defs to use the new atomic runs + for d in &md.defs { + let kr = KeyRange::new(); + let entries = entries_to_runs(&runs, &d.map.entries); + + defs.push(Def { + def_id: d.def_id, + map: Mapping { kr, entries }, + }); + } + + // Expand old devs to use the new atomic runs + for d in &md.devs { + let kr = KeyRange::new(); + let entries = entries_to_runs(&runs, &d.map.entries); + devs.push(Device { + thin_id: d.thin_id, + detail: d.detail, + map: Mapping { kr, entries }, + }); + } + + Ok(Metadata { defs, devs }) +} + +//------------------------------------------ + +fn emit_leaf(out: &mut dyn xml::MetadataVisitor, b: &Block) -> Result<()> { + use Node::*; + let v = MappingVisitor::new(out); + let path = Vec::new(); + let kr = KeyRange::new(); + + let bt = checksum::metadata_block_type(b.get_data()); + if bt != checksum::BT::NODE { + return Err(anyhow!(format!( + "checksum failed for node {}, {:?}", + b.loc, bt + ))); + } + + let node = unpack_node::(&path, &b.get_data(), true, true)?; + + match node { + Internal { .. } => { + return Err(anyhow!("not a leaf")); + } + Leaf { + header, + keys, + values, + } => { + if let Err(_e) = v.visit(&path, &kr, &header, &keys, &values) { + return Err(anyhow!("couldn't emit leaf node")); + } + } + } + + Ok(()) +} + +fn read_for(engine: Arc, blocks: &[u64], mut t: T) -> Result<()> +where + T: FnMut(Block) -> Result<()>, +{ + for cs in blocks.chunks(engine.get_batch_size()) { + for b in engine + .read_many(cs) + .map_err(|_e| anyhow!("read_many failed"))? + { + t(b.map_err(|_e| anyhow!("read of individual block failed"))?)?; + } + } + + Ok(()) +} + +fn emit_leaves(ctx: &Context, out: &mut dyn xml::MetadataVisitor, ls: &[u64]) -> Result<()> { + let proc = |b| { + emit_leaf(out, &b)?; + Ok(()) + }; + + read_for(ctx.engine.clone(), ls, proc) +} + +fn emit_entries( + ctx: &Context, + out: &mut xml::XmlWriter, + entries: &Vec, +) -> Result<()> { + let mut leaves = Vec::new(); + + for e in entries { + match e { + Entry::Leaf(b) => { + leaves.push(*b); + } + Entry::Ref(id) => { + if leaves.len() > 0 { + emit_leaves(&ctx, out, &leaves[0..])?; + leaves.clear(); + } + let str = format!("{}", id); + out.ref_shared(&str)?; + } + } + } + + if leaves.len() > 0 { + emit_leaves(&ctx, out, &leaves[0..])?; + } + + Ok(()) +} + +fn dump_metadata(ctx: &Context, sb: &Superblock, md: &Metadata) -> Result<()> { + let data_root = unpack::(&sb.data_sm_root[0..])?; let mut out = xml::XmlWriter::new(std::io::stdout()); let xml_sb = xml::Superblock { uuid: "".to_string(), @@ -294,25 +584,24 @@ pub fn dump(opts: ThinDumpOptions) -> Result<()> { }; out.superblock_b(&xml_sb)?; - report.set_title("Dumping shared regions"); - for b in shared { - out.def_shared_b(&format!("{}", b))?; - dump_node(&ctx, &mut out, b, &sm, true)?; + ctx.report.set_title("Dumping shared regions"); + for d in &md.defs { + out.def_shared_b(&format!("{}", d.def_id))?; + emit_entries(ctx, &mut out, &d.map.entries)?; out.def_shared_e()?; } - report.set_title("Dumping mappings"); - for (thin_id, detail) in devs { - let d = xml::Device { - dev_id: thin_id as u32, - mapped_blocks: detail.mapped_blocks, - transaction: detail.transaction_id, - creation_time: detail.creation_time as u64, - snap_time: detail.snapshotted_time as u64, + ctx.report.set_title("Dumping devices"); + for dev in &md.devs { + let device = xml::Device { + dev_id: dev.thin_id, + mapped_blocks: dev.detail.mapped_blocks, + transaction: dev.detail.transaction_id, + creation_time: dev.detail.creation_time as u64, + snap_time: dev.detail.snapshotted_time as u64, }; - out.device_b(&d)?; - let root = roots.get(&thin_id).unwrap(); - dump_node(&ctx, &mut out, *root, &sm, false)?; + out.device_b(&device)?; + emit_entries(ctx, &mut out, &dev.map.entries)?; out.device_e()?; } out.superblock_e()?; @@ -321,3 +610,16 @@ pub fn dump(opts: ThinDumpOptions) -> Result<()> { } //------------------------------------------ + +pub fn dump(opts: ThinDumpOptions) -> Result<()> { + let ctx = mk_context(&opts)?; + let sb = read_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION)?; + let md = build_metadata(&ctx, &sb)?; + + ctx.report + .set_title("Optimising metadata to improve leaf packing"); + let md = optimise_metadata(md)?; + dump_metadata(&ctx, &sb, &md) +} + +//------------------------------------------ diff --git a/src/thin/mod.rs b/src/thin/mod.rs index b333e81..db3dfc4 100644 --- a/src/thin/mod.rs +++ b/src/thin/mod.rs @@ -1,7 +1,8 @@ pub mod block_time; -pub mod device_detail; -pub mod superblock; pub mod check; +pub mod device_detail; pub mod dump; pub mod restore; +pub mod runs; +pub mod superblock; pub mod xml; diff --git a/src/thin/runs.rs b/src/thin/runs.rs new file mode 100644 index 0000000..d81b49b --- /dev/null +++ b/src/thin/runs.rs @@ -0,0 +1,194 @@ +use anyhow::{anyhow, Result}; +use std::collections::{BTreeMap, BTreeSet}; +use std::mem; + +//------------------------------------------ + +#[derive(Clone, Debug)] +struct Entry { + neighbours: BTreeSet, +} + +impl Entry { + fn first_neighbour(&self) -> Option { + self.neighbours.iter().cloned().next() + } +} + +pub struct Gatherer { + prev: Option, + heads: BTreeSet, + tails: BTreeSet, + entries: BTreeMap, +} + +impl Gatherer { + pub fn new() -> Gatherer { + Gatherer { + prev: None, + heads: BTreeSet::new(), + tails: BTreeSet::new(), + entries: BTreeMap::new(), + } + } + + fn is_head(&self, b: u64) -> bool { + self.heads.contains(&b) + } + + fn mark_head(&mut self, b: u64) { + self.heads.insert(b); + } + + fn is_tail(&self, b: u64) -> bool { + self.tails.contains(&b) + } + + fn mark_tail(&mut self, b: u64) { + self.tails.insert(b); + } + + pub fn new_seq(&mut self) { + if let Some(b) = self.prev { + self.mark_tail(b); + } + + self.prev = None; + } + + pub fn next(&mut self, b: u64) { + if let Some(prev) = self.prev { + let e = self.entries.get_mut(&prev).unwrap(); + e.neighbours.insert(b); + } else { + self.mark_head(b); + } + + if self.entries.get(&b).is_none() { + let e = Entry { + neighbours: BTreeSet::new(), + }; + self.entries.insert(b, e); + } + + self.prev = Some(b); + } + + fn extract_seq(&self, mut b: u64) -> Vec { + let mut r = Vec::new(); + + // FIXME: remove + assert!(self.is_head(b)); + + loop { + r.push(b); + + if self.is_tail(b) { + return r; + } + + let e = self.entries.get(&b).unwrap(); + + b = e.first_neighbour().unwrap(); + } + } + + fn complete_heads_and_tails(&mut self) { + let mut tails = BTreeSet::new(); + + // add extra tails + for (b, e) in self.entries.iter() { + if e.neighbours.len() != 1 { + tails.insert(*b); + } + + if let Some(n) = e.first_neighbour() { + if self.is_head(n) { + tails.insert(*b); + } + } + } + + for t in tails { + self.mark_tail(t); + } + + // Now we need to mark entries that follow a tail as heads. + let mut heads = mem::take(&mut self.heads); + for t in &self.tails { + if let Some(e) = self.entries.get(&t) { + for n in &e.neighbours { + heads.insert(*n); + } + } + } + mem::swap(&mut heads, &mut self.heads); + } + + // Returns atomic subsequences. + pub fn gather(&mut self) -> Vec> { + // close the last sequence. + self.new_seq(); + self.complete_heads_and_tails(); + + // FIXME: there must be a 'map' + let mut seqs = Vec::new(); + for b in &self.heads { + seqs.push(self.extract_seq(*b)); + } + seqs + } +} + +//------------------------------------------ + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn gather() { + struct Test(Vec>, Vec>); + + let tests = vec![ + Test(vec![], vec![]), + Test(vec![vec![1]], vec![vec![1]]), + Test(vec![vec![1, 2, 3]], vec![vec![1, 2, 3]]), + Test(vec![vec![1, 2], vec![1, 2, 3]], vec![vec![1, 2], vec![3]]), + Test( + vec![vec![1, 2, 3, 4], vec![2, 3, 4, 5]], + vec![vec![1], vec![2, 3, 4], vec![5]], + ), + Test( + vec![vec![2, 3, 4, 5], vec![1, 2, 3, 4]], + vec![vec![1], vec![2, 3, 4], vec![5]], + ), + Test( + vec![ + vec![1, 2, 3, 4], + vec![2, 3, 4, 5, 6], + vec![3, 4], + vec![5, 6], + ], + vec![vec![1], vec![2], vec![3, 4], vec![5, 6]], + ), + ]; + + for t in tests { + eprintln!("new test case"); + let mut g = Gatherer::new(); + for s in t.0 { + g.new_seq(); + for b in s { + g.next(b); + } + } + + let seqs = g.gather(); + + assert_eq!(seqs, t.1); + } + } +} + +//------------------------------------------ diff --git a/src/write_batcher.rs b/src/write_batcher.rs new file mode 100644 index 0000000..029a721 --- /dev/null +++ b/src/write_batcher.rs @@ -0,0 +1,61 @@ +use anyhow::{anyhow, Result}; +use std::sync::{Arc, Mutex}; + +use crate::checksum; +use crate::io_engine::*; +use crate::pdata::space_map::*; + +//------------------------------------------ + +pub struct WriteBatcher { + engine: Arc, + sm: Arc>, + + batch_size: usize, + queue: Vec, +} + +impl WriteBatcher { + pub fn new( + engine: Arc, + sm: Arc>, + batch_size: usize, + ) -> WriteBatcher { + WriteBatcher { + engine, + sm, + batch_size, + queue: Vec::with_capacity(batch_size), + } + } + + pub fn alloc(&mut self) -> Result { + let mut sm = self.sm.lock().unwrap(); + let b = sm.alloc()?; + + if b.is_none() { + return Err(anyhow!("out of metadata space")); + } + + Ok(b.unwrap()) + } + + pub fn write(&mut self, b: Block, kind: checksum::BT) -> Result<()> { + checksum::write_checksum(&mut b.get_data(), kind)?; + + if self.queue.len() == self.batch_size { + self.flush()?; + } + + self.queue.push(b); + Ok(()) + } + + pub fn flush(&mut self) -> Result<()> { + self.engine.write_many(&self.queue)?; + self.queue.clear(); + Ok(()) + } +} + +//------------------------------------------