From 44142f657abd2aeb6f26bfdaf9565bf5ff7b7bf9 Mon Sep 17 00:00:00 2001 From: Joe Thornber Date: Wed, 2 Sep 2020 12:57:47 +0100 Subject: [PATCH] [thin_check (rust)] Add error handling to io_engine interface --- Cargo.lock | 48 ++++----- Cargo.toml | 5 +- src/io_engine.rs | 240 +++++++++++++++++++++++++++-------------- src/pdata/btree.rs | 42 +++++--- src/thin/check.rs | 142 +++++++++++++----------- src/thin/superblock.rs | 3 +- 6 files changed, 295 insertions(+), 185 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 610c998..bd80e7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,9 +49,9 @@ dependencies = [ [[package]] name = "autocfg" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] name = "base64" @@ -73,9 +73,9 @@ checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" [[package]] name = "cc" -version = "1.0.58" +version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a06fb2e53271d7c279ec1efea6ab691c35a2ae67ec0d91d7acec0caf13b518" +checksum = "66120af515773fb005778dc07c261bd201ec8ce50bd6e7144c927753fe013381" [[package]] name = "cfg-if" @@ -85,9 +85,9 @@ checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" [[package]] name = "clap" -version = "2.33.2" +version = "2.33.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10040cdf04294b565d9e0319955430099ec3813a64c952b86a41200ad714ae48" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" dependencies = [ "ansi_term", "atty", @@ -160,15 +160,15 @@ dependencies = [ [[package]] name = "fixedbitset" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fc4fcacf5cd3681968f6524ea159383132937739c6c40dabab9e37ed515911b" +checksum = "4e08c8bc7575d7e091fe0706963bd22e2a4be6a64da995f03b2a5a57d66ad015" [[package]] name = "flate2" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e" +checksum = "766d0e77a2c1502169d4a93ff3b8c15a71fd946cd0126309752104e5f3c46d94" dependencies = [ "cfg-if", "crc32fast", @@ -340,9 +340,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.74" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2f02823cf78b754822df5f7f268fb59822e7296276d3e069d8e8cb26a14bd10" +checksum = "755456fae044e6fa1ebbbd1b3e902ae19e73097ed4ed87bb79934a867c007bc3" [[package]] name = "log" @@ -361,9 +361,9 @@ checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" [[package]] name = "miniz_oxide" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f" +checksum = "4d7559a8a40d0f97e1edea3220f698f78b1c5ab67532e49f68fde3910323b722" dependencies = [ "adler", ] @@ -383,9 +383,7 @@ dependencies = [ [[package]] name = "nom" -version = "5.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" +version = "6.0.0-alpha1" dependencies = [ "lexical-core", "memchr", @@ -394,9 +392,9 @@ dependencies = [ [[package]] name = "num-derive" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0396233fb2d5b0ae3f05ff6aba9a09185f7f6e70f87fb01147d545f85364665" +checksum = "6f09b9841adb6b5e1f89ef7087ea636e0fd94b2851f887c1e3eb5d5f8228fab3" dependencies = [ "proc-macro2", "quote", @@ -430,9 +428,9 @@ checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" [[package]] name = "once_cell" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" +checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad" [[package]] name = "os_pipe" @@ -472,9 +470,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "ppv-lite86" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" +checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" [[package]] name = "proc-macro-hack" @@ -648,9 +646,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" [[package]] name = "syn" -version = "1.0.38" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e69abc24912995b3038597a7a593be5053eb0fb44f3cc5beec0deb421790c1f4" +checksum = "891d8d6567fe7c7f8835a3a98af4208f3846fba258c1bc3c31d6e506239f11f9" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index d58ec14..0ecfe46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ io-uring = "0.3" indicatif = "0.15" libc = "0.2.71" nix = "0.17" -nom = "5.1" +nom = { path = "/home/ejt/builds/nom/" } num_cpus = "1.13" num-derive = "0.3" num-traits = "0.2" @@ -34,3 +34,6 @@ thiserror = "1.0" json = "0.12" quickcheck = "0.9" quickcheck_macros = "0.9" + +[profile.release] +debug = true diff --git a/src/io_engine.rs b/src/io_engine.rs index 22477e9..aa61c43 100644 --- a/src/io_engine.rs +++ b/src/io_engine.rs @@ -1,10 +1,11 @@ -use anyhow::Result; use io_uring::opcode::{self, types}; use io_uring::IoUring; use std::alloc::{alloc, dealloc, Layout}; use std::fs::File; use std::fs::OpenOptions; +use std::io::Result; use std::io::{self, Read, Seek, Write}; +use std::ops::{Deref, DerefMut}; use std::os::unix::fs::OpenOptionsExt; use std::os::unix::io::{AsRawFd, RawFd}; use std::path::Path; @@ -15,10 +16,10 @@ use std::sync::{Arc, Condvar, Mutex}; pub const BLOCK_SIZE: usize = 4096; const ALIGN: usize = 4096; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Block { pub loc: u64, - pub data: *mut u8, + data: *mut u8, } impl Block { @@ -50,13 +51,13 @@ unsafe impl Send for Block {} pub trait IoEngine { fn get_nr_blocks(&self) -> u64; - // FIXME: we need to indicate an error per block. Add error field - // to block? Data should not be accessible if a read failed. - // We should support retry for failed writes. - fn read(&self, block: &mut Block) -> Result<()>; - fn read_many(&self, blocks: &mut [Block]) -> Result<()>; + fn read(&self, b: u64) -> Result; + // The whole io could fail, or individual blocks + fn read_many(&self, blocks: &[u64]) -> Result>>; + fn write(&self, block: &Block) -> Result<()>; - fn write_many(&self, blocks: &[Block]) -> Result<()>; + // The whole io could fail, or individual blocks + fn write_many(&self, blocks: &[Block]) -> Result>>; } fn get_nr_blocks(path: &Path) -> io::Result { @@ -72,12 +73,49 @@ pub struct SyncIoEngine { cvar: Condvar, } +struct FileGuard<'a> { + engine: &'a SyncIoEngine, + file: Option, +} + +impl<'a> FileGuard<'a> { + fn new(engine: &'a SyncIoEngine, file: File) -> FileGuard<'a> { + FileGuard { + engine, + file: Some(file), + } + } +} + +impl<'a> Deref for FileGuard<'a> { + // FIXME: do we need this? it's not in DerefMut + type Target = File; + + fn deref(&self) -> &File { + &self.file.as_ref().expect("empty file guard") + } +} + +impl<'a> DerefMut for FileGuard<'a> { + fn deref_mut(&mut self) -> &mut File { + match &mut self.file { + None => { + todo!(); + } + Some(f) => f, + } + } +} + +impl<'a> Drop for FileGuard<'a> { + fn drop(&mut self) { + self.engine.put(self.file.take().expect("empty file guard")); + } +} + impl SyncIoEngine { fn open_file(path: &Path, writeable: bool) -> Result { - let file = OpenOptions::new() - .read(true) - .write(writeable) - .open(path)?; + let file = OpenOptions::new().read(true).write(writeable).open(path)?; Ok(file) } @@ -95,13 +133,14 @@ impl SyncIoEngine { }) } - fn get(&self) -> File { + fn get(&self) -> FileGuard { let mut files = self.files.lock().unwrap(); while files.len() == 0 { files = self.cvar.wait(files).unwrap(); } - files.pop().unwrap() + + FileGuard::new(self, files.pop().unwrap()) } fn put(&self, f: File) { @@ -116,44 +155,37 @@ impl IoEngine for SyncIoEngine { self.nr_blocks } - fn read(&self, b: &mut Block) -> Result<()> { + fn read(&self, loc: u64) -> Result { + let b = Block::new(loc); let mut input = self.get(); - input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; - input.read_exact(&mut b.get_data())?; - self.put(input); - Ok(()) + input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; + input.read_exact(b.get_data())?; + Ok(b) } - fn read_many(&self, blocks: &mut [Block]) -> Result<()> { - let mut input = self.get(); + // FIXME: *_many are getting and putting the input many times + fn read_many(&self, blocks: &[u64]) -> Result>> { + let mut bs = Vec::new(); for b in blocks { - input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; - input.read_exact(&mut b.get_data())?; + bs.push(self.read(*b)); } - self.put(input); - - Ok(()) + Ok(bs) } fn write(&self, b: &Block) -> Result<()> { let mut input = self.get(); input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; input.write_all(&b.get_data())?; - self.put(input); - Ok(()) } - fn write_many(&self, blocks: &[Block]) -> Result<()> { - let mut input = self.get(); + fn write_many(&self, blocks: &[Block]) -> Result>> { + let mut bs = Vec::new(); for b in blocks { - input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; - input.write_all(&b.get_data())?; + bs.push(self.write(b)); } - self.put(input); - - Ok(()) + Ok(bs) } } @@ -191,19 +223,21 @@ impl AsyncIoEngine { } // FIXME: refactor next two fns - fn read_many_(&self, blocks: &mut [Block]) -> Result<()> { + fn read_many_(&self, blocks: Vec) -> Result>> { + use std::io::*; + let mut inner = self.inner.lock().unwrap(); let count = blocks.len(); let fd = types::Target::Fd(inner.input.as_raw_fd()); - for b in blocks.iter_mut() { + for (i, b) in blocks.iter().enumerate() { let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32) .offset(b.loc as i64 * BLOCK_SIZE as i64); unsafe { let mut queue = inner.ring.submission().available(); queue - .push(read_e.build().user_data(1)) + .push(read_e.build().user_data(i as u64)) .ok() .expect("queue is full"); } @@ -211,30 +245,52 @@ impl AsyncIoEngine { inner.ring.submit_and_wait(count)?; - let cqes = inner.ring.completion().available().collect::>(); + let mut cqes = inner.ring.completion().available().collect::>(); - // FIXME: return proper errors - assert_eq!(cqes.len(), count); - for c in &cqes { - assert_eq!(c.result(), BLOCK_SIZE as i32); + if cqes.len() != count { + return Err(Error::new( + ErrorKind::Other, + "insufficient io_uring completions", + )); } - Ok(()) + // reorder cqes + cqes.sort_by(|a, b| a.user_data().partial_cmp(&b.user_data()).unwrap()); + + let mut rs = Vec::new(); + let mut i = 0; + for b in blocks { + let c = &cqes[i]; + i += 1; + + let r = c.result(); + if r < 0 { + let error = Error::from_raw_os_error(-r); + rs.push(Err(error)); + } else if c.result() != BLOCK_SIZE as i32 { + rs.push(Err(Error::new(ErrorKind::UnexpectedEof, "short read"))); + } else { + rs.push(Ok(b)); + } + } + Ok(rs) } - fn write_many_(&self, blocks: &[Block]) -> Result<()> { + fn write_many_(&self, blocks: &[Block]) -> Result>> { + use std::io::*; + let mut inner = self.inner.lock().unwrap(); let count = blocks.len(); let fd = types::Target::Fd(inner.input.as_raw_fd()); - for b in blocks.iter() { + for (i, b) in blocks.iter().enumerate() { let write_e = opcode::Write::new(fd, b.data, BLOCK_SIZE as u32) .offset(b.loc as i64 * BLOCK_SIZE as i64); unsafe { let mut queue = inner.ring.submission().available(); queue - .push(write_e.build().user_data(1)) + .push(write_e.build().user_data(i as u64)) .ok() .expect("queue is full"); } @@ -242,15 +298,24 @@ impl AsyncIoEngine { inner.ring.submit_and_wait(count)?; - let cqes = inner.ring.completion().available().collect::>(); + let mut cqes = inner.ring.completion().available().collect::>(); - // FIXME: return proper errors - assert_eq!(cqes.len(), count); - for c in &cqes { - assert_eq!(c.result(), BLOCK_SIZE as i32); + // reorder cqes + cqes.sort_by(|a, b| a.user_data().partial_cmp(&b.user_data()).unwrap()); + + let mut rs = Vec::new(); + for c in cqes { + let r = c.result(); + if r < 0 { + let error = Error::from_raw_os_error(-r); + rs.push(Err(error)); + } else if r != BLOCK_SIZE as i32 { + rs.push(Err(Error::new(ErrorKind::UnexpectedEof, "short write"))); + } else { + rs.push(Ok(())); + } } - - Ok(()) + Ok(rs) } } @@ -276,16 +341,17 @@ impl IoEngine for AsyncIoEngine { inner.nr_blocks } - fn read(&self, b: &mut Block) -> Result<()> { + fn read(&self, b: u64) -> Result { let mut inner = self.inner.lock().unwrap(); let fd = types::Target::Fd(inner.input.as_raw_fd()); + let b = Block::new(b); let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32) .offset(b.loc as i64 * BLOCK_SIZE as i64); unsafe { let mut queue = inner.ring.submission().available(); queue - .push(read_e.build().user_data(1)) + .push(read_e.build().user_data(0)) .ok() .expect("queue is full"); } @@ -294,26 +360,34 @@ impl IoEngine for AsyncIoEngine { let cqes = inner.ring.completion().available().collect::>(); - // FIXME: return proper errors - assert_eq!(cqes.len(), 1); - assert_eq!(cqes[0].user_data(), 1); - assert_eq!(cqes[0].result(), BLOCK_SIZE as i32); - - Ok(()) + let r = cqes[0].result(); + use std::io::*; + if r < 0 { + let error = Error::from_raw_os_error(-r); + Err(error) + } else if r != BLOCK_SIZE as i32 { + Err(Error::new(ErrorKind::UnexpectedEof, "short write")) + } else { + Ok(b) + } } - fn read_many(&self, blocks: &mut [Block]) -> Result<()> { + fn read_many(&self, blocks: &[u64]) -> Result>> { let inner = self.inner.lock().unwrap(); let queue_len = inner.queue_len as usize; drop(inner); - let mut done = 0; - while done != blocks.len() { - let len = usize::min(blocks.len() - done, queue_len); - self.read_many_(&mut blocks[done..(done + len)])?; - done += len; + let mut results = Vec::new(); + for cs in blocks.chunks(queue_len) { + let mut bs = Vec::new(); + for b in cs { + bs.push(Block::new(*b)); + } + + results.append(&mut self.read_many_(bs)?); } - Ok(()) + + Ok(results) } fn write(&self, b: &Block) -> Result<()> { @@ -325,7 +399,7 @@ impl IoEngine for AsyncIoEngine { unsafe { let mut queue = inner.ring.submission().available(); queue - .push(write_e.build().user_data(1)) + .push(write_e.build().user_data(0)) .ok() .expect("queue is full"); } @@ -334,26 +408,32 @@ impl IoEngine for AsyncIoEngine { let cqes = inner.ring.completion().available().collect::>(); - // FIXME: return proper errors - assert_eq!(cqes.len(), 1); - assert_eq!(cqes[0].user_data(), 1); - assert_eq!(cqes[0].result(), BLOCK_SIZE as i32); - - Ok(()) + let r = cqes[0].result(); + use std::io::*; + if r < 0 { + let error = Error::from_raw_os_error(-r); + Err(error) + } else if r != BLOCK_SIZE as i32 { + Err(Error::new(ErrorKind::UnexpectedEof, "short write")) + } else { + Ok(()) + } } - fn write_many(&self, blocks: &[Block]) -> Result<()> { + fn write_many(&self, blocks: &[Block]) -> Result>> { let inner = self.inner.lock().unwrap(); let queue_len = inner.queue_len as usize; drop(inner); + let mut results = Vec::new(); let mut done = 0; while done != blocks.len() { let len = usize::min(blocks.len() - done, queue_len); - self.write_many_(&blocks[done..(done + len)])?; + results.append(&mut self.write_many_(&blocks[done..(done + len)])?); done += len; } - Ok(()) + + Ok(results) } } diff --git a/src/pdata/btree.rs b/src/pdata/btree.rs index ab49954..fca0ae3 100644 --- a/src/pdata/btree.rs +++ b/src/pdata/btree.rs @@ -203,14 +203,21 @@ impl BTreeWalker { let mut blocks = Vec::with_capacity(bs.len()); for b in bs { if self.sm_inc(*b)? == 0 { - blocks.push(Block::new(*b)); + blocks.push(*b); } } - self.engine.read_many(&mut blocks)?; + let blocks = self.engine.read_many(&blocks[0..])?; for b in blocks { - self.walk_node(visitor, &b, false)?; + match b { + Err(_e) => { + todo!(); + }, + Ok(b) => { + self.walk_node(visitor, &b, false)?; + }, + } } Ok(()) @@ -258,8 +265,7 @@ impl BTreeWalker { if self.sm_inc(root)? > 0 { Ok(()) } else { - let mut root = Block::new(root); - self.engine.read(&mut root)?; + let root = self.engine.read(root)?; self.walk_node(visitor, &root, true) } } @@ -320,19 +326,26 @@ where let mut blocks = Vec::new(); for b in bs { if w.sm_inc(*b)? == 0 { - blocks.push(Block::new(*b)); + blocks.push(*b); } } - w.engine.read_many(&mut blocks)?; + let blocks = w.engine.read_many(&blocks[0..])?; for b in blocks { - let w = w.clone(); - let visitor = visitor.clone(); - pool.execute(move || { - // FIXME: return result - w.walk_node(visitor.as_ref(), &b, false); - }); + match b { + Err(_e) => { + todo!(); + }, + Ok(b) => { + let w = w.clone(); + let visitor = visitor.clone(); + pool.execute(move || { + // FIXME: return result + w.walk_node(visitor.as_ref(), &b, false); + }); + } + } } pool.join(); @@ -352,8 +365,7 @@ where if w.sm_inc(root)? > 0 { Ok(()) } else { - let mut root = Block::new(root); - w.engine.read(&mut root)?; + let root = w.engine.read(root)?; walk_node_threaded(w, pool, visitor, &root, true) } } diff --git a/src/thin/check.rs b/src/thin/check.rs index f1e0dc6..c36c7f8 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -8,7 +8,7 @@ use std::thread::{self, JoinHandle}; use threadpool::ThreadPool; use crate::checksum; -use crate::io_engine::{AsyncIoEngine, Block, IoEngine, SyncIoEngine}; +use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine}; use crate::pdata::btree::*; use crate::pdata::space_map::*; use crate::pdata::unpack::*; @@ -174,58 +174,65 @@ fn check_space_map( let mut blocks = Vec::with_capacity(entries.len()); for i in &entries { - blocks.push(Block::new(i.blocknr)); + blocks.push(i.blocknr); } // FIXME: we should do this in batches - engine.read_many(&mut blocks)?; + let blocks = engine.read_many(&mut blocks)?; let mut leaks = 0; let mut blocknr = 0; let mut bitmap_leaks = Vec::new(); for n in 0..entries.len() { let b = &blocks[n]; - if checksum::metadata_block_type(&b.get_data()) != checksum::BT::BITMAP { - report.fatal(&format!( - "Index entry points to block ({}) that isn't a bitmap", - b.loc - )); - } - - let bitmap = unpack::(b.get_data())?; - let first_blocknr = blocknr; - let mut contains_leak = false; - for e in bitmap.entries.iter() { - if blocknr >= root.nr_blocks { - break; - } - - match e { - BitmapEntry::Small(actual) => { - let expected = sm.get(blocknr)?; - if *actual == 1 && expected == 0 { - leaks += 1; - contains_leak = true; - } else if *actual != expected as u8 { - report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map contains {}.", - kind, blocknr, expected, actual)); - } + match b { + Err(_e) => { + todo!(); + }, + Ok(b) => { + if checksum::metadata_block_type(&b.get_data()) != checksum::BT::BITMAP { + report.fatal(&format!( + "Index entry points to block ({}) that isn't a bitmap", + b.loc + )); } - BitmapEntry::Overflow => { - let expected = sm.get(blocknr)?; - if expected < 3 { - report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map says it's >= 3.", - kind, blocknr, expected)); + + let bitmap = unpack::(b.get_data())?; + let first_blocknr = blocknr; + let mut contains_leak = false; + for e in bitmap.entries.iter() { + if blocknr >= root.nr_blocks { + break; } + + match e { + BitmapEntry::Small(actual) => { + let expected = sm.get(blocknr)?; + if *actual == 1 && expected == 0 { + leaks += 1; + contains_leak = true; + } else if *actual != expected as u8 { + report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map contains {}.", + kind, blocknr, expected, actual)); + } + } + BitmapEntry::Overflow => { + let expected = sm.get(blocknr)?; + if expected < 3 { + report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map says it's >= 3.", + kind, blocknr, expected)); + } + } + } + blocknr += 1; + } + if contains_leak { + bitmap_leaks.push(BitmapLeak { + blocknr: first_blocknr, + loc: b.loc, + }); } } - blocknr += 1; - } - if contains_leak { - bitmap_leaks.push(BitmapLeak { - blocknr: first_blocknr, - loc: b.loc, - }); } } @@ -245,36 +252,48 @@ fn repair_space_map(ctx: &Context, entries: Vec, sm: ASpaceMap) -> R let mut blocks = Vec::with_capacity(entries.len()); for i in &entries { - blocks.push(Block::new(i.loc)); + blocks.push(i.loc); } // FIXME: we should do this in batches - engine.read_many(&mut blocks)?; + let rblocks = engine.read_many(&blocks[0..])?; + let mut write_blocks = Vec::new(); - for (be, b) in entries.iter().zip(blocks.iter()) { - let mut blocknr = be.blocknr; - let mut bitmap = unpack::(b.get_data())?; - for e in bitmap.entries.iter_mut() { - if blocknr >= sm.get_nr_blocks()? { - break; - } - - if let BitmapEntry::Small(actual) = e { - let expected = sm.get(blocknr)?; - if *actual == 1 && expected == 0 { - *e = BitmapEntry::Small(0); + let mut i = 0; + for rb in rblocks { + if rb.is_err() { + todo!(); + } else { + let b = rb.unwrap(); + let be = &entries[i]; + let mut blocknr = be.blocknr; + let mut bitmap = unpack::(b.get_data())?; + for e in bitmap.entries.iter_mut() { + if blocknr >= sm.get_nr_blocks()? { + break; } + + if let BitmapEntry::Small(actual) = e { + let expected = sm.get(blocknr)?; + if *actual == 1 && expected == 0 { + *e = BitmapEntry::Small(0); + } + } + + blocknr += 1; } - blocknr += 1; + let mut out = Cursor::new(b.get_data()); + bitmap.pack(&mut out)?; + checksum::write_checksum(b.get_data(), checksum::BT::BITMAP)?; + + write_blocks.push(b); } - let mut out = Cursor::new(b.get_data()); - bitmap.pack(&mut out)?; - checksum::write_checksum(b.get_data(), checksum::BT::BITMAP)?; + i += 1; } - engine.write_many(&blocks)?; + engine.write_many(&write_blocks[0..])?; Ok(()) } @@ -401,7 +420,7 @@ fn mk_context(opts: &ThinCheckOptions) -> Result { opts.auto_repair, )?); } else { - nr_threads = num_cpus::get() * 2; + nr_threads = std::cmp::max(8, num_cpus::get() * 2); engine = Arc::new(SyncIoEngine::new(opts.dev, nr_threads, opts.auto_repair)?); } let pool = ThreadPool::new(nr_threads); @@ -495,8 +514,7 @@ pub fn check(opts: ThinCheckOptions) -> Result<()> { report.set_sub_title("metadata space map"); let root = unpack::(&sb.metadata_sm_root[0..])?; - let mut b = Block::new(root.bitmap_root); - engine.read(&mut b)?; + let b = engine.read(root.bitmap_root)?; metadata_sm.lock().unwrap().inc(root.bitmap_root, 1)?; let entries = unpack::(b.get_data())?.indexes; diff --git a/src/thin/superblock.rs b/src/thin/superblock.rs index df69ff2..ab9eb3f 100644 --- a/src/thin/superblock.rs +++ b/src/thin/superblock.rs @@ -93,8 +93,7 @@ fn unpack(data: &[u8]) -> IResult<&[u8], Superblock> { } pub fn read_superblock(engine: &dyn IoEngine, loc: u64) -> Result { - let mut b = Block::new(loc); - engine.read(&mut b)?; + let b = engine.read(loc)?; if let Ok((_, sb)) = unpack(&b.get_data()) { Ok(sb)