[thin_check (rust)] Add error handling to io_engine interface

This commit is contained in:
Joe Thornber 2020-09-02 12:57:47 +01:00
parent b82307d8a5
commit 44142f657a
6 changed files with 295 additions and 185 deletions

48
Cargo.lock generated
View File

@ -49,9 +49,9 @@ dependencies = [
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.0.0" version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]] [[package]]
name = "base64" name = "base64"
@ -73,9 +73,9 @@ checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.58" version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9a06fb2e53271d7c279ec1efea6ab691c35a2ae67ec0d91d7acec0caf13b518" checksum = "66120af515773fb005778dc07c261bd201ec8ce50bd6e7144c927753fe013381"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
@ -85,9 +85,9 @@ checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]] [[package]]
name = "clap" name = "clap"
version = "2.33.2" version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10040cdf04294b565d9e0319955430099ec3813a64c952b86a41200ad714ae48" checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [ dependencies = [
"ansi_term", "ansi_term",
"atty", "atty",
@ -160,15 +160,15 @@ dependencies = [
[[package]] [[package]]
name = "fixedbitset" name = "fixedbitset"
version = "0.3.0" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fc4fcacf5cd3681968f6524ea159383132937739c6c40dabab9e37ed515911b" checksum = "4e08c8bc7575d7e091fe0706963bd22e2a4be6a64da995f03b2a5a57d66ad015"
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.0.16" version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e" checksum = "766d0e77a2c1502169d4a93ff3b8c15a71fd946cd0126309752104e5f3c46d94"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crc32fast", "crc32fast",
@ -340,9 +340,9 @@ dependencies = [
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.74" version = "0.2.76"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2f02823cf78b754822df5f7f268fb59822e7296276d3e069d8e8cb26a14bd10" checksum = "755456fae044e6fa1ebbbd1b3e902ae19e73097ed4ed87bb79934a867c007bc3"
[[package]] [[package]]
name = "log" name = "log"
@ -361,9 +361,9 @@ checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
[[package]] [[package]]
name = "miniz_oxide" name = "miniz_oxide"
version = "0.4.0" version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f" checksum = "4d7559a8a40d0f97e1edea3220f698f78b1c5ab67532e49f68fde3910323b722"
dependencies = [ dependencies = [
"adler", "adler",
] ]
@ -383,9 +383,7 @@ dependencies = [
[[package]] [[package]]
name = "nom" name = "nom"
version = "5.1.2" version = "6.0.0-alpha1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af"
dependencies = [ dependencies = [
"lexical-core", "lexical-core",
"memchr", "memchr",
@ -394,9 +392,9 @@ dependencies = [
[[package]] [[package]]
name = "num-derive" name = "num-derive"
version = "0.3.1" version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0396233fb2d5b0ae3f05ff6aba9a09185f7f6e70f87fb01147d545f85364665" checksum = "6f09b9841adb6b5e1f89ef7087ea636e0fd94b2851f887c1e3eb5d5f8228fab3"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -430,9 +428,9 @@ checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a"
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.4.0" version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad"
[[package]] [[package]]
name = "os_pipe" name = "os_pipe"
@ -472,9 +470,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.8" version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20"
[[package]] [[package]]
name = "proc-macro-hack" name = "proc-macro-hack"
@ -648,9 +646,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.38" version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e69abc24912995b3038597a7a593be5053eb0fb44f3cc5beec0deb421790c1f4" checksum = "891d8d6567fe7c7f8835a3a98af4208f3846fba258c1bc3c31d6e506239f11f9"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@ -20,7 +20,7 @@ io-uring = "0.3"
indicatif = "0.15" indicatif = "0.15"
libc = "0.2.71" libc = "0.2.71"
nix = "0.17" nix = "0.17"
nom = "5.1" nom = { path = "/home/ejt/builds/nom/" }
num_cpus = "1.13" num_cpus = "1.13"
num-derive = "0.3" num-derive = "0.3"
num-traits = "0.2" num-traits = "0.2"
@ -34,3 +34,6 @@ thiserror = "1.0"
json = "0.12" json = "0.12"
quickcheck = "0.9" quickcheck = "0.9"
quickcheck_macros = "0.9" quickcheck_macros = "0.9"
[profile.release]
debug = true

View File

@ -1,10 +1,11 @@
use anyhow::Result;
use io_uring::opcode::{self, types}; use io_uring::opcode::{self, types};
use io_uring::IoUring; use io_uring::IoUring;
use std::alloc::{alloc, dealloc, Layout}; use std::alloc::{alloc, dealloc, Layout};
use std::fs::File; use std::fs::File;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Result;
use std::io::{self, Read, Seek, Write}; use std::io::{self, Read, Seek, Write};
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::OpenOptionsExt; use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use std::path::Path; use std::path::Path;
@ -15,10 +16,10 @@ use std::sync::{Arc, Condvar, Mutex};
pub const BLOCK_SIZE: usize = 4096; pub const BLOCK_SIZE: usize = 4096;
const ALIGN: usize = 4096; const ALIGN: usize = 4096;
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct Block { pub struct Block {
pub loc: u64, pub loc: u64,
pub data: *mut u8, data: *mut u8,
} }
impl Block { impl Block {
@ -50,13 +51,13 @@ unsafe impl Send for Block {}
pub trait IoEngine { pub trait IoEngine {
fn get_nr_blocks(&self) -> u64; fn get_nr_blocks(&self) -> u64;
// FIXME: we need to indicate an error per block. Add error field fn read(&self, b: u64) -> Result<Block>;
// to block? Data should not be accessible if a read failed. // The whole io could fail, or individual blocks
// We should support retry for failed writes. fn read_many(&self, blocks: &[u64]) -> Result<Vec<Result<Block>>>;
fn read(&self, block: &mut Block) -> Result<()>;
fn read_many(&self, blocks: &mut [Block]) -> Result<()>;
fn write(&self, block: &Block) -> Result<()>; fn write(&self, block: &Block) -> Result<()>;
fn write_many(&self, blocks: &[Block]) -> Result<()>; // The whole io could fail, or individual blocks
fn write_many(&self, blocks: &[Block]) -> Result<Vec<Result<()>>>;
} }
fn get_nr_blocks(path: &Path) -> io::Result<u64> { fn get_nr_blocks(path: &Path) -> io::Result<u64> {
@ -72,12 +73,49 @@ pub struct SyncIoEngine {
cvar: Condvar, cvar: Condvar,
} }
struct FileGuard<'a> {
engine: &'a SyncIoEngine,
file: Option<File>,
}
impl<'a> FileGuard<'a> {
fn new(engine: &'a SyncIoEngine, file: File) -> FileGuard<'a> {
FileGuard {
engine,
file: Some(file),
}
}
}
impl<'a> Deref for FileGuard<'a> {
// FIXME: do we need this? it's not in DerefMut
type Target = File;
fn deref(&self) -> &File {
&self.file.as_ref().expect("empty file guard")
}
}
impl<'a> DerefMut for FileGuard<'a> {
fn deref_mut(&mut self) -> &mut File {
match &mut self.file {
None => {
todo!();
}
Some(f) => f,
}
}
}
impl<'a> Drop for FileGuard<'a> {
fn drop(&mut self) {
self.engine.put(self.file.take().expect("empty file guard"));
}
}
impl SyncIoEngine { impl SyncIoEngine {
fn open_file(path: &Path, writeable: bool) -> Result<File> { fn open_file(path: &Path, writeable: bool) -> Result<File> {
let file = OpenOptions::new() let file = OpenOptions::new().read(true).write(writeable).open(path)?;
.read(true)
.write(writeable)
.open(path)?;
Ok(file) Ok(file)
} }
@ -95,13 +133,14 @@ impl SyncIoEngine {
}) })
} }
fn get(&self) -> File { fn get(&self) -> FileGuard {
let mut files = self.files.lock().unwrap(); let mut files = self.files.lock().unwrap();
while files.len() == 0 { while files.len() == 0 {
files = self.cvar.wait(files).unwrap(); files = self.cvar.wait(files).unwrap();
} }
files.pop().unwrap()
FileGuard::new(self, files.pop().unwrap())
} }
fn put(&self, f: File) { fn put(&self, f: File) {
@ -116,44 +155,37 @@ impl IoEngine for SyncIoEngine {
self.nr_blocks self.nr_blocks
} }
fn read(&self, b: &mut Block) -> Result<()> { fn read(&self, loc: u64) -> Result<Block> {
let b = Block::new(loc);
let mut input = self.get(); let mut input = self.get();
input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?;
input.read_exact(&mut b.get_data())?;
self.put(input);
Ok(()) input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?;
input.read_exact(b.get_data())?;
Ok(b)
} }
fn read_many(&self, blocks: &mut [Block]) -> Result<()> { // FIXME: *_many are getting and putting the input many times
let mut input = self.get(); fn read_many(&self, blocks: &[u64]) -> Result<Vec<Result<Block>>> {
let mut bs = Vec::new();
for b in blocks { for b in blocks {
input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; bs.push(self.read(*b));
input.read_exact(&mut b.get_data())?;
} }
self.put(input); Ok(bs)
Ok(())
} }
fn write(&self, b: &Block) -> Result<()> { fn write(&self, b: &Block) -> Result<()> {
let mut input = self.get(); let mut input = self.get();
input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?;
input.write_all(&b.get_data())?; input.write_all(&b.get_data())?;
self.put(input);
Ok(()) Ok(())
} }
fn write_many(&self, blocks: &[Block]) -> Result<()> { fn write_many(&self, blocks: &[Block]) -> Result<Vec<Result<()>>> {
let mut input = self.get(); let mut bs = Vec::new();
for b in blocks { for b in blocks {
input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; bs.push(self.write(b));
input.write_all(&b.get_data())?;
} }
self.put(input); Ok(bs)
Ok(())
} }
} }
@ -191,19 +223,21 @@ impl AsyncIoEngine {
} }
// FIXME: refactor next two fns // FIXME: refactor next two fns
fn read_many_(&self, blocks: &mut [Block]) -> Result<()> { fn read_many_(&self, blocks: Vec<Block>) -> Result<Vec<Result<Block>>> {
use std::io::*;
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let count = blocks.len(); let count = blocks.len();
let fd = types::Target::Fd(inner.input.as_raw_fd()); let fd = types::Target::Fd(inner.input.as_raw_fd());
for b in blocks.iter_mut() { for (i, b) in blocks.iter().enumerate() {
let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32) let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32)
.offset(b.loc as i64 * BLOCK_SIZE as i64); .offset(b.loc as i64 * BLOCK_SIZE as i64);
unsafe { unsafe {
let mut queue = inner.ring.submission().available(); let mut queue = inner.ring.submission().available();
queue queue
.push(read_e.build().user_data(1)) .push(read_e.build().user_data(i as u64))
.ok() .ok()
.expect("queue is full"); .expect("queue is full");
} }
@ -211,30 +245,52 @@ impl AsyncIoEngine {
inner.ring.submit_and_wait(count)?; inner.ring.submit_and_wait(count)?;
let cqes = inner.ring.completion().available().collect::<Vec<_>>(); let mut cqes = inner.ring.completion().available().collect::<Vec<_>>();
// FIXME: return proper errors if cqes.len() != count {
assert_eq!(cqes.len(), count); return Err(Error::new(
for c in &cqes { ErrorKind::Other,
assert_eq!(c.result(), BLOCK_SIZE as i32); "insufficient io_uring completions",
));
} }
Ok(()) // reorder cqes
cqes.sort_by(|a, b| a.user_data().partial_cmp(&b.user_data()).unwrap());
let mut rs = Vec::new();
let mut i = 0;
for b in blocks {
let c = &cqes[i];
i += 1;
let r = c.result();
if r < 0 {
let error = Error::from_raw_os_error(-r);
rs.push(Err(error));
} else if c.result() != BLOCK_SIZE as i32 {
rs.push(Err(Error::new(ErrorKind::UnexpectedEof, "short read")));
} else {
rs.push(Ok(b));
}
}
Ok(rs)
} }
fn write_many_(&self, blocks: &[Block]) -> Result<()> { fn write_many_(&self, blocks: &[Block]) -> Result<Vec<Result<()>>> {
use std::io::*;
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let count = blocks.len(); let count = blocks.len();
let fd = types::Target::Fd(inner.input.as_raw_fd()); let fd = types::Target::Fd(inner.input.as_raw_fd());
for b in blocks.iter() { for (i, b) in blocks.iter().enumerate() {
let write_e = opcode::Write::new(fd, b.data, BLOCK_SIZE as u32) let write_e = opcode::Write::new(fd, b.data, BLOCK_SIZE as u32)
.offset(b.loc as i64 * BLOCK_SIZE as i64); .offset(b.loc as i64 * BLOCK_SIZE as i64);
unsafe { unsafe {
let mut queue = inner.ring.submission().available(); let mut queue = inner.ring.submission().available();
queue queue
.push(write_e.build().user_data(1)) .push(write_e.build().user_data(i as u64))
.ok() .ok()
.expect("queue is full"); .expect("queue is full");
} }
@ -242,15 +298,24 @@ impl AsyncIoEngine {
inner.ring.submit_and_wait(count)?; inner.ring.submit_and_wait(count)?;
let cqes = inner.ring.completion().available().collect::<Vec<_>>(); let mut cqes = inner.ring.completion().available().collect::<Vec<_>>();
// FIXME: return proper errors // reorder cqes
assert_eq!(cqes.len(), count); cqes.sort_by(|a, b| a.user_data().partial_cmp(&b.user_data()).unwrap());
for c in &cqes {
assert_eq!(c.result(), BLOCK_SIZE as i32); let mut rs = Vec::new();
for c in cqes {
let r = c.result();
if r < 0 {
let error = Error::from_raw_os_error(-r);
rs.push(Err(error));
} else if r != BLOCK_SIZE as i32 {
rs.push(Err(Error::new(ErrorKind::UnexpectedEof, "short write")));
} else {
rs.push(Ok(()));
}
} }
Ok(rs)
Ok(())
} }
} }
@ -276,16 +341,17 @@ impl IoEngine for AsyncIoEngine {
inner.nr_blocks inner.nr_blocks
} }
fn read(&self, b: &mut Block) -> Result<()> { fn read(&self, b: u64) -> Result<Block> {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let fd = types::Target::Fd(inner.input.as_raw_fd()); let fd = types::Target::Fd(inner.input.as_raw_fd());
let b = Block::new(b);
let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32) let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32)
.offset(b.loc as i64 * BLOCK_SIZE as i64); .offset(b.loc as i64 * BLOCK_SIZE as i64);
unsafe { unsafe {
let mut queue = inner.ring.submission().available(); let mut queue = inner.ring.submission().available();
queue queue
.push(read_e.build().user_data(1)) .push(read_e.build().user_data(0))
.ok() .ok()
.expect("queue is full"); .expect("queue is full");
} }
@ -294,26 +360,34 @@ impl IoEngine for AsyncIoEngine {
let cqes = inner.ring.completion().available().collect::<Vec<_>>(); let cqes = inner.ring.completion().available().collect::<Vec<_>>();
// FIXME: return proper errors let r = cqes[0].result();
assert_eq!(cqes.len(), 1); use std::io::*;
assert_eq!(cqes[0].user_data(), 1); if r < 0 {
assert_eq!(cqes[0].result(), BLOCK_SIZE as i32); let error = Error::from_raw_os_error(-r);
Err(error)
Ok(()) } else if r != BLOCK_SIZE as i32 {
Err(Error::new(ErrorKind::UnexpectedEof, "short write"))
} else {
Ok(b)
}
} }
fn read_many(&self, blocks: &mut [Block]) -> Result<()> { fn read_many(&self, blocks: &[u64]) -> Result<Vec<Result<Block>>> {
let inner = self.inner.lock().unwrap(); let inner = self.inner.lock().unwrap();
let queue_len = inner.queue_len as usize; let queue_len = inner.queue_len as usize;
drop(inner); drop(inner);
let mut done = 0; let mut results = Vec::new();
while done != blocks.len() { for cs in blocks.chunks(queue_len) {
let len = usize::min(blocks.len() - done, queue_len); let mut bs = Vec::new();
self.read_many_(&mut blocks[done..(done + len)])?; for b in cs {
done += len; bs.push(Block::new(*b));
}
results.append(&mut self.read_many_(bs)?);
} }
Ok(())
Ok(results)
} }
fn write(&self, b: &Block) -> Result<()> { fn write(&self, b: &Block) -> Result<()> {
@ -325,7 +399,7 @@ impl IoEngine for AsyncIoEngine {
unsafe { unsafe {
let mut queue = inner.ring.submission().available(); let mut queue = inner.ring.submission().available();
queue queue
.push(write_e.build().user_data(1)) .push(write_e.build().user_data(0))
.ok() .ok()
.expect("queue is full"); .expect("queue is full");
} }
@ -334,26 +408,32 @@ impl IoEngine for AsyncIoEngine {
let cqes = inner.ring.completion().available().collect::<Vec<_>>(); let cqes = inner.ring.completion().available().collect::<Vec<_>>();
// FIXME: return proper errors let r = cqes[0].result();
assert_eq!(cqes.len(), 1); use std::io::*;
assert_eq!(cqes[0].user_data(), 1); if r < 0 {
assert_eq!(cqes[0].result(), BLOCK_SIZE as i32); let error = Error::from_raw_os_error(-r);
Err(error)
Ok(()) } else if r != BLOCK_SIZE as i32 {
Err(Error::new(ErrorKind::UnexpectedEof, "short write"))
} else {
Ok(())
}
} }
fn write_many(&self, blocks: &[Block]) -> Result<()> { fn write_many(&self, blocks: &[Block]) -> Result<Vec<Result<()>>> {
let inner = self.inner.lock().unwrap(); let inner = self.inner.lock().unwrap();
let queue_len = inner.queue_len as usize; let queue_len = inner.queue_len as usize;
drop(inner); drop(inner);
let mut results = Vec::new();
let mut done = 0; let mut done = 0;
while done != blocks.len() { while done != blocks.len() {
let len = usize::min(blocks.len() - done, queue_len); let len = usize::min(blocks.len() - done, queue_len);
self.write_many_(&blocks[done..(done + len)])?; results.append(&mut self.write_many_(&blocks[done..(done + len)])?);
done += len; done += len;
} }
Ok(())
Ok(results)
} }
} }

View File

@ -203,14 +203,21 @@ impl BTreeWalker {
let mut blocks = Vec::with_capacity(bs.len()); let mut blocks = Vec::with_capacity(bs.len());
for b in bs { for b in bs {
if self.sm_inc(*b)? == 0 { if self.sm_inc(*b)? == 0 {
blocks.push(Block::new(*b)); blocks.push(*b);
} }
} }
self.engine.read_many(&mut blocks)?; let blocks = self.engine.read_many(&blocks[0..])?;
for b in blocks { for b in blocks {
self.walk_node(visitor, &b, false)?; match b {
Err(_e) => {
todo!();
},
Ok(b) => {
self.walk_node(visitor, &b, false)?;
},
}
} }
Ok(()) Ok(())
@ -258,8 +265,7 @@ impl BTreeWalker {
if self.sm_inc(root)? > 0 { if self.sm_inc(root)? > 0 {
Ok(()) Ok(())
} else { } else {
let mut root = Block::new(root); let root = self.engine.read(root)?;
self.engine.read(&mut root)?;
self.walk_node(visitor, &root, true) self.walk_node(visitor, &root, true)
} }
} }
@ -320,19 +326,26 @@ where
let mut blocks = Vec::new(); let mut blocks = Vec::new();
for b in bs { for b in bs {
if w.sm_inc(*b)? == 0 { if w.sm_inc(*b)? == 0 {
blocks.push(Block::new(*b)); blocks.push(*b);
} }
} }
w.engine.read_many(&mut blocks)?; let blocks = w.engine.read_many(&blocks[0..])?;
for b in blocks { for b in blocks {
let w = w.clone(); match b {
let visitor = visitor.clone(); Err(_e) => {
pool.execute(move || { todo!();
// FIXME: return result },
w.walk_node(visitor.as_ref(), &b, false); Ok(b) => {
}); let w = w.clone();
let visitor = visitor.clone();
pool.execute(move || {
// FIXME: return result
w.walk_node(visitor.as_ref(), &b, false);
});
}
}
} }
pool.join(); pool.join();
@ -352,8 +365,7 @@ where
if w.sm_inc(root)? > 0 { if w.sm_inc(root)? > 0 {
Ok(()) Ok(())
} else { } else {
let mut root = Block::new(root); let root = w.engine.read(root)?;
w.engine.read(&mut root)?;
walk_node_threaded(w, pool, visitor, &root, true) walk_node_threaded(w, pool, visitor, &root, true)
} }
} }

View File

@ -8,7 +8,7 @@ use std::thread::{self, JoinHandle};
use threadpool::ThreadPool; use threadpool::ThreadPool;
use crate::checksum; use crate::checksum;
use crate::io_engine::{AsyncIoEngine, Block, IoEngine, SyncIoEngine}; use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine};
use crate::pdata::btree::*; use crate::pdata::btree::*;
use crate::pdata::space_map::*; use crate::pdata::space_map::*;
use crate::pdata::unpack::*; use crate::pdata::unpack::*;
@ -174,58 +174,65 @@ fn check_space_map(
let mut blocks = Vec::with_capacity(entries.len()); let mut blocks = Vec::with_capacity(entries.len());
for i in &entries { for i in &entries {
blocks.push(Block::new(i.blocknr)); blocks.push(i.blocknr);
} }
// FIXME: we should do this in batches // FIXME: we should do this in batches
engine.read_many(&mut blocks)?; let blocks = engine.read_many(&mut blocks)?;
let mut leaks = 0; let mut leaks = 0;
let mut blocknr = 0; let mut blocknr = 0;
let mut bitmap_leaks = Vec::new(); let mut bitmap_leaks = Vec::new();
for n in 0..entries.len() { for n in 0..entries.len() {
let b = &blocks[n]; let b = &blocks[n];
if checksum::metadata_block_type(&b.get_data()) != checksum::BT::BITMAP { match b {
report.fatal(&format!( Err(_e) => {
"Index entry points to block ({}) that isn't a bitmap", todo!();
b.loc },
)); Ok(b) => {
} if checksum::metadata_block_type(&b.get_data()) != checksum::BT::BITMAP {
report.fatal(&format!(
let bitmap = unpack::<Bitmap>(b.get_data())?; "Index entry points to block ({}) that isn't a bitmap",
let first_blocknr = blocknr; b.loc
let mut contains_leak = false; ));
for e in bitmap.entries.iter() {
if blocknr >= root.nr_blocks {
break;
}
match e {
BitmapEntry::Small(actual) => {
let expected = sm.get(blocknr)?;
if *actual == 1 && expected == 0 {
leaks += 1;
contains_leak = true;
} else if *actual != expected as u8 {
report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map contains {}.",
kind, blocknr, expected, actual));
}
} }
BitmapEntry::Overflow => {
let expected = sm.get(blocknr)?; let bitmap = unpack::<Bitmap>(b.get_data())?;
if expected < 3 { let first_blocknr = blocknr;
report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map says it's >= 3.", let mut contains_leak = false;
kind, blocknr, expected)); for e in bitmap.entries.iter() {
if blocknr >= root.nr_blocks {
break;
} }
match e {
BitmapEntry::Small(actual) => {
let expected = sm.get(blocknr)?;
if *actual == 1 && expected == 0 {
leaks += 1;
contains_leak = true;
} else if *actual != expected as u8 {
report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map contains {}.",
kind, blocknr, expected, actual));
}
}
BitmapEntry::Overflow => {
let expected = sm.get(blocknr)?;
if expected < 3 {
report.fatal(&format!("Bad reference count for {} block {}. Expected {}, but space map says it's >= 3.",
kind, blocknr, expected));
}
}
}
blocknr += 1;
}
if contains_leak {
bitmap_leaks.push(BitmapLeak {
blocknr: first_blocknr,
loc: b.loc,
});
} }
} }
blocknr += 1;
}
if contains_leak {
bitmap_leaks.push(BitmapLeak {
blocknr: first_blocknr,
loc: b.loc,
});
} }
} }
@ -245,36 +252,48 @@ fn repair_space_map(ctx: &Context, entries: Vec<BitmapLeak>, sm: ASpaceMap) -> R
let mut blocks = Vec::with_capacity(entries.len()); let mut blocks = Vec::with_capacity(entries.len());
for i in &entries { for i in &entries {
blocks.push(Block::new(i.loc)); blocks.push(i.loc);
} }
// FIXME: we should do this in batches // FIXME: we should do this in batches
engine.read_many(&mut blocks)?; let rblocks = engine.read_many(&blocks[0..])?;
let mut write_blocks = Vec::new();
for (be, b) in entries.iter().zip(blocks.iter()) { let mut i = 0;
let mut blocknr = be.blocknr; for rb in rblocks {
let mut bitmap = unpack::<Bitmap>(b.get_data())?; if rb.is_err() {
for e in bitmap.entries.iter_mut() { todo!();
if blocknr >= sm.get_nr_blocks()? { } else {
break; let b = rb.unwrap();
} let be = &entries[i];
let mut blocknr = be.blocknr;
if let BitmapEntry::Small(actual) = e { let mut bitmap = unpack::<Bitmap>(b.get_data())?;
let expected = sm.get(blocknr)?; for e in bitmap.entries.iter_mut() {
if *actual == 1 && expected == 0 { if blocknr >= sm.get_nr_blocks()? {
*e = BitmapEntry::Small(0); break;
} }
if let BitmapEntry::Small(actual) = e {
let expected = sm.get(blocknr)?;
if *actual == 1 && expected == 0 {
*e = BitmapEntry::Small(0);
}
}
blocknr += 1;
} }
blocknr += 1; let mut out = Cursor::new(b.get_data());
bitmap.pack(&mut out)?;
checksum::write_checksum(b.get_data(), checksum::BT::BITMAP)?;
write_blocks.push(b);
} }
let mut out = Cursor::new(b.get_data()); i += 1;
bitmap.pack(&mut out)?;
checksum::write_checksum(b.get_data(), checksum::BT::BITMAP)?;
} }
engine.write_many(&blocks)?; engine.write_many(&write_blocks[0..])?;
Ok(()) Ok(())
} }
@ -401,7 +420,7 @@ fn mk_context(opts: &ThinCheckOptions) -> Result<Context> {
opts.auto_repair, opts.auto_repair,
)?); )?);
} else { } else {
nr_threads = num_cpus::get() * 2; nr_threads = std::cmp::max(8, num_cpus::get() * 2);
engine = Arc::new(SyncIoEngine::new(opts.dev, nr_threads, opts.auto_repair)?); engine = Arc::new(SyncIoEngine::new(opts.dev, nr_threads, opts.auto_repair)?);
} }
let pool = ThreadPool::new(nr_threads); let pool = ThreadPool::new(nr_threads);
@ -495,8 +514,7 @@ pub fn check(opts: ThinCheckOptions) -> Result<()> {
report.set_sub_title("metadata space map"); report.set_sub_title("metadata space map");
let root = unpack::<SMRoot>(&sb.metadata_sm_root[0..])?; let root = unpack::<SMRoot>(&sb.metadata_sm_root[0..])?;
let mut b = Block::new(root.bitmap_root); let b = engine.read(root.bitmap_root)?;
engine.read(&mut b)?;
metadata_sm.lock().unwrap().inc(root.bitmap_root, 1)?; metadata_sm.lock().unwrap().inc(root.bitmap_root, 1)?;
let entries = unpack::<MetadataIndex>(b.get_data())?.indexes; let entries = unpack::<MetadataIndex>(b.get_data())?.indexes;

View File

@ -93,8 +93,7 @@ fn unpack(data: &[u8]) -> IResult<&[u8], Superblock> {
} }
pub fn read_superblock(engine: &dyn IoEngine, loc: u64) -> Result<Superblock> { pub fn read_superblock(engine: &dyn IoEngine, loc: u64) -> Result<Superblock> {
let mut b = Block::new(loc); let b = engine.read(loc)?;
engine.read(&mut b)?;
if let Ok((_, sb)) = unpack(&b.get_data()) { if let Ok((_, sb)) = unpack(&b.get_data()) {
Ok(sb) Ok(sb)