[cache_restore (rust)] First draft
This commit is contained in:
parent
e336b3a63f
commit
ce94ba73a5
77
src/bin/cache_restore.rs
Normal file
77
src/bin/cache_restore.rs
Normal file
@ -0,0 +1,77 @@
|
||||
extern crate clap;
|
||||
extern crate thinp;
|
||||
|
||||
use atty::Stream;
|
||||
use clap::{App, Arg};
|
||||
use std::path::Path;
|
||||
use std::process;
|
||||
use std::process::exit;
|
||||
use std::sync::Arc;
|
||||
use thinp::cache::restore::{restore, CacheRestoreOptions};
|
||||
use thinp::file_utils;
|
||||
use thinp::report::*;
|
||||
|
||||
fn main() {
|
||||
let parser = App::new("cache_restore")
|
||||
.version(thinp::version::tools_version())
|
||||
.about("Convert XML format metadata to binary.")
|
||||
.arg(
|
||||
Arg::with_name("OVERRIDE_MAPPING_ROOT")
|
||||
.help("Specify a mapping root to use")
|
||||
.long("override-mapping-root")
|
||||
.value_name("OVERRIDE_MAPPING_ROOT")
|
||||
.takes_value(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("INPUT")
|
||||
.help("Specify the input xml")
|
||||
.short("i")
|
||||
.long("input")
|
||||
.value_name("INPUT")
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("OUTPUT")
|
||||
.help("Specify the output device to check")
|
||||
.short("o")
|
||||
.long("output")
|
||||
.value_name("OUTPUT")
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("SYNC_IO")
|
||||
.help("Force use of synchronous io")
|
||||
.long("sync-io"),
|
||||
);
|
||||
|
||||
let matches = parser.get_matches();
|
||||
let input_file = Path::new(matches.value_of("INPUT").unwrap());
|
||||
let output_file = Path::new(matches.value_of("OUTPUT").unwrap());
|
||||
|
||||
if !file_utils::file_exists(input_file) {
|
||||
eprintln!("Couldn't find input file '{:?}'.", &input_file);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
let report;
|
||||
|
||||
if matches.is_present("QUIET") {
|
||||
report = std::sync::Arc::new(mk_quiet_report());
|
||||
} else if atty::is(Stream::Stdout) {
|
||||
report = std::sync::Arc::new(mk_progress_bar_report());
|
||||
} else {
|
||||
report = Arc::new(mk_simple_report());
|
||||
}
|
||||
|
||||
let opts = CacheRestoreOptions {
|
||||
input: &input_file,
|
||||
output: &output_file,
|
||||
async_io: !matches.is_present("SYNC_IO"),
|
||||
report,
|
||||
};
|
||||
|
||||
if let Err(reason) = restore(opts) {
|
||||
println!("{}", reason);
|
||||
process::exit(1);
|
||||
}
|
||||
}
|
1
src/cache/mod.rs
vendored
1
src/cache/mod.rs
vendored
@ -2,5 +2,6 @@ pub mod check;
|
||||
pub mod dump;
|
||||
pub mod hint;
|
||||
pub mod mapping;
|
||||
pub mod restore;
|
||||
pub mod superblock;
|
||||
pub mod xml;
|
||||
|
273
src/cache/restore.rs
vendored
Normal file
273
src/cache/restore.rs
vendored
Normal file
@ -0,0 +1,273 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
|
||||
use std::convert::TryInto;
|
||||
use std::fs::OpenOptions;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::cache::hint::Hint;
|
||||
use crate::cache::mapping::{Mapping, MappingFlags};
|
||||
use crate::cache::superblock::*;
|
||||
use crate::cache::xml::{self, MetadataVisitor, Visit};
|
||||
use crate::io_engine::*;
|
||||
use crate::math::*;
|
||||
use crate::pdata::array_builder::*;
|
||||
use crate::pdata::space_map::*;
|
||||
use crate::report::*;
|
||||
use crate::write_batcher::*;
|
||||
|
||||
//------------------------------------------
|
||||
|
||||
const MAX_CONCURRENT_IO: u32 = 1024;
|
||||
|
||||
//------------------------------------------
|
||||
|
||||
pub struct CacheRestoreOptions<'a> {
|
||||
pub input: &'a Path,
|
||||
pub output: &'a Path,
|
||||
pub async_io: bool,
|
||||
pub report: Arc<Report>,
|
||||
}
|
||||
|
||||
struct Context {
|
||||
_report: Arc<Report>,
|
||||
engine: Arc<dyn IoEngine + Send + Sync>,
|
||||
}
|
||||
|
||||
fn mk_context(opts: &CacheRestoreOptions) -> anyhow::Result<Context> {
|
||||
let engine: Arc<dyn IoEngine + Send + Sync>;
|
||||
|
||||
if opts.async_io {
|
||||
engine = Arc::new(AsyncIoEngine::new(opts.output, MAX_CONCURRENT_IO, true)?);
|
||||
} else {
|
||||
let nr_threads = std::cmp::max(8, num_cpus::get() * 2);
|
||||
engine = Arc::new(SyncIoEngine::new(opts.output, nr_threads, true)?);
|
||||
}
|
||||
|
||||
Ok(Context {
|
||||
_report: opts.report.clone(),
|
||||
engine,
|
||||
})
|
||||
}
|
||||
|
||||
//------------------------------------------
|
||||
|
||||
struct RestoreResult {
|
||||
sb: xml::Superblock,
|
||||
mapping_root: u64,
|
||||
dirty_root: Option<u64>,
|
||||
hint_root: u64,
|
||||
discard_root: u64,
|
||||
}
|
||||
|
||||
struct Restorer<'a> {
|
||||
write_batcher: &'a mut WriteBatcher,
|
||||
sb: Option<xml::Superblock>,
|
||||
mapping_builder: Option<ArrayBuilder<Mapping>>,
|
||||
dirty_builder: Option<ArrayBuilder<u64>>,
|
||||
hint_builder: Option<ArrayBuilder<Hint>>,
|
||||
mapping_root: Option<u64>,
|
||||
dirty_root: Option<u64>,
|
||||
hint_root: Option<u64>,
|
||||
discard_root: Option<u64>,
|
||||
dirty_bits: (u32, u64),
|
||||
}
|
||||
|
||||
impl<'a> Restorer<'a> {
|
||||
fn new(w: &'a mut WriteBatcher) -> Restorer<'a> {
|
||||
Restorer {
|
||||
write_batcher: w,
|
||||
sb: None,
|
||||
mapping_builder: None,
|
||||
dirty_builder: None,
|
||||
hint_builder: None,
|
||||
mapping_root: None,
|
||||
dirty_root: None,
|
||||
hint_root: None,
|
||||
discard_root: None,
|
||||
dirty_bits: (0, 0),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_result(self) -> Result<RestoreResult> {
|
||||
if self.sb.is_none() || self.discard_root.is_none() {
|
||||
return Err(anyhow!("No superblock found in xml file"));
|
||||
}
|
||||
if self.mapping_root.is_none() || self.hint_root.is_none() {
|
||||
return Err(anyhow!("No mappings or hints sections in xml file"));
|
||||
}
|
||||
Ok(RestoreResult {
|
||||
sb: self.sb.unwrap(),
|
||||
mapping_root: self.mapping_root.unwrap(),
|
||||
dirty_root: self.dirty_root,
|
||||
hint_root: self.hint_root.unwrap(),
|
||||
discard_root: self.discard_root.unwrap(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> MetadataVisitor for Restorer<'a> {
|
||||
fn superblock_b(&mut self, sb: &xml::Superblock) -> Result<Visit> {
|
||||
self.sb = Some(sb.clone());
|
||||
self.write_batcher.alloc()?;
|
||||
self.mapping_builder = Some(ArrayBuilder::new(sb.nr_cache_blocks as u64));
|
||||
self.dirty_builder = Some(ArrayBuilder::new(div_up(sb.nr_cache_blocks as u64, 64)));
|
||||
self.hint_builder = Some(ArrayBuilder::new(sb.nr_cache_blocks as u64));
|
||||
|
||||
let discard_builder = ArrayBuilder::<u64>::new(0); // discard bitset is optional
|
||||
self.discard_root = Some(discard_builder.complete(self.write_batcher)?);
|
||||
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn superblock_e(&mut self) -> Result<Visit> {
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn mappings_b(&mut self) -> Result<Visit> {
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn mappings_e(&mut self) -> Result<Visit> {
|
||||
let mut mapping_builder = None;
|
||||
std::mem::swap(&mut self.mapping_builder, &mut mapping_builder);
|
||||
if let Some(builder) = mapping_builder {
|
||||
self.mapping_root = Some(builder.complete(self.write_batcher)?);
|
||||
}
|
||||
|
||||
// push the bufferred trailing bits
|
||||
let b = self.dirty_builder.as_mut().unwrap();
|
||||
b.push_value(
|
||||
self.write_batcher,
|
||||
self.dirty_bits.0 as u64,
|
||||
self.dirty_bits.1,
|
||||
)?;
|
||||
|
||||
let mut dirty_builder = None;
|
||||
std::mem::swap(&mut self.dirty_builder, &mut dirty_builder);
|
||||
if let Some(builder) = dirty_builder {
|
||||
self.dirty_root = Some(builder.complete(self.write_batcher)?);
|
||||
}
|
||||
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn mapping(&mut self, m: &xml::Map) -> Result<Visit> {
|
||||
let map = Mapping {
|
||||
oblock: m.oblock,
|
||||
flags: MappingFlags::Valid as u32,
|
||||
};
|
||||
let mapping_builder = self.mapping_builder.as_mut().unwrap();
|
||||
mapping_builder.push_value(self.write_batcher, m.cblock as u64, map)?;
|
||||
|
||||
if m.dirty {
|
||||
let index = m.cblock >> 6;
|
||||
let bi = m.cblock & 63;
|
||||
if index == self.dirty_bits.0 {
|
||||
self.dirty_bits.1 |= 1 << bi;
|
||||
} else {
|
||||
let dirty_builder = self.dirty_builder.as_mut().unwrap();
|
||||
dirty_builder.push_value(
|
||||
self.write_batcher,
|
||||
self.dirty_bits.0 as u64,
|
||||
self.dirty_bits.1,
|
||||
)?;
|
||||
self.dirty_bits.0 = index;
|
||||
self.dirty_bits.1 = 0;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn hints_b(&mut self) -> Result<Visit> {
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn hints_e(&mut self) -> Result<Visit> {
|
||||
let mut hint_builder = None;
|
||||
std::mem::swap(&mut self.hint_builder, &mut hint_builder);
|
||||
if let Some(builder) = hint_builder {
|
||||
self.hint_root = Some(builder.complete(self.write_batcher)?);
|
||||
}
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn hint(&mut self, h: &xml::Hint) -> Result<Visit> {
|
||||
let hint = Hint {
|
||||
hint: h.data[..].try_into().unwrap(),
|
||||
};
|
||||
let hint_builder = self.hint_builder.as_mut().unwrap();
|
||||
hint_builder.push_value(self.write_batcher, h.cblock as u64, hint)?;
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn discards_b(&mut self) -> Result<Visit> {
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn discards_e(&mut self) -> Result<Visit> {
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn discard(&mut self, _d: &xml::Discard) -> Result<Visit> {
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
|
||||
fn eof(&mut self) -> Result<Visit> {
|
||||
Ok(Visit::Continue)
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------
|
||||
|
||||
pub fn restore(opts: CacheRestoreOptions) -> Result<()> {
|
||||
let input = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(false)
|
||||
.open(opts.input)?;
|
||||
|
||||
let ctx = mk_context(&opts)?;
|
||||
|
||||
let sm = core_sm(ctx.engine.get_nr_blocks(), u32::MAX);
|
||||
let mut w = WriteBatcher::new(ctx.engine.clone(), sm.clone(), ctx.engine.get_batch_size());
|
||||
|
||||
let mut restorer = Restorer::new(&mut w);
|
||||
xml::read(input, &mut restorer)?;
|
||||
let result = restorer.get_result()?;
|
||||
|
||||
w.flush()?;
|
||||
|
||||
let sb = Superblock {
|
||||
flags: SuperblockFlags {
|
||||
clean_shutdown: true,
|
||||
needs_check: false,
|
||||
},
|
||||
block: SUPERBLOCK_LOCATION,
|
||||
version: 2,
|
||||
policy_name: result.sb.policy.as_bytes().to_vec(),
|
||||
policy_version: vec![2, 0, 0],
|
||||
policy_hint_size: result.sb.hint_width,
|
||||
metadata_sm_root: vec![0; SPACE_MAP_ROOT_SIZE],
|
||||
mapping_root: result.mapping_root,
|
||||
dirty_root: result.dirty_root,
|
||||
hint_root: result.hint_root,
|
||||
discard_root: result.discard_root,
|
||||
discard_block_size: 0,
|
||||
discard_nr_blocks: 0,
|
||||
data_block_size: result.sb.block_size,
|
||||
cache_blocks: result.sb.nr_cache_blocks,
|
||||
compat_flags: 0,
|
||||
compat_ro_flags: 0,
|
||||
incompat_flags: 0,
|
||||
read_hits: 0,
|
||||
read_misses: 9,
|
||||
write_hits: 0,
|
||||
write_misses: 0,
|
||||
};
|
||||
write_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION, &sb)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//------------------------------------------
|
@ -6,25 +6,24 @@ use std::io::Cursor;
|
||||
use crate::checksum;
|
||||
use crate::io_engine::*;
|
||||
use crate::pdata::array::*;
|
||||
use crate::pdata::btree_builder::*;
|
||||
use crate::pdata::unpack::*;
|
||||
use crate::write_batcher::*;
|
||||
|
||||
//------------------------------------------
|
||||
|
||||
pub struct ArrayBuilder<V: Unpack + Pack> {
|
||||
pub struct ArrayBlockBuilder<V: Unpack + Pack> {
|
||||
array_io: ArrayIO<V>,
|
||||
max_entries_per_block: usize,
|
||||
values: VecDeque<(u64, V)>,
|
||||
array_blocks: Vec<ArraySummary>,
|
||||
array_blocks: Vec<u64>,
|
||||
nr_entries: u64,
|
||||
nr_emitted: u64,
|
||||
nr_queued: u64,
|
||||
}
|
||||
|
||||
struct ArraySummary {
|
||||
block: u64,
|
||||
index: u64,
|
||||
nr_entries: usize,
|
||||
pub struct ArrayBuilder<V: Unpack + Pack> {
|
||||
block_builder: ArrayBlockBuilder<V>,
|
||||
}
|
||||
|
||||
struct ArrayIO<V: Unpack + Pack> {
|
||||
@ -43,9 +42,9 @@ fn calc_max_entries<V: Unpack>() -> usize {
|
||||
|
||||
//------------------------------------------
|
||||
|
||||
impl<V: Unpack + Pack + Clone + Default> ArrayBuilder<V> {
|
||||
pub fn new(nr_entries: u64) -> ArrayBuilder<V> {
|
||||
ArrayBuilder {
|
||||
impl<V: Unpack + Pack + Clone + Default> ArrayBlockBuilder<V> {
|
||||
pub fn new(nr_entries: u64) -> ArrayBlockBuilder<V> {
|
||||
ArrayBlockBuilder {
|
||||
array_io: ArrayIO::new(),
|
||||
max_entries_per_block: calc_max_entries::<V>(),
|
||||
values: VecDeque::new(),
|
||||
@ -56,7 +55,7 @@ impl<V: Unpack + Pack + Clone + Default> ArrayBuilder<V> {
|
||||
}
|
||||
}
|
||||
|
||||
fn push_value(&mut self, w: &mut WriteBatcher, index: u64, v: V) -> Result<()> {
|
||||
pub fn push_value(&mut self, w: &mut WriteBatcher, index: u64, v: V) -> Result<()> {
|
||||
assert!(index >= self.nr_emitted + self.nr_queued);
|
||||
assert!(index < self.nr_entries);
|
||||
|
||||
@ -70,8 +69,9 @@ impl<V: Unpack + Pack + Clone + Default> ArrayBuilder<V> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn complete(mut self, w: &mut WriteBatcher) -> Result<Vec<ArraySummary>> {
|
||||
pub fn complete(mut self, w: &mut WriteBatcher) -> Result<Vec<u64>> {
|
||||
if self.nr_emitted + self.nr_queued < self.nr_entries {
|
||||
// FIXME: flushing with a default values looks confusing
|
||||
self.push_value(w, self.nr_entries - 1, Default::default())?;
|
||||
}
|
||||
self.emit_all(w)?;
|
||||
@ -112,26 +112,24 @@ impl<V: Unpack + Pack + Clone + Default> ArrayBuilder<V> {
|
||||
let len = self.values.front().unwrap().0 - self.nr_emitted + 1;
|
||||
if len <= nr_free as u64 {
|
||||
let (_, v) = self.values.pop_front().unwrap();
|
||||
values.resize_with(len as usize - 1, Default::default);
|
||||
if len > 1 {
|
||||
values.resize_with(values.len() + len as usize - 1, Default::default);
|
||||
}
|
||||
values.push(v);
|
||||
nr_free -= len as usize;
|
||||
self.nr_emitted += len;
|
||||
self.nr_queued -= len;
|
||||
} else {
|
||||
values.resize_with(nr_free, Default::default);
|
||||
values.resize_with(values.len() + nr_free as usize, Default::default);
|
||||
self.nr_emitted += nr_free as u64;
|
||||
self.nr_queued -= nr_free as u64;
|
||||
nr_free = 0;
|
||||
}
|
||||
}
|
||||
|
||||
let nr_entries = values.len();
|
||||
let wresult = self.array_io.write(w, values)?;
|
||||
|
||||
self.array_blocks.push(ArraySummary {
|
||||
block: wresult.loc,
|
||||
index: self.nr_emitted / self.max_entries_per_block as u64,
|
||||
nr_entries,
|
||||
});
|
||||
self.array_blocks.push(wresult.loc);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -139,6 +137,30 @@ impl<V: Unpack + Pack + Clone + Default> ArrayBuilder<V> {
|
||||
|
||||
//------------------------------------------
|
||||
|
||||
impl<V: Unpack + Pack + Clone + Default> ArrayBuilder<V> {
|
||||
pub fn new(nr_entries: u64) -> ArrayBuilder<V> {
|
||||
ArrayBuilder {
|
||||
block_builder: ArrayBlockBuilder::<V>::new(nr_entries),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_value(&mut self, w: &mut WriteBatcher, index: u64, v: V) -> Result<()> {
|
||||
self.block_builder.push_value(w, index, v)
|
||||
}
|
||||
|
||||
pub fn complete(self, w: &mut WriteBatcher) -> Result<u64> {
|
||||
let blocks = self.block_builder.complete(w)?;
|
||||
let mut index_builder = Builder::<u64>::new(Box::new(NoopRC {}));
|
||||
|
||||
for (i, b) in blocks.iter().enumerate() {
|
||||
index_builder.push_value(w, i as u64, *b)?;
|
||||
}
|
||||
index_builder.complete(w)
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------
|
||||
|
||||
impl<V: Unpack + Pack> ArrayIO<V> {
|
||||
pub fn new() -> ArrayIO<V> {
|
||||
ArrayIO {
|
||||
|
@ -1,4 +1,5 @@
|
||||
pub mod array;
|
||||
pub mod array_builder;
|
||||
pub mod array_walker;
|
||||
pub mod bitset;
|
||||
pub mod btree;
|
||||
|
Loading…
Reference in New Issue
Block a user