[era_dump (rust)] Support wrapped-around era in logical dump

This commit is contained in:
Ming-Hung Tsai 2021-10-08 10:22:20 +08:00
parent 89e568b897
commit 533e174051

View File

@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use fixedbitset::FixedBitSet;
use std::convert::TryFrom;
use std::fs::File;
@ -16,7 +16,7 @@ use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine};
use crate::pdata::array::{self, ArrayBlock};
use crate::pdata::array_walker::*;
use crate::pdata::bitset::read_bitset_no_err;
use crate::pdata::btree_walker::{btree_to_key_set, btree_to_map};
use crate::pdata::btree_walker::btree_to_map;
//------------------------------------------
@ -58,27 +58,28 @@ impl<'a> ArrayVisitor<u32> for EraEmitter<'a> {
trait Archive {
fn set(&mut self, key: u32, value: u32) -> Result<()>;
fn get(&self, key: u32) -> Result<u32>;
fn get(&self, key: u32) -> Option<u32>;
}
// in-core archive of writeset eras
// In-core archive of writeset eras.
// The actual era for a given block is `digested_era + deltas[b]` if `deltas[b]` is non-zero.
struct EraArchive<T> {
digested_era: u32, // maximum possible era in the era array
deltas: Vec<T>,
}
fn new_era_archive(nr_blocks: u32, archived_min: u32, nr_writesets: u32) -> Box<dyn Archive> {
fn new_era_archive(nr_blocks: u32, archived_begin: u32, nr_writesets: u32) -> Box<dyn Archive> {
match nr_writesets + 1 {
0..=255 => Box::new(EraArchive {
digested_era: archived_min.wrapping_sub(1),
digested_era: archived_begin.wrapping_sub(1),
deltas: vec![0u8; nr_blocks as usize],
}),
256..=65535 => Box::new(EraArchive {
digested_era: archived_min.wrapping_sub(1),
digested_era: archived_begin.wrapping_sub(1),
deltas: vec![0u16; nr_blocks as usize],
}),
_ => Box::new(EraArchive {
digested_era: archived_min.wrapping_sub(1),
digested_era: archived_begin.wrapping_sub(1),
deltas: vec![0u32; nr_blocks as usize],
}),
}
@ -94,12 +95,16 @@ where
Ok(())
}
fn get(&self, block: u32) -> Result<u32> {
let delta: u32 = self.deltas[block as usize].into();
if delta > 0 {
Ok(self.digested_era.wrapping_add(delta))
fn get(&self, block: u32) -> Option<u32> {
if let Some(&delta) = self.deltas.get(block as usize) {
let d: u32 = delta.into();
if d == 0 {
None
} else {
Some(self.digested_era.wrapping_add(d))
}
} else {
Ok(0)
None
}
}
}
@ -136,18 +141,14 @@ impl<'a> ArrayVisitor<u32> for LogicalEraEmitter<'a> {
let begin = index as u32 * b.header.max_entries;
let end = begin + b.header.nr_entries;
for (v, block) in b.values.iter().zip(begin..end) {
let archived = inner
.era_archive
.get(block)
.map_err(|e| array::value_err(format!("{}", e)))?;
let era = if archived > 0 {
ir::Era {
let era;
if let Some(archived) = inner.era_archive.get(block) {
era = ir::Era {
block,
era: archived,
}
} else {
ir::Era { block, era: *v }
era = ir::Era { block, era: *v }
};
inner
@ -297,6 +298,41 @@ pub fn dump_metadata(
//-----------------------------------------
fn get_writesets_ordered(
engine: Arc<dyn IoEngine + Send + Sync>,
sb: &Superblock,
repair: bool,
) -> Result<Vec<(u32, Writeset)>> {
let mut path = vec![0];
let mut writesets =
btree_to_map::<Writeset>(&mut path, engine.clone(), repair, sb.writeset_tree_root)?;
if sb.current_writeset.root != 0 {
if writesets.contains_key(&(sb.current_era as u64)) {
return Err(anyhow!(
"Duplicated era found in current_writeset and the writeset tree"
));
}
writesets.insert(sb.current_era as u64, sb.current_writeset);
}
if writesets.is_empty() {
return Ok(Vec::new());
}
let mut v = Vec::<(u32, Writeset)>::new();
let era_begin = sb.current_era.wrapping_sub((writesets.len() - 1) as u32);
for era in era_begin..=sb.current_era {
if let Some(ws) = writesets.get(&(era as u64)) {
v.push((era, *ws));
} else {
return Err(anyhow!("Writeset of era {} is not present", era));
}
}
Ok(v)
}
fn collate_writeset(index: u32, bitset: &FixedBitSet, archive: &mut dyn Archive) -> Result<()> {
let era_delta = index + 1;
@ -320,12 +356,10 @@ fn collate_writesets(
sb: &Superblock,
repair: bool,
) -> Result<Box<dyn Archive>> {
let mut path = vec![0];
let writesets =
btree_to_map::<Writeset>(&mut path, engine.clone(), repair, sb.writeset_tree_root)?;
let writesets = get_writesets_ordered(engine.clone(), sb, repair)?;
let archived_min = writesets.iter().next().map_or(0u32, |(k, _v)| *k as u32);
let mut archive = new_era_archive(sb.nr_blocks, archived_min, writesets.len() as u32);
let archived_begin = writesets.get(0).map_or(0u32, |(era, _ws)| *era);
let mut archive = new_era_archive(sb.nr_blocks, archived_begin, writesets.len() as u32);
for (index, (_era, ws)) in writesets.iter().enumerate() {
let bitset = read_bitset_no_err(engine.clone(), ws.root, ws.nr_bits as usize, repair)?;
@ -377,14 +411,7 @@ pub fn dump(opts: EraDumpOptions) -> anyhow::Result<()> {
}
let mut out = xml::XmlWriter::new(writer, false);
let mut path = vec![0];
let writesets = btree_to_key_set::<Writeset>(
&mut path,
ctx.engine.clone(),
opts.repair,
sb.writeset_tree_root,
)?;
let writesets = get_writesets_ordered(ctx.engine.clone(), &sb, opts.repair)?;
if opts.logical && !writesets.is_empty() {
dump_metadata_logical(ctx.engine, &mut out, &sb, opts.repair)
} else {