diff --git a/Cargo.lock b/Cargo.lock index 0ac75d3..0478876 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -250,6 +250,16 @@ dependencies = [ "libc", ] +[[package]] +name = "io-uring" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31e11f8867575fc79a3e73e5f554d0b7386bc4a6f469039e8a83136c724fd81" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -527,15 +537,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "rio" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce02a35f6fbcc9c5ce0674f17d33fb56afbe0bec6f6263affed4b1ebf594d95d" -dependencies = [ - "libc", -] - [[package]] name = "ryu" version = "1.0.5" @@ -605,6 +606,7 @@ dependencies = [ "fixedbitset", "flate2", "futures", + "io-uring", "libc", "nix", "nom", @@ -615,7 +617,6 @@ dependencies = [ "quickcheck", "quickcheck_macros", "rand", - "rio", "tempfile", "thiserror", ] diff --git a/Cargo.toml b/Cargo.toml index 0dd5c32..ddcf6bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ crc32c = "0.4" fixedbitset = "0.3" futures = "0.3" flate2 = "1.0" +io-uring = "0.3" libc = "0.2.71" nix = "0.17" nom = "5.1" @@ -21,7 +22,6 @@ num-derive = "0.3" num-traits = "0.2" quick-xml = "0.18" rand = "0.7" -rio = "0.9" tempfile = "3.1" thiserror = "1.0" diff --git a/src/block_manager.rs b/src/block_manager.rs index 2ce48cf..f813012 100644 --- a/src/block_manager.rs +++ b/src/block_manager.rs @@ -1,5 +1,4 @@ use anyhow::{anyhow, Result}; -use rio::{self, Completion, Rio}; use std::alloc::{alloc, dealloc, Layout}; use std::collections::HashMap; use std::fs::File; @@ -7,14 +6,17 @@ use std::fs::OpenOptions; use std::io; use std::io::{Read, Seek}; use std::os::unix::fs::OpenOptionsExt; +use std::os::unix::io::AsRawFd; use std::path::Path; use std::sync::{Arc, Mutex}; +use io_uring::opcode::{self, types}; +use io_uring::IoUring; + +//------------------------------------------ pub const BLOCK_SIZE: usize = 4096; const ALIGN: usize = 4096; -// FIXME: introduce a cache -// FIXME: use O_DIRECT #[derive(Debug)] pub struct Block { pub loc: u64, @@ -84,7 +86,8 @@ impl IoEngine for SyncIoEngine { } fn read(&mut self, b: &mut Block) -> Result<()> { - self.input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; + self.input + .seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; self.input.read_exact(&mut b.get_data())?; Ok(()) @@ -101,53 +104,83 @@ impl IoEngine for SyncIoEngine { //------------------------------------------ -/* pub struct AsyncIoEngine { - ring: Rio, + ring: IoUring, nr_blocks: u64, input: File, } impl AsyncIoEngine { - pub fn new(path: &Path) -> Result { + pub fn new(path: &Path, queue_len: u32) -> Result { let input = OpenOptions::new() .read(true) .write(false) .custom_flags(libc::O_DIRECT) .open(path)?; - let ring = rio::new()?; - - Ok(IoEngine { - ring, + Ok(AsyncIoEngine { + ring: IoUring::new(queue_len)?, nr_blocks: get_nr_blocks(path)?, input, }) } +} - pub fn read(&self, blocks: &mut Vec) -> Result<()> { - // FIXME: using a bounce buffer as a hack, since b.get_data() will not have - // a big enough lifetime. - let mut bounce_buffer = vec![0; blocks.len() * BLOCK_SIZE]; - let mut completions = Vec::new(); +impl IoEngine for AsyncIoEngine { + fn get_nr_blocks(&self) -> u64 { + self.nr_blocks + } - for n in 0..blocks.len() { - let b = &blocks[n]; - let at = b.loc * BLOCK_SIZE as u64; - let completion = self.ring.read_at(&self.input, &slice, at); - completions.push(completion); + fn read(&mut self, b: &mut Block) -> Result<()> { + let fd = types::Target::Fd(self.input.as_raw_fd()); + let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32).offset(b.loc as i64 * BLOCK_SIZE as i64); + + unsafe { + let mut queue = self.ring.submission().available(); + queue.push(read_e.build().user_data(1)) + .ok() + .expect("queue is full"); } - for c in completions { - let n = c.wait()?; - if n != BLOCK_SIZE { - return Err(anyhow!("short read")); + self.ring.submit_and_wait(1)?; + + let cqes = self.ring.completion().available().collect::>(); + + // FIXME: return proper errors + assert_eq!(cqes.len(), 1); + assert_eq!(cqes[0].user_data(), 1); + assert_eq!(cqes[0].result(), BLOCK_SIZE as i32); + + Ok(()) + } + + fn read_many(&mut self, blocks: &mut Vec) -> Result<()> { + let count = blocks.len(); + let fd = types::Target::Fd(self.input.as_raw_fd()); + + for b in blocks.into_iter() { + let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32).offset(b.loc as i64 * BLOCK_SIZE as i64); + + unsafe { + let mut queue = self.ring.submission().available(); + queue.push(read_e.build().user_data(1)) + .ok() + .expect("queue is full"); } } - // copy out of the bounce buffer + self.ring.submit_and_wait(count)?; + + let cqes = self.ring.completion().available().collect::>(); + + // FIXME: return proper errors + assert_eq!(cqes.len(), count); + for c in &cqes { + assert_eq!(c.result(), BLOCK_SIZE as i32); + } Ok(()) } } -*/ + +//------------------------------------------ diff --git a/src/thin/check.rs b/src/thin/check.rs index a46385a..e22af6f 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; -use crate::block_manager::{Block, IoEngine, SyncIoEngine, BLOCK_SIZE}; +use crate::block_manager::{Block, IoEngine, AsyncIoEngine, SyncIoEngine, BLOCK_SIZE}; use crate::checksum; use crate::thin::superblock::*; @@ -220,7 +220,8 @@ fn walk_node( } pub fn check(dev: &Path) -> Result<()> { - let mut engine = SyncIoEngine::new(dev)?; + //let mut engine = SyncIoEngine::new(dev)?; + let mut engine = AsyncIoEngine::new(dev, 256)?; let now = Instant::now(); let sb = read_superblock(&mut engine, SUPERBLOCK_LOCATION)?; @@ -232,8 +233,7 @@ pub fn check(dev: &Path) -> Result<()> { walk_node(&mut engine, &mut seen, MappingLevel::Top, &root)?; println!( - "read superblock, mapping root at {}, {} ms", - sb.mapping_root, + "read mapping tree in {} ms", now.elapsed().as_millis() );