From cda92de441f6ecd830a19f47611901edf3b62e26 Mon Sep 17 00:00:00 2001 From: Joe Thornber Date: Fri, 21 Aug 2020 10:10:49 +0100 Subject: [PATCH] [thin_check (rust)] Add a threaded version of btree walk. Bottom level mappings use this if there are few devices. Performance is a bit slower for io_uring, and much slower for sync io (which I think is due to io scheduling which I can't do much about). --- src/pdata/btree.rs | 134 +++++++++++++++++++++++++++++++++++++-------- src/thin/check.rs | 63 +++++++++++++-------- 2 files changed, 149 insertions(+), 48 deletions(-) diff --git a/src/pdata/btree.rs b/src/pdata/btree.rs index 081db0d..ab49954 100644 --- a/src/pdata/btree.rs +++ b/src/pdata/btree.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result}; use nom::{number::complete::*, IResult}; use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; +use threadpool::ThreadPool; use crate::checksum; use crate::io_engine::*; @@ -153,8 +154,8 @@ pub trait NodeVisitor { #[derive(Clone)] pub struct BTreeWalker { - pub engine: Arc, - pub sm: Arc>, + engine: Arc, + sm: Arc>, ignore_non_fatal: bool, } @@ -194,7 +195,7 @@ impl BTreeWalker { Ok(count) } - fn walk_nodes(&mut self, visitor: &mut NV, bs: &[u64]) -> Result<()> + fn walk_nodes(&self, visitor: &NV, bs: &[u64]) -> Result<()> where NV: NodeVisitor, V: Unpack, @@ -215,7 +216,7 @@ impl BTreeWalker { Ok(()) } - fn walk_node(&mut self, visitor: &mut NV, b: &Block, is_root: bool) -> Result<()> + fn walk_node(&self, visitor: &NV, b: &Block, is_root: bool) -> Result<()> where NV: NodeVisitor, V: Unpack, @@ -237,7 +238,11 @@ impl BTreeWalker { } => { self.walk_nodes(visitor, &values)?; } - Leaf { header, keys, values } => { + Leaf { + header, + keys, + values, + } => { visitor.visit(&header, &keys, &values)?; } } @@ -245,19 +250,7 @@ impl BTreeWalker { Ok(()) } - pub fn walk_b(&mut self, visitor: &mut NV, root: &Block) -> Result<()> - where - NV: NodeVisitor, - V: Unpack, - { - if self.sm_inc(root.loc)? > 0 { - Ok(()) - } else { - self.walk_node(visitor, &root, true) - } - } - - pub fn walk(&mut self, visitor: &mut NV, root: u64) -> Result<()> + pub fn walk(&self, visitor: &NV, root: u64) -> Result<()> where NV: NodeVisitor, V: Unpack, @@ -272,6 +265,99 @@ impl BTreeWalker { } } +//-------------------------------- + +fn walk_node_threaded( + w: Arc, + pool: &ThreadPool, + visitor: Arc, + b: &Block, + is_root: bool, +) -> Result<()> +where + NV: NodeVisitor + Send + Sync + 'static, + V: Unpack, +{ + use Node::*; + + let bt = checksum::metadata_block_type(b.get_data()); + if bt != checksum::BT::NODE { + return Err(anyhow!("checksum failed for node {}, {:?}", b.loc, bt)); + } + + let node = unpack_node::(&b.get_data(), w.ignore_non_fatal, is_root)?; + + match node { + Internal { + header: _h, + keys: _k, + values, + } => { + walk_nodes_threaded(w, pool, visitor, &values)?; + } + Leaf { + header, + keys, + values, + } => { + visitor.visit(&header, &keys, &values)?; + } + } + + Ok(()) +} + +fn walk_nodes_threaded( + w: Arc, + pool: &ThreadPool, + visitor: Arc, + bs: &[u64], +) -> Result<()> +where + NV: NodeVisitor + Send + Sync + 'static, + V: Unpack, +{ + let mut blocks = Vec::new(); + for b in bs { + if w.sm_inc(*b)? == 0 { + blocks.push(Block::new(*b)); + } + } + + w.engine.read_many(&mut blocks)?; + + for b in blocks { + let w = w.clone(); + let visitor = visitor.clone(); + pool.execute(move || { + // FIXME: return result + w.walk_node(visitor.as_ref(), &b, false); + }); + } + pool.join(); + + Ok(()) +} + +pub fn walk_threaded( + w: Arc, + pool: &ThreadPool, + visitor: Arc, + root: u64, +) -> Result<()> +where + NV: NodeVisitor + Send + Sync + 'static, + V: Unpack, +{ + if w.sm_inc(root)? > 0 { + Ok(()) + } else { + let mut root = Block::new(root); + w.engine.read(&mut root)?; + walk_node_threaded(w, pool, visitor, &root, true) + } +} + //------------------------------------------ struct ValueCollector { @@ -302,10 +388,10 @@ pub fn btree_to_map( ignore_non_fatal: bool, root: u64, ) -> Result> { - let mut walker = BTreeWalker::new(engine, ignore_non_fatal); - let mut visitor = ValueCollector::::new(); + let walker = BTreeWalker::new(engine, ignore_non_fatal); + let visitor = ValueCollector::::new(); - walker.walk(&mut visitor, root)?; + walker.walk(&visitor, root)?; Ok(visitor.values.into_inner().unwrap()) } @@ -315,10 +401,10 @@ pub fn btree_to_map_with_sm( ignore_non_fatal: bool, root: u64, ) -> Result> { - let mut walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?; - let mut visitor = ValueCollector::::new(); + let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?; + let visitor = ValueCollector::::new(); - walker.walk(&mut visitor, root)?; + walker.walk(&visitor, root)?; Ok(visitor.values.into_inner().unwrap()) } diff --git a/src/thin/check.rs b/src/thin/check.rs index b9d52a3..f1e0dc6 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -9,7 +9,7 @@ use threadpool::ThreadPool; use crate::checksum; use crate::io_engine::{AsyncIoEngine, Block, IoEngine, SyncIoEngine}; -use crate::pdata::btree::{btree_to_map, btree_to_map_with_sm, BTreeWalker, NodeHeader, NodeVisitor}; +use crate::pdata::btree::*; use crate::pdata::space_map::*; use crate::pdata::unpack::*; use crate::report::*; @@ -162,14 +162,14 @@ fn check_space_map( // overflow btree { - let mut v = OverflowChecker::new(&*sm); - let mut w; + let v = OverflowChecker::new(&*sm); + let w; if metadata_sm.is_none() { w = BTreeWalker::new(engine.clone(), false); } else { w = BTreeWalker::new_with_sm(engine.clone(), metadata_sm.unwrap().clone(), false)?; } - w.walk(&mut v, root.ref_count_root)?; + w.walk(&v, root.ref_count_root)?; } let mut blocks = Vec::with_capacity(entries.len()); @@ -258,14 +258,14 @@ fn repair_space_map(ctx: &Context, entries: Vec, sm: ASpaceMap) -> R if blocknr >= sm.get_nr_blocks()? { break; } - + if let BitmapEntry::Small(actual) = e { let expected = sm.get(blocknr)?; if *actual == 1 && expected == 0 { *e = BitmapEntry::Small(0); } } - + blocknr += 1; } @@ -357,24 +357,34 @@ fn check_mapping_bottom_level( ) -> Result<()> { ctx.report.set_sub_title("mapping tree"); - for (_thin_id, root) in roots { - let mut w = BTreeWalker::new_with_sm(ctx.engine.clone(), metadata_sm.clone(), false)?; - let data_sm = data_sm.clone(); - let root = *root; - ctx.pool.execute(move || { - let mut v = BottomLevelVisitor { data_sm }; + let w = Arc::new(BTreeWalker::new_with_sm( + ctx.engine.clone(), + metadata_sm.clone(), + false, + )?); - // FIXME: return error - match w.walk(&mut v, root) { - Err(e) => { - eprintln!("walk failed {:?}", e); - std::process::abort(); - } - Ok(_result) => {} - } - }); + if roots.len() > 64 { + ctx.report.info("spreading load across devices"); + for (_thin_id, root) in roots { + let data_sm = data_sm.clone(); + let root = *root; + let v = BottomLevelVisitor { data_sm }; + let w = w.clone(); + ctx.pool.execute(move || { + let _r = w.walk(&v, root); + }); + } + ctx.pool.join(); + } else { + ctx.report.info("spreading load within device"); + for (_thin_id, root) in roots { + let w = w.clone(); + let data_sm = data_sm.clone(); + let root = *root; + let v = Arc::new(BottomLevelVisitor { data_sm }); + walk_threaded(w, &ctx.pool, v, root)? + } } - ctx.pool.join(); Ok(()) } @@ -385,7 +395,11 @@ fn mk_context(opts: &ThinCheckOptions) -> Result { if opts.async_io { nr_threads = std::cmp::min(4, num_cpus::get()); - engine = Arc::new(AsyncIoEngine::new(opts.dev, MAX_CONCURRENT_IO, opts.auto_repair)?); + engine = Arc::new(AsyncIoEngine::new( + opts.dev, + MAX_CONCURRENT_IO, + opts.auto_repair, + )?); } else { nr_threads = num_cpus::get() * 2; engine = Arc::new(SyncIoEngine::new(opts.dev, nr_threads, opts.auto_repair)?); @@ -504,7 +518,8 @@ pub fn check(opts: ThinCheckOptions) -> Result<()> { )?; // Now the counts should be correct and we can check it. - let metadata_leaks = check_space_map(&ctx, "metadata", entries, None, metadata_sm.clone(), root)?; + let metadata_leaks = + check_space_map(&ctx, "metadata", entries, None, metadata_sm.clone(), root)?; bail_out(&ctx, "metadata space map")?; if opts.auto_repair {