diff --git a/Cargo.lock b/Cargo.lock index e6d5c20..610c998 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -704,6 +704,7 @@ name = "thinp" version = "0.1.0" dependencies = [ "anyhow", + "atty", "base64", "byteorder", "clap", diff --git a/Cargo.toml b/Cargo.toml index 86626a8..d58ec14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "GPL3" [dependencies] +atty = "0.2" anyhow = "1.0" base64 = "0.12" byteorder = "1.3" diff --git a/src/bin/thin_check.rs b/src/bin/thin_check.rs index 2c74c46..59e9696 100644 --- a/src/bin/thin_check.rs +++ b/src/bin/thin_check.rs @@ -1,13 +1,15 @@ extern crate clap; extern crate thinp; +use atty::Stream; use clap::{App, Arg}; use std::path::Path; use std::process; -use thinp::file_utils; -use thinp::thin::check::{check, ThinCheckOptions}; - use std::process::exit; +use std::sync::Arc; +use thinp::file_utils; +use thinp::report::*; +use thinp::thin::check::{check, ThinCheckOptions}; fn main() { let parser = App::new("thin_check") @@ -17,26 +19,27 @@ fn main() { Arg::with_name("QUIET") .help("Suppress output messages, return only exit code.") .short("q") - .long("quiet") - .value_name("QUIET"), + .long("quiet"), ) .arg( Arg::with_name("SB_ONLY") .help("Only check the superblock.") .long("super-block-only") .value_name("SB_ONLY"), + ) .arg( + Arg::with_name("AUTO_REPAIR") + .help("Auto repair trivial issues.") + .long("auto-repair"), ) .arg( - Arg::with_name("ignore-non-fatal-errors") + Arg::with_name("IGNORE_NON_FATAL") .help("Only return a non-zero exit code if a fatal error is found.") - .long("ignore-non-fatal-errors") - .value_name("IGNORE_NON_FATAL"), + .long("ignore-non-fatal-errors"), ) .arg( - Arg::with_name("clear-needs-check-flag") + Arg::with_name("CLEAR_NEEDS_CHECK") .help("Clears the 'needs_check' flag in the superblock") - .long("clear-needs-check") - .value_name("CLEAR_NEEDS_CHECK"), + .long("clear-needs-check"), ) .arg( Arg::with_name("OVERRIDE_MAPPING_ROOT") @@ -61,9 +64,7 @@ fn main() { .arg( Arg::with_name("SYNC_IO") .help("Force use of synchronous io") - .long("sync-io") - .value_name("SYNC_IO") - .takes_value(false), + .long("sync-io"), ); let matches = parser.get_matches(); @@ -74,13 +75,26 @@ fn main() { exit(1); } + let report; + + if matches.is_present("QUIET") { + report = std::sync::Arc::new(mk_quiet_report()); + } else if atty::is(Stream::Stdout) { + report = std::sync::Arc::new(mk_progress_bar_report()); + } else { + report = Arc::new(mk_simple_report()); + } + let opts = ThinCheckOptions { dev: &input_file, async_io: !matches.is_present("SYNC_IO"), + ignore_non_fatal: matches.is_present("IGNORE_NON_FATAL"), + auto_repair: matches.is_present("AUTO_REPAIR"), + report, }; - if let Err(reason) = check(&opts) { - println!("Application error: {}", reason); + if let Err(reason) = check(opts) { + println!("{}", reason); process::exit(1); } } diff --git a/src/checksum.rs b/src/checksum.rs index 9cb3b89..0ffbd47 100644 --- a/src/checksum.rs +++ b/src/checksum.rs @@ -1,4 +1,5 @@ -use byteorder::{LittleEndian, ReadBytesExt}; +use anyhow::{anyhow, Result}; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use crc32c::crc32c; use std::io::Cursor; @@ -44,3 +45,22 @@ pub fn metadata_block_type(buf: &[u8]) -> BT { } } +pub fn write_checksum(buf: &mut [u8], kind: BT) -> Result<()> { + if buf.len() != BLOCK_SIZE as usize { + return Err(anyhow!("block is wrong size")); + } + + use BT::*; + let salt = match kind { + SUPERBLOCK => SUPERBLOCK_CSUM_XOR, + NODE => BTREE_CSUM_XOR, + BITMAP => BITMAP_CSUM_XOR, + INDEX => INDEX_CSUM_XOR, + UNKNOWN => {return Err(anyhow!("Invalid block type"));} + }; + + let csum = checksum(buf) ^ salt; + let mut out = std::io::Cursor::new(buf); + out.write_u32::(csum)?; + Ok(()) +} diff --git a/src/io_engine.rs b/src/io_engine.rs index e89fae7..61a65ec 100644 --- a/src/io_engine.rs +++ b/src/io_engine.rs @@ -4,11 +4,11 @@ use io_uring::IoUring; use std::alloc::{alloc, dealloc, Layout}; use std::fs::File; use std::fs::OpenOptions; -use std::io::{self, Read, Seek}; +use std::io::{self, Read, Seek, Write}; use std::os::unix::fs::OpenOptionsExt; use std::os::unix::io::{AsRawFd, RawFd}; use std::path::Path; -use std::sync::{Arc, Mutex, Condvar}; +use std::sync::{Arc, Condvar, Mutex}; //------------------------------------------ @@ -50,7 +50,9 @@ unsafe impl Send for Block {} pub trait IoEngine { fn get_nr_blocks(&self) -> u64; fn read(&self, block: &mut Block) -> Result<()>; - fn read_many(&self, blocks: &mut Vec) -> Result<()>; + fn read_many(&self, blocks: &mut [Block]) -> Result<()>; + fn write(&self, block: &Block) -> Result<()>; + fn write_many(&self, blocks: &[Block]) -> Result<()>; } fn get_nr_blocks(path: &Path) -> io::Result { @@ -67,22 +69,22 @@ pub struct SyncIoEngine { } impl SyncIoEngine { - fn open_file(path: &Path) -> Result { + fn open_file(path: &Path, writeable: bool) -> Result { let file = OpenOptions::new() .read(true) - .write(false) + .write(writeable) .custom_flags(libc::O_DIRECT) .open(path)?; Ok(file) } - pub fn new(path: &Path, nr_files: usize) -> Result { + pub fn new(path: &Path, nr_files: usize, writeable: bool) -> Result { let mut files = Vec::new(); for _n in 0..nr_files { - files.push(SyncIoEngine::open_file(path)?); + files.push(SyncIoEngine::open_file(path, writeable)?); } - + Ok(SyncIoEngine { nr_blocks: get_nr_blocks(path)?, files: Mutex::new(files), @@ -120,7 +122,7 @@ impl IoEngine for SyncIoEngine { Ok(()) } - fn read_many(&self, blocks: &mut Vec) -> Result<()> { + fn read_many(&self, blocks: &mut [Block]) -> Result<()> { let mut input = self.get(); for b in blocks { input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; @@ -130,6 +132,26 @@ impl IoEngine for SyncIoEngine { Ok(()) } + + fn write(&self, b: &Block) -> Result<()> { + let mut input = self.get(); + input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; + input.write_all(&b.get_data())?; + self.put(input); + + Ok(()) + } + + fn write_many(&self, blocks: &[Block]) -> Result<()> { + let mut input = self.get(); + for b in blocks { + input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; + input.write_all(&b.get_data())?; + } + self.put(input); + + Ok(()) + } } //------------------------------------------ @@ -147,10 +169,10 @@ pub struct AsyncIoEngine { } impl AsyncIoEngine { - pub fn new(path: &Path, queue_len: u32) -> Result { + pub fn new(path: &Path, queue_len: u32, writeable: bool) -> Result { let input = OpenOptions::new() .read(true) - .write(false) + .write(writeable) .custom_flags(libc::O_DIRECT) .open(path)?; @@ -165,6 +187,7 @@ impl AsyncIoEngine { }) } + // FIXME: refactor next two fns fn read_many_(&self, blocks: &mut [Block]) -> Result<()> { let mut inner = self.inner.lock().unwrap(); let count = blocks.len(); @@ -195,6 +218,37 @@ impl AsyncIoEngine { Ok(()) } + + fn write_many_(&self, blocks: &[Block]) -> Result<()> { + let mut inner = self.inner.lock().unwrap(); + let count = blocks.len(); + let fd = types::Target::Fd(inner.input.as_raw_fd()); + + for b in blocks.iter() { + let write_e = opcode::Write::new(fd, b.data, BLOCK_SIZE as u32) + .offset(b.loc as i64 * BLOCK_SIZE as i64); + + unsafe { + let mut queue = inner.ring.submission().available(); + queue + .push(write_e.build().user_data(1)) + .ok() + .expect("queue is full"); + } + } + + inner.ring.submit_and_wait(count)?; + + let cqes = inner.ring.completion().available().collect::>(); + + // FIXME: return proper errors + assert_eq!(cqes.len(), count); + for c in &cqes { + assert_eq!(c.result(), BLOCK_SIZE as i32); + } + + Ok(()) + } } impl Clone for AsyncIoEngine { @@ -245,7 +299,7 @@ impl IoEngine for AsyncIoEngine { Ok(()) } - fn read_many(&self, blocks: &mut Vec) -> Result<()> { + fn read_many(&self, blocks: &mut [Block]) -> Result<()> { let inner = self.inner.lock().unwrap(); let queue_len = inner.queue_len as usize; drop(inner); @@ -258,6 +312,46 @@ impl IoEngine for AsyncIoEngine { } Ok(()) } + + fn write(&self, b: &Block) -> Result<()> { + let mut inner = self.inner.lock().unwrap(); + let fd = types::Target::Fd(inner.input.as_raw_fd()); + let write_e = opcode::Write::new(fd, b.data, BLOCK_SIZE as u32) + .offset(b.loc as i64 * BLOCK_SIZE as i64); + + unsafe { + let mut queue = inner.ring.submission().available(); + queue + .push(write_e.build().user_data(1)) + .ok() + .expect("queue is full"); + } + + inner.ring.submit_and_wait(1)?; + + let cqes = inner.ring.completion().available().collect::>(); + + // FIXME: return proper errors + assert_eq!(cqes.len(), 1); + assert_eq!(cqes[0].user_data(), 1); + assert_eq!(cqes[0].result(), BLOCK_SIZE as i32); + + Ok(()) + } + + fn write_many(&self, blocks: &[Block]) -> Result<()> { + let inner = self.inner.lock().unwrap(); + let queue_len = inner.queue_len as usize; + drop(inner); + + let mut done = 0; + while done != blocks.len() { + let len = usize::min(blocks.len() - done, queue_len); + self.write_many_(&blocks[done..(done + len)])?; + done += len; + } + Ok(()) + } } //------------------------------------------ diff --git a/src/lib.rs b/src/lib.rs index 529b14c..0dbdfab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,12 +15,13 @@ extern crate quickcheck; #[cfg(test)] extern crate quickcheck_macros; -pub mod io_engine; pub mod cache; pub mod checksum; pub mod file_utils; +pub mod io_engine; pub mod pack; pub mod pdata; +pub mod report; pub mod shrink; pub mod thin; pub mod version; diff --git a/src/pdata/space_map.rs b/src/pdata/space_map.rs index 7f36aba..a2978ad 100644 --- a/src/pdata/space_map.rs +++ b/src/pdata/space_map.rs @@ -2,9 +2,10 @@ use anyhow::{anyhow, Result}; use fixedbitset::FixedBitSet; use nom::{multi::count, number::complete::*, IResult}; use std::sync::{Arc, Mutex}; +use byteorder::{LittleEndian, WriteBytesExt}; use crate::io_engine::*; -use crate::pdata::unpack::Unpack; +use crate::pdata::unpack::{Pack, Unpack}; //------------------------------------------ @@ -78,7 +79,7 @@ impl Unpack for IndexEntry { //------------------------------------------ -const MAX_METADATA_BITMAPS: usize = 255; +pub const MAX_METADATA_BITMAPS: usize = 255; pub struct MetadataIndex { pub indexes: Vec, @@ -129,6 +130,15 @@ impl Unpack for BitmapHeader { } } +impl Pack for BitmapHeader { + fn pack(&self, out: &mut W) -> Result<()> { + out.write_u32::(self.csum)?; + out.write_u32::(self.not_used)?; + out.write_u64::(self.blocknr)?; + Ok(()) + } +} + #[derive(Clone, Debug, PartialEq)] pub enum BitmapEntry { Small(u8), @@ -175,6 +185,40 @@ impl Unpack for Bitmap { } } +impl Pack for Bitmap { + fn pack(&self, out: &mut W) -> Result<()> { + use BitmapEntry::*; + BitmapHeader::pack(&self.header, out)?; + + for chunk in self.entries.chunks(32) { + let mut w = 0u64; + for e in chunk { + w >>= 2; + match e { + Small(0) => { + }, + Small(1) => { + w |= 0x2 << 62; + }, + Small(2) => { + w |= 0x1 << 62; + }, + Small(_) => { + return Err(anyhow!("Bad small value in bitmap entry")); + }, + Overflow => { + w |= 0x3 << 62; + } + } + } + + u64::pack(&w, out)?; + } + + Ok(()) + } +} + //------------------------------------------ pub trait SpaceMap { @@ -184,6 +228,8 @@ pub trait SpaceMap { fn inc(&mut self, begin: u64, len: u64) -> Result<()>; } +pub type ASpaceMap = Arc>; + //------------------------------------------ pub struct CoreSpaceMap { diff --git a/src/pdata/unpack.rs b/src/pdata/unpack.rs index c2a80b5..6596c2b 100644 --- a/src/pdata/unpack.rs +++ b/src/pdata/unpack.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, Result}; use nom::{number::complete::*, IResult}; +use byteorder::{LittleEndian, WriteBytesExt}; //------------------------------------------ @@ -20,6 +21,12 @@ pub fn unpack(data: &[u8]) -> Result { //------------------------------------------ +pub trait Pack { + fn pack(&self, data: &mut W) -> Result<()>; +} + +//------------------------------------------ + impl Unpack for u64 { fn disk_size() -> u32 { 8 @@ -30,6 +37,13 @@ impl Unpack for u64 { } } +impl Pack for u64 { + fn pack(&self, out: &mut W) -> Result<()> { + out.write_u64::(*self)?; + Ok(()) + } +} + impl Unpack for u32 { fn disk_size() -> u32 { 4 @@ -40,4 +54,11 @@ impl Unpack for u32 { } } +impl Pack for u32 { + fn pack(&self, out: &mut W) -> Result<()> { + out.write_u32::(*self)?; + Ok(()) + } +} + //------------------------------------------ diff --git a/src/report.rs b/src/report.rs new file mode 100644 index 0000000..dd87a48 --- /dev/null +++ b/src/report.rs @@ -0,0 +1,202 @@ +use indicatif::{ProgressBar, ProgressStyle}; +use std::sync::Mutex; + +//------------------------------------------ + +#[derive(Clone, PartialEq)] +pub enum ReportOutcome { + Success, + NonFatal, + Fatal, +} + +use ReportOutcome::*; + +impl ReportOutcome { + pub fn combine(lhs: &ReportOutcome, rhs: &ReportOutcome) -> ReportOutcome { + match (lhs, rhs) { + (Success, rhs) => rhs.clone(), + (lhs, Success) => lhs.clone(), + (Fatal, _) => Fatal, + (_, Fatal) => Fatal, + (_, _) => NonFatal, + } + } +} + +pub struct Report { + outcome: Mutex, + inner: Mutex>, +} + +trait ReportInner { + fn set_title(&mut self, txt: &str); + fn set_sub_title(&mut self, txt: &str); + fn progress(&mut self, percent: u8); + fn log(&mut self, txt: &str); + fn complete(&mut self); +} + +impl Report { + fn new(inner: Box) -> Report { + Report { + outcome: Mutex::new(Success), + inner: Mutex::new(inner), + } + } + + fn update_outcome(&self, rhs: ReportOutcome) { + let mut lhs = self.outcome.lock().unwrap(); + *lhs = ReportOutcome::combine(&lhs, &rhs); + } + + pub fn set_title(&self, txt: &str) { + let mut inner = self.inner.lock().unwrap(); + inner.set_title(txt) + } + + pub fn set_sub_title(&self, txt: &str) { + let mut inner = self.inner.lock().unwrap(); + inner.set_sub_title(txt) + } + + pub fn progress(&self, percent: u8) { + let mut inner = self.inner.lock().unwrap(); + inner.progress(percent) + } + + pub fn info(&self, txt: &str) { + let mut inner = self.inner.lock().unwrap(); + inner.log(txt) + } + + pub fn non_fatal(&self, txt: &str) { + self.update_outcome(NonFatal); + let mut inner = self.inner.lock().unwrap(); + inner.log(txt) + } + + pub fn fatal(&self, txt: &str) { + self.update_outcome(Fatal); + let mut inner = self.inner.lock().unwrap(); + inner.log(txt) + } + + pub fn complete(&mut self) { + let mut inner = self.inner.lock().unwrap(); + inner.complete(); + } + + pub fn get_outcome(&self) -> ReportOutcome { + let outcome = self.outcome.lock().unwrap(); + outcome.clone() + } +} + +//------------------------------------------ + +struct PBInner { + title: String, + bar: ProgressBar, +} + +impl ReportInner for PBInner { + fn set_title(&mut self, txt: &str) { + self.title = txt.to_string(); + } + + fn set_sub_title(&mut self, txt: &str) { + //let mut fmt = "".to_string(); //Checking thin metadata".to_string(); //self.title.clone(); + let mut fmt = "Checking thin metadata [{bar:40}] Remaining {eta}, ".to_string(); + fmt.push_str(&txt); + self.bar.set_style( + ProgressStyle::default_bar() + .template(&fmt) + .progress_chars("=> "), + ); + } + + fn progress(&mut self, percent: u8) { + self.bar.set_position(percent as u64); + self.bar.tick(); + } + + fn log(&mut self, txt: &str) { + self.bar.println(txt); + } + + fn complete(&mut self) { + self.bar.finish(); + } +} + +pub fn mk_progress_bar_report() -> Report { + Report::new(Box::new(PBInner { + title: "".to_string(), + bar: ProgressBar::new(100), + })) +} + +//------------------------------------------ + +struct SimpleInner { + last_progress: std::time::SystemTime, +} + +impl SimpleInner { + fn new() -> SimpleInner { + SimpleInner { + last_progress: std::time::SystemTime::now(), + } + } +} + +impl ReportInner for SimpleInner { + fn set_title(&mut self, txt: &str) { + println!("{}", txt); + } + + fn set_sub_title(&mut self, txt: &str) { + println!("{}", txt); + } + + fn progress(&mut self, percent: u8) { + let elapsed = self.last_progress.elapsed().unwrap(); + if elapsed > std::time::Duration::from_secs(5) { + println!("Progress: {}%", percent); + self.last_progress = std::time::SystemTime::now(); + } + } + + fn log(&mut self, txt: &str) { + eprintln!("{}", txt); + } + + fn complete(&mut self) {} +} + +pub fn mk_simple_report() -> Report { + Report::new(Box::new(SimpleInner::new())) +} + +//------------------------------------------ + +struct QuietInner {} + +impl ReportInner for QuietInner { + fn set_title(&mut self, _txt: &str) {} + + fn set_sub_title(&mut self, _txt: &str) {} + + fn progress(&mut self, _percent: u8) {} + + fn log(&mut self, _txt: &str) {} + + fn complete(&mut self) {} +} + +pub fn mk_quiet_report() -> Report { + Report::new(Box::new(QuietInner {})) +} + +//------------------------------------------ diff --git a/src/thin/check.rs b/src/thin/check.rs index 017c4e4..afe0fb3 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -1,11 +1,10 @@ use anyhow::{anyhow, Result}; -use indicatif::{ProgressBar, ProgressStyle}; use nom::{number::complete::*, IResult}; use std::collections::BTreeMap; +use std::io::Cursor; use std::path::Path; -use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::sync::{Arc, Mutex}; -use std::{thread, time}; +use std::thread::{self, JoinHandle}; use threadpool::ThreadPool; use crate::checksum; @@ -13,6 +12,7 @@ use crate::io_engine::{AsyncIoEngine, Block, IoEngine, SyncIoEngine}; use crate::pdata::btree::{btree_to_map, btree_to_map_with_sm, BTreeWalker, Node, NodeVisitor}; use crate::pdata::space_map::*; use crate::pdata::unpack::*; +use crate::report::*; use crate::thin::superblock::*; //------------------------------------------ @@ -44,7 +44,7 @@ impl Unpack for BlockTime { } struct BottomLevelVisitor { - data_sm: Arc>, + data_sm: ASpaceMap, } //------------------------------------------ @@ -156,148 +156,23 @@ impl<'a> NodeVisitor for OverflowChecker<'a> { //------------------------------------------ -struct ReportOptions {} - -#[derive(Clone)] -enum ReportOutcome { - Success, - NonFatal, - Fatal, +struct BitmapLeak { + blocknr: u64, // blocknr for the first entry in the bitmap + loc: u64, // location of the bitmap } -use ReportOutcome::*; - -impl ReportOutcome { - fn combine(lhs: &ReportOutcome, rhs: &ReportOutcome) -> ReportOutcome { - match (lhs, rhs) { - (Success, rhs) => rhs.clone(), - (lhs, Success) => lhs.clone(), - (Fatal, _) => Fatal, - (_, Fatal) => Fatal, - (_, _) => NonFatal, - } - } -} - -enum ReportCmd { - Log(String), - Complete, - Title(String), -} - -struct Report { - opts: ReportOptions, - outcome: ReportOutcome, - tx: Sender, - tid: thread::JoinHandle<()>, -} - -impl Report { - fn new( - opts: ReportOptions, - sm: Arc>, - total_allocated: u64, - ) -> Result { - let (tx, rx) = channel(); - let tid = thread::spawn(move || report_thread(sm, total_allocated, rx)); - Ok(Report { - opts, - outcome: ReportOutcome::Success, - tx, - tid, - }) - } - - fn info>(&mut self, txt: I) -> Result<()> { - self.tx.send(ReportCmd::Log(txt.into()))?; - Ok(()) - } - - fn add_outcome(&mut self, rhs: ReportOutcome) { - self.outcome = ReportOutcome::combine(&self.outcome, &rhs); - } - - fn non_fatal>(&mut self, txt: I) -> Result<()> { - self.add_outcome(NonFatal); - self.tx.send(ReportCmd::Log(txt.into()))?; - Ok(()) - } - - fn fatal>(&mut self, txt: I) -> Result<()> { - self.add_outcome(Fatal); - self.tx.send(ReportCmd::Log(txt.into()))?; - Ok(()) - } - - fn complete(self) -> Result<()> { - self.tx.send(ReportCmd::Complete)?; - self.tid.join(); - Ok(()) - } - - fn set_title(&mut self, txt: &str) -> Result<()> { - self.tx.send(ReportCmd::Title(txt.to_string()))?; - Ok(()) - } -} - -fn report_thread( - sm: Arc>, - total_allocated: u64, - rx: Receiver, -) { - let interval = time::Duration::from_millis(250); - let bar = ProgressBar::new(total_allocated); - loop { - loop { - match rx.try_recv() { - Ok(ReportCmd::Log(txt)) => { - bar.println(txt); - } - Ok(ReportCmd::Complete) => { - bar.finish(); - return; - } - Ok(ReportCmd::Title(txt)) => { - let mut fmt = "Checking thin metadata [{bar:40}] Remaining {eta}, ".to_string(); - fmt.push_str(&txt); - bar.set_style( - ProgressStyle::default_bar() - .template(&fmt) - .progress_chars("=> "), - ); - } - Err(TryRecvError::Disconnected) => { - return; - } - Err(TryRecvError::Empty) => { - break; - } - } - } - - let sm = sm.lock().unwrap(); - let nr_allocated = sm.get_nr_allocated().unwrap(); - drop(sm); - - bar.set_position(nr_allocated); - bar.tick(); - - thread::sleep(interval); - } -} - -//------------------------------------------ - +// This checks the space map and returns any leak blocks for auto-repair to process. fn check_space_map( + ctx: &Context, kind: &str, - engine: Arc, - bar: &mut Report, entries: Vec, - metadata_sm: Option>>, - sm: Arc>, + metadata_sm: Option, + sm: ASpaceMap, root: SMRoot, -) -> Result<()> { +) -> Result> { + let report = ctx.report.clone(); + let engine = ctx.engine.clone(); + let sm = sm.lock().unwrap(); // overflow btree @@ -321,19 +196,21 @@ fn check_space_map( engine.read_many(&mut blocks)?; let mut leaks = 0; - let mut fail = false; let mut blocknr = 0; + let mut bitmap_leaks = Vec::new(); for n in 0..entries.len() { let b = &blocks[n]; if checksum::metadata_block_type(&b.get_data()) != checksum::BT::BITMAP { - return Err(anyhow!( + report.fatal(&format!( "Index entry points to block ({}) that isn't a bitmap", b.loc )); } let bitmap = unpack::(b.get_data())?; - for e in bitmap.entries { + let first_blocknr = blocknr; + let mut contains_leak = false; + for e in bitmap.entries.iter() { if blocknr >= root.nr_blocks { break; } @@ -341,43 +218,84 @@ fn check_space_map( match e { BitmapEntry::Small(actual) => { let expected = sm.get(blocknr)?; - if actual == 1 && expected == 0 { + if *actual == 1 && expected == 0 { leaks += 1; - } else if actual != expected as u8 { - bar.fatal(format!("Bad reference count for {} block {}. Expected {}, but space map contains {}.", - kind, blocknr, expected, actual))?; - fail = true; + contains_leak = true; + } else if *actual != expected as u8 { + report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map contains {}.", + kind, blocknr, expected, actual)); } } BitmapEntry::Overflow => { let expected = sm.get(blocknr)?; if expected < 3 { - bar.fatal(format!("Bad reference count for {} block {}. Expected {}, but space map says it's >= 3.", - kind, blocknr, expected))?; - fail = true; + report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map says it's >= 3.", + kind, blocknr, expected)); } } } blocknr += 1; } + if contains_leak { + bitmap_leaks.push(BitmapLeak { + blocknr: first_blocknr, + loc: b.loc, + }); + } } if leaks > 0 { - bar.non_fatal(format!( - "{} {} blocks have leaked. Use --auto-repair to fix.", - leaks, kind - ))?; + report.non_fatal(&format!("{} {} blocks have leaked.", leaks, kind)); } - if fail { - return Err(anyhow!("Inconsistent data space map")); + Ok(bitmap_leaks) +} + +// This assumes the only errors in the space map are leaks. Entries should just be +// those that contain leaks. +fn repair_space_map(ctx: &Context, entries: Vec, sm: ASpaceMap) -> Result<()> { + let engine = ctx.engine.clone(); + + let sm = sm.lock().unwrap(); + + let mut blocks = Vec::new(); + for i in &entries { + blocks.push(Block::new(i.loc)); } + + // FIXME: we should do this in batches + engine.read_many(&mut blocks)?; + + for (be, b) in entries.iter().zip(blocks.iter()) { + let mut blocknr = be.blocknr; + let mut bitmap = unpack::(b.get_data())?; + for e in bitmap.entries.iter_mut() { + if blocknr >= sm.get_nr_blocks()? { + break; + } + + if let BitmapEntry::Small(actual) = e { + let expected = sm.get(blocknr)?; + if *actual == 1 && expected == 0 { + *e = BitmapEntry::Small(0); + } + } + + blocknr += 1; + } + + let mut out = Cursor::new(b.get_data()); + bitmap.pack(&mut out)?; + checksum::write_checksum(b.get_data(), checksum::BT::BITMAP)?; + } + + engine.write_many(&blocks)?; Ok(()) } //------------------------------------------ -fn inc_entries(sm: &Arc>, entries: &[IndexEntry]) -> Result<()> { +fn inc_entries(sm: &ASpaceMap, entries: &[IndexEntry]) -> Result<()> { let mut sm = sm.lock().unwrap(); for ie in entries { sm.inc(ie.blocknr, 1)?; @@ -385,6 +303,12 @@ fn inc_entries(sm: &Arc>, entries: &[IndexEntr Ok(()) } +fn inc_superblock(sm: &ASpaceMap) -> Result<()> { + let mut sm = sm.lock().unwrap(); + sm.inc(SUPERBLOCK_LOCATION, 1)?; + Ok(()) +} + //------------------------------------------ const MAX_CONCURRENT_IO: u32 = 1024; @@ -392,28 +316,128 @@ const MAX_CONCURRENT_IO: u32 = 1024; pub struct ThinCheckOptions<'a> { pub dev: &'a Path, pub async_io: bool, + pub ignore_non_fatal: bool, + pub auto_repair: bool, + pub report: Arc, } -pub fn check(opts: &ThinCheckOptions) -> Result<()> { - let engine: Arc; +fn spawn_progress_thread( + sm: Arc>, + nr_allocated_metadata: u64, + report: Arc, +) -> Result<(JoinHandle<()>, Arc>)> { + let tid; + let stop_progress = Arc::new(Mutex::new(false)); + { + let stop_progress = stop_progress.clone(); + tid = thread::spawn(move || { + let interval = std::time::Duration::from_millis(250); + loop { + { + let stop_progress = stop_progress.lock().unwrap(); + if *stop_progress { + break; + } + } + + let sm = sm.lock().unwrap(); + let mut n = sm.get_nr_allocated().unwrap(); + drop(sm); + + n *= 100; + n /= nr_allocated_metadata; + + let _r = report.progress(n as u8); + thread::sleep(interval); + } + }); + } + + Ok((tid, stop_progress)) +} + +struct Context { + report: Arc, + engine: Arc, + pool: ThreadPool, +} + +// Check the mappings filling in the data_sm as we go. +fn check_mapping_bottom_level( + ctx: &Context, + metadata_sm: &Arc>, + data_sm: &Arc>, + roots: &BTreeMap, +) -> Result<()> { + ctx.report.set_sub_title("mapping tree"); + + for (_thin_id, root) in roots { + let mut w = BTreeWalker::new_with_sm(ctx.engine.clone(), metadata_sm.clone(), false)?; + let data_sm = data_sm.clone(); + let root = *root; + ctx.pool.execute(move || { + let mut v = BottomLevelVisitor { data_sm }; + + // FIXME: return error + match w.walk(&mut v, root) { + Err(e) => { + eprintln!("walk failed {:?}", e); + std::process::abort(); + } + Ok(_result) => {} + } + }); + } + ctx.pool.join(); + + Ok(()) +} + +fn mk_context(opts: &ThinCheckOptions) -> Result { + let engine: Arc; let nr_threads; + if opts.async_io { nr_threads = std::cmp::min(4, num_cpus::get()); - engine = Arc::new(AsyncIoEngine::new(opts.dev, MAX_CONCURRENT_IO)?); + engine = Arc::new(AsyncIoEngine::new(opts.dev, MAX_CONCURRENT_IO, opts.auto_repair)?); } else { nr_threads = num_cpus::get() * 2; - engine = Arc::new(SyncIoEngine::new(opts.dev, nr_threads)?); + engine = Arc::new(SyncIoEngine::new(opts.dev, nr_threads, opts.auto_repair)?); } + let pool = ThreadPool::new(nr_threads); + + Ok(Context { + report: opts.report.clone(), + engine, + pool, + }) +} + +fn bail_out(ctx: &Context, task: &str) -> Result<()> { + use ReportOutcome::*; + + match ctx.report.get_outcome() { + Fatal => Err(anyhow!(format!( + "Check of {} failed, ending check early.", + task + ))), + _ => Ok(()), + } +} + +pub fn check(opts: ThinCheckOptions) -> Result<()> { + let ctx = mk_context(&opts)?; + + // FIXME: temporarily get these out + let report = &ctx.report; + let engine = &ctx.engine; + + report.set_title("Checking thin metadata"); // superblock let sb = read_superblock(engine.as_ref(), SUPERBLOCK_LOCATION)?; - - let nr_allocated_metadata; - { - let root = unpack::(&sb.metadata_sm_root[0..])?; - nr_allocated_metadata = root.nr_allocated; - } + let metadata_root = unpack::(&sb.metadata_sm_root[0..])?; // Device details. We read this once to get the number of thin devices, and hence the // maximum metadata ref count. Then create metadata space map, and reread to increment @@ -421,10 +445,9 @@ pub fn check(opts: &ThinCheckOptions) -> Result<()> { let devs = btree_to_map::(engine.clone(), false, sb.details_root)?; let nr_devs = devs.len(); let metadata_sm = core_sm(engine.get_nr_blocks(), nr_devs as u32); - let opts = ReportOptions {}; - let mut report = Report::new(opts, metadata_sm.clone(), nr_allocated_metadata)?; + inc_superblock(&metadata_sm)?; - report.set_title("device details tree")?; + report.set_sub_title("device details tree"); let _devs = btree_to_map_with_sm::( engine.clone(), metadata_sm.clone(), @@ -432,49 +455,24 @@ pub fn check(opts: &ThinCheckOptions) -> Result<()> { sb.details_root, )?; - // increment superblock - { - let mut sm = metadata_sm.lock().unwrap(); - sm.inc(SUPERBLOCK_LOCATION, 1)?; - } + let (tid, stop_progress) = spawn_progress_thread( + metadata_sm.clone(), + metadata_root.nr_allocated, + report.clone(), + )?; // mapping top level - let roots = btree_to_map_with_sm::(engine.clone(), metadata_sm.clone(), false, sb.mapping_root)?; + let roots = + btree_to_map_with_sm::(engine.clone(), metadata_sm.clone(), false, sb.mapping_root)?; - // Check the mappings filling in the data_sm as we go. - report.set_title("mapping tree")?; - let data_sm; - { - // FIXME: with a thread pool we need to return errors another way. - let nr_workers = nr_threads; - let pool = ThreadPool::new(nr_workers); + // mapping bottom level + report.set_sub_title("mapping tree"); + let root = unpack::(&sb.data_sm_root[0..])?; + let data_sm = core_sm(root.nr_blocks, nr_devs as u32); + check_mapping_bottom_level(&ctx, &metadata_sm, &data_sm, &roots)?; + bail_out(&ctx, "mapping tree")?; - let root = unpack::(&sb.data_sm_root[0..])?; - data_sm = core_sm(root.nr_blocks, nr_devs as u32); - - for (_thin_id, root) in roots { - let mut w = BTreeWalker::new_with_sm(engine.clone(), metadata_sm.clone(), false)?; - let data_sm = data_sm.clone(); - pool.execute(move || { - let mut v = BottomLevelVisitor { data_sm }; - - // FIXME: return error - match w.walk(&mut v, root) { - Err(e) => { - eprintln!("walk failed {:?}", e); - std::process::abort(); - } - Ok(_result) => { - //eprintln!("checked thin_dev {} -> {:?}", thin_id, result); - } - } - }); - } - - pool.join(); - } - - report.set_title("data space map")?; + report.set_sub_title("data space map"); let root = unpack::(&sb.data_sm_root[0..])?; let entries = btree_to_map_with_sm::( @@ -486,17 +484,17 @@ pub fn check(opts: &ThinCheckOptions) -> Result<()> { let entries: Vec = entries.values().cloned().collect(); inc_entries(&metadata_sm, &entries[0..])?; - check_space_map( + let data_leaks = check_space_map( + &ctx, "data", - engine.clone(), - &mut report, entries, Some(metadata_sm.clone()), data_sm.clone(), root, )?; + bail_out(&ctx, "data space map")?; - report.set_title("metadata space map")?; + report.set_sub_title("metadata space map"); let root = unpack::(&sb.metadata_sm_root[0..])?; let mut b = Block::new(root.bitmap_root); engine.read(&mut b)?; @@ -521,17 +519,29 @@ pub fn check(opts: &ThinCheckOptions) -> Result<()> { )?; // Now the counts should be correct and we can check it. - check_space_map( - "metadata", - engine.clone(), - &mut report, - entries, - None, - metadata_sm.clone(), - root, - )?; + let metadata_leaks = check_space_map(&ctx, "metadata", entries, None, metadata_sm.clone(), root)?; + + if opts.auto_repair { + if data_leaks.len() > 0 { + ctx.report.info("Repairing data leaks."); + repair_space_map(&ctx, data_leaks, data_sm.clone()); + } + + if metadata_leaks.len() > 0 { + ctx.report.info("Repairing metadata leaks."); + repair_space_map(&ctx, metadata_leaks, metadata_sm.clone()); + } + } + + // Completing consumes the report. + { + let mut stop_progress = stop_progress.lock().unwrap(); + *stop_progress = true; + } + + tid.join(); + bail_out(&ctx, "metadata space map")?; - report.complete()?; Ok(()) } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 940058b..b139090 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -295,7 +295,7 @@ pub fn generate_metadata_leaks(md: &PathBuf, nr_blocks: u64, expected: u32, actu pub fn get_needs_check(md: &PathBuf) -> Result { use thinp::thin::superblock::*; - let engine = SyncIoEngine::new(&md, 1)?; + let engine = SyncIoEngine::new(&md, 1, false)?; let sb = read_superblock(&engine, SUPERBLOCK_LOCATION)?; Ok(sb.flags.needs_check) }