From 34052c540c0c218a479f58e1292c50d9ee094b99 Mon Sep 17 00:00:00 2001 From: Joe Thornber Date: Wed, 23 Sep 2020 15:35:09 +0100 Subject: [PATCH] [thin_check (rust)] Reinstate walk_node_threaded --- src/pdata/btree.rs | 257 +++++++++++++++++++++++++++++++-------------- src/thin/check.rs | 76 +++++++++++--- 2 files changed, 238 insertions(+), 95 deletions(-) diff --git a/src/pdata/btree.rs b/src/pdata/btree.rs index 8904937..ac959d6 100644 --- a/src/pdata/btree.rs +++ b/src/pdata/btree.rs @@ -1,18 +1,18 @@ -use anyhow::{anyhow}; +use anyhow::anyhow; use byteorder::{ReadBytesExt, WriteBytesExt}; +use data_encoding::BASE64; use nom::{number::complete::*, IResult}; use std::collections::BTreeMap; use std::fmt; use std::sync::{Arc, Mutex}; use thiserror::Error; use threadpool::ThreadPool; -use data_encoding::BASE64; use crate::checksum; use crate::io_engine::*; +use crate::pack::vm; use crate::pdata::space_map::*; use crate::pdata::unpack::*; -use crate::pack::vm; //------------------------------------------ @@ -181,10 +181,10 @@ fn test_split_range() { fn split_one(path: &Vec, kr: &KeyRange, k: u64) -> Result<(KeyRange, KeyRange)> { match kr.split(k) { None => { - return Err(node_err(path, &format!( - "couldn't split key range {} at {}", - kr, k - ))); + return Err(node_err( + path, + &format!("couldn't split key range {} at {}", kr, k), + )); } Some(pair) => Ok(pair), } @@ -222,7 +222,7 @@ pub fn encode_node_path(path: &[u64]) -> String { let mut buffer: Vec = Vec::with_capacity(128); let mut cursor = std::io::Cursor::new(&mut buffer); assert!(path.len() < 256); - + // The first entry is normally the superblock (0), so we // special case this. if path.len() > 0 && path[0] == 0 { @@ -234,17 +234,19 @@ pub fn encode_node_path(path: &[u64]) -> String { cursor.write_u8(count as u8).unwrap(); vm::pack_u64s(&mut cursor, path).unwrap(); } - + BASE64.encode(&buffer) } pub fn decode_node_path(text: &str) -> anyhow::Result> { let mut buffer = vec![0; 128]; let bytes = &mut buffer[0..BASE64.decode_len(text.len()).unwrap()]; - BASE64.decode_mut(text.as_bytes(), &mut bytes[0..]).map_err(|_| anyhow!("bad node path. Unable to base64 decode."))?; - + BASE64 + .decode_mut(text.as_bytes(), &mut bytes[0..]) + .map_err(|_| anyhow!("bad node path. Unable to base64 decode."))?; + let mut input = std::io::Cursor::new(bytes); - + let mut count = input.read_u8()?; let mut prepend_zero = false; if (count & 0x1) == 0 { @@ -344,7 +346,10 @@ impl fmt::Display for BTreeError { } } pub fn node_err(path: &Vec, msg: &str) -> BTreeError { - BTreeError::Path(path.clone(), Box::new(BTreeError::NodeError(msg.to_string()))) + BTreeError::Path( + path.clone(), + Box::new(BTreeError::NodeError(msg.to_string())), + ) } fn node_err_s(path: &Vec, msg: String) -> BTreeError { @@ -359,7 +364,7 @@ pub fn value_err(msg: String) -> BTreeError { BTreeError::ValueError(msg) } -fn aggregate_error(rs: Vec) -> BTreeError { +pub fn aggregate_error(rs: Vec) -> BTreeError { BTreeError::Aggregate(rs) } @@ -457,19 +462,22 @@ pub fn unpack_node( NodeHeader::unpack(data).map_err(|_e| node_err(path, "couldn't parse node header"))?; if header.is_leaf && header.value_size != V::disk_size() { - return Err(node_err_s(path, format!( - "value_size mismatch: expected {}, was {}", - V::disk_size(), - header.value_size - ))); + return Err(node_err_s( + path, + format!( + "value_size mismatch: expected {}, was {}", + V::disk_size(), + header.value_size + ), + )); } let elt_size = header.value_size + 8; if elt_size as usize * header.max_entries as usize + NODE_HEADER_SIZE > BLOCK_SIZE { - return Err(node_err_s(path, format!( - "max_entries is too large ({})", - header.max_entries - ))); + return Err(node_err_s( + path, + format!("max_entries is too large ({})", header.max_entries), + )); } if header.nr_entries > header.max_entries { @@ -484,10 +492,13 @@ pub fn unpack_node( if !is_root { let min = header.max_entries / 3; if header.nr_entries < min { - return Err(node_err_s(path, format!( - "too few entries {}, expected at least {}", - header.nr_entries, min - ))); + return Err(node_err_s( + path, + format!( + "too few entries {}, expected at least {}", + header.nr_entries, min + ), + )); } } } @@ -530,8 +541,14 @@ pub fn unpack_node( pub trait NodeVisitor { // &self is deliberately non mut to allow the walker to use multiple threads. - fn visit(&self, path: &Vec, keys: &KeyRange, header: &NodeHeader, keys: &[u64], values: &[V]) - -> Result<()>; + fn visit( + &self, + path: &Vec, + keys: &KeyRange, + header: &NodeHeader, + keys: &[u64], + values: &[V], + ) -> Result<()>; } #[derive(Clone)] @@ -625,7 +642,7 @@ impl BTreeWalker { let mut errs: Vec = Vec::new(); let mut blocks = Vec::with_capacity(bs.len()); - let mut filtered_krs = Vec::with_capacity(bs.len()); + let mut filtered_krs = Vec::with_capacity(krs.len()); for i in 0..bs.len() { if self.sm_inc(bs[i]) == 0 { // Node not yet seen @@ -695,10 +712,11 @@ impl BTreeWalker { let bt = checksum::metadata_block_type(b.get_data()); if bt != checksum::BT::NODE { - return Err( - node_err_s(path, format!("checksum failed for node {}, {:?}", b.loc, bt)) - .keys_context(kr), - ); + return Err(node_err_s( + path, + format!("checksum failed for node {}, {:?}", b.loc, bt), + ) + .keys_context(kr)); } let node = unpack_node::(path, &b.get_data(), self.ignore_non_fatal, is_root)?; @@ -767,9 +785,9 @@ impl BTreeWalker { //-------------------------------- -/* -fn walk_node_threaded( +fn walk_node_threaded_( w: Arc, + path: &mut Vec, pool: &ThreadPool, visitor: Arc, kr: &KeyRange, @@ -784,80 +802,140 @@ where let bt = checksum::metadata_block_type(b.get_data()); if bt != checksum::BT::NODE { - return Err(node_err_s(format!( - "checksum failed for node {}, {:?}", - b.loc, bt - ))); + return Err(node_err_s( + path, + format!("checksum failed for node {}, {:?}", b.loc, bt), + ) + .keys_context(kr)); } - let node = unpack_node::(&b.get_data(), w.ignore_non_fatal, is_root)?; + let node = unpack_node::(path, &b.get_data(), w.ignore_non_fatal, is_root)?; match node { Internal { keys, values, .. } => { - let krs = BTreeWalker::split_key_ranges(&kr, &keys)?; - walk_nodes_threaded(w, pool, visitor, &krs, &values)?; + let krs = split_key_ranges(path, &kr, &keys)?; + let errs = walk_nodes_threaded(w.clone(), path, pool, visitor, &krs, &values); + return w.build_aggregate(b.loc, errs); } Leaf { header, keys, values, } => { - visitor.visit(kr, &header, &keys, &values)?; + visitor.visit(path, kr, &header, &keys, &values)?; } } Ok(()) } -fn walk_nodes_threaded( +fn walk_node_threaded( w: Arc, + path: &mut Vec, pool: &ThreadPool, visitor: Arc, - krs: &[KeyRange], - bs: &[u64], + kr: &KeyRange, + b: &Block, + is_root: bool, ) -> Result<()> where NV: NodeVisitor + Send + Sync + 'static, V: Unpack, { - let mut blocks = Vec::new(); - for b in bs { - if w.sm_inc(*b)? == 0 { - blocks.push(*b); + path.push(b.loc); + let r = walk_node_threaded_(w, path, pool, visitor, kr, b, is_root); + path.pop(); + r +} + +fn walk_nodes_threaded( + w: Arc, + path: &mut Vec, + pool: &ThreadPool, + visitor: Arc, + krs: &[KeyRange], + bs: &[u64], +) -> Vec +where + NV: NodeVisitor + Send + Sync + 'static, + V: Unpack, +{ + assert_eq!(krs.len(), bs.len()); + let mut errs: Vec = Vec::new(); + + let mut blocks = Vec::with_capacity(bs.len()); + let mut filtered_krs = Vec::with_capacity(krs.len()); + for i in 0..bs.len() { + if w.sm_inc(bs[i]) == 0 { + // Node not yet seen + blocks.push(bs[i]); + filtered_krs.push(krs[i].clone()); + } else { + // This node has already been checked ... + match w.failed(bs[i]) { + None => { + // ... it was clean so we can ignore. + } + Some(e) => { + // ... there was an error + errs.push(e.clone()); + } + } } } - let rblocks = convert_io_err(w.engine.read_many(&blocks[0..]))?; - - let mut i = 0; - for b in rblocks { - match b { - Err(_) => { - // FIXME: aggregate these - return Err(io_err()); + match w.engine.read_many(&blocks[0..]) { + Err(_) => { + // IO completely failed error every block + for (i, b) in blocks.iter().enumerate() { + let e = io_err(path).keys_context(&filtered_krs[i]); + errs.push(e.clone()); + w.set_fail(*b, e); } - Ok(b) => { - let w = w.clone(); - let visitor = visitor.clone(); - let kr = krs[i].clone(); + } + Ok(rblocks) => { + let mut i = 0; + let errs = Arc::new(Mutex::new(Vec::new())); - pool.execute(move || { - let result = w.walk_node(visitor.as_ref(), &kr, &b, false); - if result.is_err() { - todo!(); + for rb in rblocks { + match rb { + Err(_) => { + let e = io_err(path).keys_context(&filtered_krs[i]); + let mut errs = errs.lock().unwrap(); + errs.push(e.clone()); + w.set_fail(blocks[i], e); } - }); + Ok(b) => { + let w = w.clone(); + let visitor = visitor.clone(); + let kr = filtered_krs[i].clone(); + let errs = errs.clone(); + let mut path = path.clone(); + + pool.execute(move || { + match w.walk_node(&mut path, visitor.as_ref(), &kr, &b, false) { + Err(e) => { + let mut errs = errs.lock().unwrap(); + errs.push(e); + } + Ok(()) => {} + } + }); + } + } + + i += 1; } + + pool.join(); } - - i += 1; } - pool.join(); - Ok(()) + errs } pub fn walk_threaded( + path: &mut Vec, w: Arc, pool: &ThreadPool, visitor: Arc, @@ -867,19 +945,23 @@ where NV: NodeVisitor + Send + Sync + 'static, V: Unpack, { - if w.sm_inc(root)? > 0 { - Ok(()) + if w.sm_inc(root) > 0 { + if let Some(e) = w.failed(root) { + Err(e.clone()) + } else { + Ok(()) + } } else { - let root = convert_io_err(w.engine.read(root))?; + let root = w.engine.read(root).map_err(|_| io_err(path))?; let kr = KeyRange { start: None, end: None, }; - walk_node_threaded(w, pool, visitor, &kr, &root, true) + walk_node_threaded(w, path, pool, visitor, &kr, &root, true) } } -*/ +/* pub fn walk_threaded( path: &mut Vec, w: Arc, @@ -893,6 +975,7 @@ where { w.walk(path, visitor.as_ref(), root) } +*/ //------------------------------------------ @@ -910,7 +993,14 @@ impl ValueCollector { // FIXME: should we be using Copy rather than clone? (Yes) impl NodeVisitor for ValueCollector { - fn visit(&self, _path: &Vec, _kr: &KeyRange, _h: &NodeHeader, keys: &[u64], values: &[V]) -> Result<()> { + fn visit( + &self, + _path: &Vec, + _kr: &KeyRange, + _h: &NodeHeader, + keys: &[u64], + values: &[V], + ) -> Result<()> { let mut vals = self.values.lock().unwrap(); for n in 0..keys.len() { vals.insert(keys[n], values[n].clone()); @@ -950,7 +1040,7 @@ pub fn btree_to_map_with_sm( //------------------------------------------ struct ValuePathCollector { - values: Mutex, V)>> + values: Mutex, V)>>, } impl ValuePathCollector { @@ -962,7 +1052,14 @@ impl ValuePathCollector { } impl NodeVisitor for ValuePathCollector { - fn visit(&self, path: &Vec, _kr: &KeyRange, _h: &NodeHeader, keys: &[u64], values: &[V]) -> Result<()> { + fn visit( + &self, + path: &Vec, + _kr: &KeyRange, + _h: &NodeHeader, + keys: &[u64], + values: &[V], + ) -> Result<()> { let mut vals = self.values.lock().unwrap(); for n in 0..keys.len() { vals.insert(keys[n], (path.clone(), values[n].clone())); diff --git a/src/thin/check.rs b/src/thin/check.rs index 72db4cd..b8dacbf 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -13,8 +13,8 @@ use crate::pdata::btree::{self, *}; use crate::pdata::space_map::*; use crate::pdata::unpack::*; use crate::report::*; -use crate::thin::superblock::*; use crate::thin::block_time::*; +use crate::thin::superblock::*; //------------------------------------------ @@ -25,7 +25,14 @@ struct BottomLevelVisitor { //------------------------------------------ impl NodeVisitor for BottomLevelVisitor { - fn visit(&self, _path: &Vec, _kr: &KeyRange, _h: &NodeHeader, _k: &[u64], values: &[BlockTime]) -> btree::Result<()> { + fn visit( + &self, + _path: &Vec, + _kr: &KeyRange, + _h: &NodeHeader, + _k: &[u64], + values: &[BlockTime], + ) -> btree::Result<()> { // FIXME: do other checks if values.len() == 0 { @@ -99,7 +106,14 @@ impl<'a> OverflowChecker<'a> { } impl<'a> NodeVisitor for OverflowChecker<'a> { - fn visit(&self, _path: &Vec, _kr: &KeyRange, _h: &NodeHeader, keys: &[u64], values: &[u32]) -> btree::Result<()> { + fn visit( + &self, + _path: &Vec, + _kr: &KeyRange, + _h: &NodeHeader, + keys: &[u64], + values: &[u32], + ) -> btree::Result<()> { for n in 0..keys.len() { let k = keys[n]; let v = values[n]; @@ -164,7 +178,7 @@ fn check_space_map( match b { Err(_e) => { todo!(); - }, + } Ok(b) => { if checksum::metadata_block_type(&b.get_data()) != checksum::BT::BITMAP { report.fatal(&format!( @@ -358,20 +372,34 @@ fn check_mapping_bottom_level( false, )?); - if roots.len() > 64000 { + // We want to print out errors as we progress, so we aggregate for each thin and print + // at that point. + let mut failed = false; + + if roots.len() > 64 { ctx.report.info("spreading load across devices"); + let errs = Arc::new(Mutex::new(Vec::new())); for (_thin_id, (path, root)) in roots { let data_sm = data_sm.clone(); let root = *root; let v = BottomLevelVisitor { data_sm }; let w = w.clone(); let mut path = path.clone(); + let errs = errs.clone(); + ctx.pool.execute(move || { - // FIXME: propogate errors + share fails. - let _r = w.walk(&mut path, &v, root); + if let Err(e) = w.walk(&mut path, &v, root) { + let mut errs = errs.lock().unwrap(); + errs.push(e); + } }); } ctx.pool.join(); + let errs = Arc::try_unwrap(errs).unwrap().into_inner().unwrap(); + if errs.len() > 0 { + eprintln!("{}", aggregate_error(errs)); + failed = true; + } } else { ctx.report.info("spreading load within device"); for (_thin_id, (path, root)) in roots { @@ -380,12 +408,19 @@ fn check_mapping_bottom_level( let root = *root; let v = Arc::new(BottomLevelVisitor { data_sm }); let mut path = path.clone(); - // FIXME: propogate errors + share fails. - walk_threaded(&mut path, w, &ctx.pool, v, root)? + + if let Err(e) = walk_threaded(&mut path, w, &ctx.pool, v, root) { + failed = true; + eprintln!("{}", e); + } } } - Ok(()) + if failed { + Err(anyhow!("Check of mappings failed")) + } else { + Ok(()) + } } fn mk_context(opts: &ThinCheckOptions) -> Result { @@ -464,15 +499,19 @@ pub fn check(opts: ThinCheckOptions) -> Result<()> { // mapping top level report.set_sub_title("mapping tree"); - let roots = - btree_to_map_with_path::(&mut path, engine.clone(), metadata_sm.clone(), false, sb.mapping_root)?; + let roots = btree_to_map_with_path::( + &mut path, + engine.clone(), + metadata_sm.clone(), + false, + sb.mapping_root, + )?; // mapping bottom level let root = unpack::(&sb.data_sm_root[0..])?; let data_sm = core_sm(root.nr_blocks, nr_devs as u32); check_mapping_bottom_level(&ctx, &metadata_sm, &data_sm, &roots)?; bail_out(&ctx, "mapping tree")?; - eprintln!("checked mapping"); report.set_sub_title("data space map"); let root = unpack::(&sb.data_sm_root[0..])?; @@ -523,8 +562,15 @@ pub fn check(opts: ThinCheckOptions) -> Result<()> { )?; // Now the counts should be correct and we can check it. - let metadata_leaks = - check_space_map(&mut path, &ctx, "metadata", entries, None, metadata_sm.clone(), root)?; + let metadata_leaks = check_space_map( + &mut path, + &ctx, + "metadata", + entries, + None, + metadata_sm.clone(), + root, + )?; bail_out(&ctx, "metadata space map")?; if opts.auto_repair {