Lot's of work on thin_restore

This commit is contained in:
Joe Thornber 2021-03-24 14:20:20 +00:00
parent 12c1c6e1f5
commit 040e3bfc2d
17 changed files with 858 additions and 348 deletions

View File

@ -27,6 +27,7 @@ fn main() {
.help("Specify the input xml")
.short("i")
.long("input")
.value_name("INPUT")
.required(true),
)
.arg(
@ -34,6 +35,7 @@ fn main() {
.help("Specify the output device to check")
.short("o")
.long("output")
.value_name("OUTPUT")
.required(true),
)
.arg(

View File

@ -6,7 +6,6 @@ use std::io::Cursor;
const BLOCK_SIZE: u64 = 4096;
#[allow(dead_code)]
const MAGIC: u64 = 0xa537a0aa6309ef77;
const SUPERBLOCK_CSUM_XOR: u32 = 160774;
const BITMAP_CSUM_XOR: u32 = 240779;
const INDEX_CSUM_XOR: u32 = 160478;

View File

@ -26,6 +26,8 @@ pub struct Block {
}
impl Block {
// Creates a new block that corresponds to the given location. The
// memory is not initialised.
pub fn new(loc: u64) -> Block {
let layout = Layout::from_size_align(BLOCK_SIZE, ALIGN).unwrap();
let ptr = unsafe { alloc(layout) };
@ -42,6 +44,12 @@ impl Block {
pub fn get_data<'a>(&self) -> &'a mut [u8] {
unsafe { std::slice::from_raw_parts_mut::<'a>(self.data, BLOCK_SIZE) }
}
pub fn zero(&mut self) {
unsafe {
std::ptr::write_bytes(self.data, 0, BLOCK_SIZE);
}
}
}
impl Drop for Block {

View File

@ -19,6 +19,7 @@ pub mod cache;
pub mod checksum;
pub mod file_utils;
pub mod io_engine;
pub mod math;
pub mod pack;
pub mod pdata;
pub mod report;

View File

@ -539,6 +539,7 @@ pub fn unpack_node<V: Unpack>(
}
if !is_root {
/*
let min = header.max_entries / 3;
if header.nr_entries < min {
return Err(node_err_s(
@ -549,6 +550,7 @@ pub fn unpack_node<V: Unpack>(
),
));
}
*/
}
}

View File

@ -21,6 +21,14 @@ pub trait RefCounter<Value> {
fn dec(&mut self, v: &Value) -> Result<()>;
}
pub struct NoopRC {}
impl<Value> RefCounter<Value> for NoopRC {
fn get(&self, _v: &Value) -> Result<u32> {Ok(0)}
fn inc(&mut self, _v: &Value) -> Result<()> {Ok(())}
fn dec(&mut self, _v: &Value) -> Result<()> {Ok(())}
}
/// Wraps a space map up to become a RefCounter.
struct SMRefCounter {
sm: Arc<Mutex<dyn SpaceMap>>,
@ -126,12 +134,12 @@ fn write_node_<V: Unpack + Pack>(w: &mut WriteBatcher, mut node: Node<V>) -> Res
let keys = node.get_keys();
let first_key = keys.first().unwrap_or(&0u64).clone();
let loc = w.alloc()?;
node.set_block(loc);
let b = w.alloc()?;
node.set_block(b.loc);
let b = Block::new(loc);
let mut cursor = Cursor::new(b.get_data());
pack_node(&node, &mut cursor)?;
let loc = b.loc;
w.write(b, checksum::BT::NODE)?;
Ok(WriteResult { first_key, loc })
@ -149,7 +157,7 @@ pub trait NodeIO<V: Unpack + Pack> {
) -> Result<(Vec<u64>, Vec<V>)>;
}
struct LeafIO {}
pub struct LeafIO {}
impl<V: Unpack + Pack> NodeIO<V> for LeafIO {
fn write(&self, w: &mut WriteBatcher, keys: Vec<u64>, values: Vec<V>) -> Result<WriteResult> {
@ -229,7 +237,6 @@ impl NodeIO<u64> for InternalIO {
/// Care is taken to make sure that all nodes are at least half full unless there's
/// only a single node.
pub struct NodeBuilder<V: Pack + Unpack> {
batcher: WriteBatcher,
nio: Box<dyn NodeIO<V>>,
value_rc: Box<dyn RefCounter<V>>,
max_entries_per_node: usize,
@ -252,15 +259,13 @@ pub struct NodeSummary {
shared: bool,
}
impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
/// Create a new NodeBuilder
pub fn new(
batcher: WriteBatcher,
nio: Box<dyn NodeIO<V>>,
value_rc: Box<dyn RefCounter<V>>,
) -> Self {
NodeBuilder {
batcher,
nio,
value_rc,
max_entries_per_node: calc_max_entries::<V>(),
@ -270,12 +275,12 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
}
/// Push a single value. This may emit a new node, hence the Result
/// return type. The value's ref count will be incremented.
pub fn push_value(&mut self, key: u64, val: V) -> Result<()> {
pub fn push_value(&mut self, w: &mut WriteBatcher, key: u64, val: V) -> Result<()> {
// Have we got enough values to emit a node? We try and keep
// at least max_entries_per_node entries unflushed so we
// can ensure the final node is balanced properly.
if self.values.len() == self.max_entries_per_node * 2 {
self.emit_node()?;
self.emit_node(w)?;
}
self.value_rc.inc(&val)?;
@ -289,7 +294,7 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
/// Any shared nodes that are used have their block incremented in
/// the space map. Will only increment the ref count for values
/// contained in the nodes if it unpacks them.
pub fn push_nodes(&mut self, nodes: &Vec<NodeSummary>) -> Result<()> {
pub fn push_nodes(&mut self, w: &mut WriteBatcher, nodes: &Vec<NodeSummary>) -> Result<()> {
assert!(nodes.len() > 0);
// As a sanity check we make sure that all the shared nodes contain the
@ -305,7 +310,7 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
if self.values.len() < half_full {
// To avoid writing an under populated node we have to grab some
// values from the first of the shared nodes.
let (keys, values) = self.read_node(nodes.get(0).unwrap().block)?;
let (keys, values) = self.read_node(w, nodes.get(0).unwrap().block)?;
for i in 0..keys.len() {
self.value_rc.inc(&values[i])?;
@ -313,21 +318,21 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
}
// Flush all the values.
self.emit_all()?;
self.emit_all(w)?;
// Add the remaining nodes.
for i in 1..nodes.len() {
let n = nodes.get(i).unwrap();
self.batcher.sm.lock().unwrap().inc(n.block, 1)?;
w.sm.lock().unwrap().inc(n.block, 1)?;
self.nodes.push(n.clone());
}
} else {
// Flush all the values.
self.emit_all()?;
self.emit_all(w)?;
// add the nodes
for n in nodes {
self.batcher.sm.lock().unwrap().inc(n.block, 1)?;
w.sm.lock().unwrap().inc(n.block, 1)?;
self.nodes.push(n.clone());
}
}
@ -337,16 +342,21 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
/// Signal that no more values or nodes will be pushed. Returns a
/// vector of the built nodes. Consumes the builder.
pub fn complete(mut self) -> Result<Vec<NodeSummary>> {
pub fn complete(mut self, w: &mut WriteBatcher) -> Result<Vec<NodeSummary>> {
let half_full = self.max_entries_per_node / 2;
if (self.nodes.len() > 0) && (self.values.len() < half_full) {
// We don't have enough values to emit a node. So we're going to
// have to rebalance with the previous node.
self.unshift_node()?;
self.unshift_node(w)?;
}
self.emit_all()?;
self.emit_all(w)?;
if self.nodes.len() == 0 {
self.emit_empty_leaf(w)?
}
Ok(self.nodes)
}
@ -354,13 +364,13 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
// We're only interested in the keys and values from the node, and
// not whether it's a leaf or internal node.
fn read_node(&self, block: u64) -> Result<(Vec<u64>, Vec<V>)> {
self.nio.read(&self.batcher.engine, block)
fn read_node(&self, w: &WriteBatcher, block: u64) -> Result<(Vec<u64>, Vec<V>)> {
self.nio.read(&w.engine, block)
}
/// Writes a node with the first 'nr_entries' values.
fn emit_values(&mut self, nr_entries: usize) -> Result<()> {
assert!(self.values.len() <= nr_entries);
fn emit_values(&mut self, w: &mut WriteBatcher, nr_entries: usize) -> Result<()> {
assert!(nr_entries <= self.values.len());
// Write the node
let mut keys = Vec::new();
@ -372,7 +382,7 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
values.push(v);
}
let wresult = self.nio.write(&mut self.batcher, keys, values)?;
let wresult = self.nio.write(w, keys, values)?;
// Push a summary to the 'nodes' vector.
self.nodes.push(NodeSummary {
@ -385,13 +395,13 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
}
/// Writes a full node.
fn emit_node(&mut self) -> Result<()> {
self.emit_values(self.max_entries_per_node)
fn emit_node(&mut self, w: &mut WriteBatcher) -> Result<()> {
self.emit_values(w, self.max_entries_per_node)
}
/// Emits all remaining values. Panics if there are more than 2 *
/// max_entries_per_node values.
fn emit_all(&mut self) -> Result<()> {
fn emit_all(&mut self, w: &mut WriteBatcher) -> Result<()> {
match self.values.len() {
0 => {
// There's nothing to emit
@ -399,14 +409,14 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
}
n if n <= self.max_entries_per_node => {
// Emit a single node.
self.emit_values(n)
self.emit_values(w, n)
}
n if n <= self.max_entries_per_node * 2 => {
// Emit two nodes.
let n1 = n / 2;
let n2 = n - n1;
self.emit_values(n1)?;
self.emit_values(n2)
self.emit_values(w, n1)?;
self.emit_values(w, n2)
}
_ => {
panic!("self.values shouldn't have more than 2 * max_entries_per_node entries");
@ -414,13 +424,17 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
}
}
fn emit_empty_leaf(&mut self, w: &mut WriteBatcher) -> Result<()> {
self.emit_values(w, 0)
}
/// Pops the last node, and prepends it's values to 'self.values'. Used
/// to rebalance when we have insufficient values for a final node. The
/// node is decremented in the space map.
fn unshift_node(&mut self) -> Result<()> {
fn unshift_node(&mut self, w: &mut WriteBatcher) -> Result<()> {
let ls = self.nodes.pop().unwrap();
let (keys, values) = self.read_node(ls.block)?;
self.batcher.sm.lock().unwrap().dec(ls.block)?;
let (keys, values) = self.read_node(w, ls.block)?;
w.sm.lock().unwrap().dec(ls.block)?;
let mut vals = VecDeque::new();
@ -442,57 +456,47 @@ impl<V: Pack + Unpack + Clone> NodeBuilder<V> {
//------------------------------------------
pub struct Builder<V: Unpack + Pack> {
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap>>,
leaf_builder: NodeBuilder<V>,
}
const BATCH_SIZE: usize = 128;
impl<V: Unpack + Pack + Clone> Builder<V> {
pub fn new(
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap>>,
value_rc: Box<dyn RefCounter<V>>,
) -> Builder<V> {
Builder {
engine: engine.clone(),
sm: sm.clone(),
leaf_builder: NodeBuilder::new(
WriteBatcher::new(engine.clone(), sm.clone(), BATCH_SIZE),
Box::new(LeafIO {}),
value_rc,
),
}
}
pub fn push_value(&mut self, k: u64, v: V) -> Result<()> {
self.leaf_builder.push_value(k, v)
pub fn push_value(&mut self, w: &mut WriteBatcher, k: u64, v: V) -> Result<()> {
self.leaf_builder.push_value(w, k, v)
}
pub fn push_leaves(&mut self, leaves: &Vec<NodeSummary>) -> Result<()> {
self.leaf_builder.push_nodes(leaves)
pub fn push_leaves(&mut self, w: &mut WriteBatcher, leaves: &Vec<NodeSummary>) -> Result<()> {
self.leaf_builder.push_nodes(w, leaves)
}
pub fn complete(self) -> Result<u64> {
let mut nodes = self.leaf_builder.complete()?;
pub fn complete(self, w: &mut WriteBatcher) -> Result<u64> {
let mut nodes = self.leaf_builder.complete(w)?;
// Now we iterate, adding layers of internal nodes until we end
// up with a single root.
while nodes.len() > 1 {
let mut builder = NodeBuilder::new(
WriteBatcher::new(self.engine.clone(), self.sm.clone(), BATCH_SIZE),
Box::new(InternalIO {}),
Box::new(SMRefCounter {
sm: self.sm.clone(),
sm: w.sm.clone(),
}),
);
for n in nodes {
builder.push_value(n.key, n.block)?;
builder.push_value(w, n.key, n.block)?;
}
nodes = builder.complete()?;
nodes = builder.complete(w)?;
}
assert!(nodes.len() == 1);
@ -502,16 +506,3 @@ impl<V: Unpack + Pack + Clone> Builder<V> {
//------------------------------------------
/*
=======
fn write_node_<V: Unpack + Pack>(w: &mut WriteBatcher, mut node: Node<V>) -> Result<(u64, u64)> {
let keys = node.get_keys();
let first_key = *keys.first().unwrap_or(&0u64);
>>>>>>> main
=======
fn write_node_<V: Unpack + Pack>(w: &mut WriteBatcher, mut node: Node<V>) -> Result<(u64, u64)> {
let keys = node.get_keys();
let first_key = *keys.first().unwrap_or(&0u64);
>>>>>>> main
*/

View File

@ -7,5 +7,6 @@ pub mod btree_merge;
pub mod btree_leaf_walker;
pub mod btree_walker;
pub mod space_map;
pub mod space_map_disk;
pub mod unpack;

View File

@ -1,224 +1,8 @@
use anyhow::{anyhow, Result};
use byteorder::{LittleEndian, WriteBytesExt};
use anyhow::Result;
use fixedbitset::FixedBitSet;
use nom::{multi::count, number::complete::*, IResult};
use std::boxed::Box;
use std::sync::{Arc, Mutex};
use crate::io_engine::*;
use crate::pdata::unpack::{Pack, Unpack};
//------------------------------------------
#[derive(Debug)]
pub struct SMRoot {
pub nr_blocks: u64,
pub nr_allocated: u64,
pub bitmap_root: u64,
pub ref_count_root: u64,
}
pub fn unpack_root(data: &[u8]) -> Result<SMRoot> {
match SMRoot::unpack(data) {
Err(_e) => Err(anyhow!("couldn't parse SMRoot")),
Ok((_i, v)) => Ok(v),
}
}
impl Unpack for SMRoot {
fn disk_size() -> u32 {
32
}
fn unpack(data: &[u8]) -> IResult<&[u8], SMRoot> {
let (i, nr_blocks) = le_u64(data)?;
let (i, nr_allocated) = le_u64(i)?;
let (i, bitmap_root) = le_u64(i)?;
let (i, ref_count_root) = le_u64(i)?;
Ok((
i,
SMRoot {
nr_blocks,
nr_allocated,
bitmap_root,
ref_count_root,
},
))
}
}
//------------------------------------------
#[derive(Clone, Copy, Debug)]
pub struct IndexEntry {
pub blocknr: u64,
pub nr_free: u32,
pub none_free_before: u32,
}
impl Unpack for IndexEntry {
fn disk_size() -> u32 {
16
}
fn unpack(data: &[u8]) -> IResult<&[u8], Self> {
let (i, blocknr) = le_u64(data)?;
let (i, nr_free) = le_u32(i)?;
let (i, none_free_before) = le_u32(i)?;
Ok((
i,
IndexEntry {
blocknr,
nr_free,
none_free_before,
},
))
}
}
//------------------------------------------
pub const MAX_METADATA_BITMAPS: usize = 255;
pub struct MetadataIndex {
pub indexes: Vec<IndexEntry>,
}
impl Unpack for MetadataIndex {
fn disk_size() -> u32 {
BLOCK_SIZE as u32
}
fn unpack(data: &[u8]) -> IResult<&[u8], Self> {
let (i, _csum) = le_u32(data)?;
let (i, _padding) = le_u32(i)?;
let (i, _blocknr) = le_u64(i)?;
let (i, indexes) = count(IndexEntry::unpack, MAX_METADATA_BITMAPS)(i)?;
Ok((i, MetadataIndex { indexes }))
}
}
//------------------------------------------
#[derive(Debug)]
pub struct BitmapHeader {
pub csum: u32,
pub not_used: u32,
pub blocknr: u64,
}
impl Unpack for BitmapHeader {
fn disk_size() -> u32 {
16
}
fn unpack(data: &[u8]) -> IResult<&[u8], Self> {
let (i, csum) = le_u32(data)?;
let (i, not_used) = le_u32(i)?;
let (i, blocknr) = le_u64(i)?;
Ok((
i,
BitmapHeader {
csum,
not_used,
blocknr,
},
))
}
}
impl Pack for BitmapHeader {
fn pack<W: WriteBytesExt>(&self, out: &mut W) -> Result<()> {
out.write_u32::<LittleEndian>(self.csum)?;
out.write_u32::<LittleEndian>(self.not_used)?;
out.write_u64::<LittleEndian>(self.blocknr)?;
Ok(())
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum BitmapEntry {
Small(u8),
Overflow,
}
#[derive(Debug)]
pub struct Bitmap {
pub header: BitmapHeader,
pub entries: Vec<BitmapEntry>,
}
impl Unpack for Bitmap {
fn disk_size() -> u32 {
BLOCK_SIZE as u32
}
fn unpack(data: &[u8]) -> IResult<&[u8], Self> {
let (mut i, header) = BitmapHeader::unpack(data)?;
let nr_words = (BLOCK_SIZE - BitmapHeader::disk_size() as usize) / 8;
let mut entries = Vec::with_capacity(nr_words * 32);
for _w in 0..nr_words {
let (tmp, mut word) = le_u64(i)?;
for _b in 0..32 {
let val = word & 0x3;
word >>= 2;
// The bits are stored with the high bit at b * 2 + 1,
// and low at b *2. So we have to interpret this val.
entries.push(match val {
0 => BitmapEntry::Small(0),
1 => BitmapEntry::Small(2),
2 => BitmapEntry::Small(1),
_ => BitmapEntry::Overflow,
});
}
i = tmp;
}
Ok((i, Bitmap { header, entries }))
}
}
impl Pack for Bitmap {
fn pack<W: WriteBytesExt>(&self, out: &mut W) -> Result<()> {
use BitmapEntry::*;
BitmapHeader::pack(&self.header, out)?;
for chunk in self.entries.chunks(32) {
let mut w = 0u64;
for e in chunk {
w >>= 2;
match e {
Small(0) => {}
Small(1) => {
w |= 0x2 << 62;
}
Small(2) => {
w |= 0x1 << 62;
}
Small(_) => {
return Err(anyhow!("Bad small value in bitmap entry"));
}
Overflow => {
w |= 0x3 << 62;
}
}
}
u64::pack(&w, out)?;
}
Ok(())
}
}
//------------------------------------------
pub trait SpaceMap {

399
src/pdata/space_map_disk.rs Normal file
View File

@ -0,0 +1,399 @@
use anyhow::{anyhow, Result};
use byteorder::{LittleEndian, WriteBytesExt};
use nom::{number::complete::*, IResult};
use std::io::Cursor;
use std::collections::BTreeMap;
use crate::checksum;
use crate::io_engine::*;
use crate::math::*;
use crate::pdata::btree_builder::*;
use crate::pdata::space_map::*;
use crate::pdata::unpack::*;
use crate::write_batcher::*;
//--------------------------------
const MAX_METADATA_BITMAPS: usize = 255;
// const MAX_METADATA_BLOCKS: u64 = 255 * ((1 << 14) - 64);
const ENTRIES_PER_BYTE: usize = 4;
const ENTRIES_PER_BITMAP: usize = WORDS_PER_BITMAP * 8 * ENTRIES_PER_BYTE;
//--------------------------------
#[derive(Clone, Copy, Debug)]
pub struct IndexEntry {
pub blocknr: u64,
pub nr_free: u32,
pub none_free_before: u32,
}
impl Unpack for IndexEntry {
fn disk_size() -> u32 {
16
}
fn unpack(i: &[u8]) -> IResult<&[u8], IndexEntry> {
let (i, blocknr) = le_u64(i)?;
let (i, nr_free) = le_u32(i)?;
let (i, none_free_before) = le_u32(i)?;
Ok((
i,
IndexEntry {
blocknr,
nr_free,
none_free_before,
},
))
}
}
impl Pack for IndexEntry {
fn pack<W: WriteBytesExt>(&self, w: &mut W) -> Result<()> {
w.write_u64::<LittleEndian>(self.blocknr)?;
w.write_u32::<LittleEndian>(self.nr_free)?;
w.write_u32::<LittleEndian>(self.none_free_before)?;
Ok(())
}
}
//--------------------------------
pub struct MetadataIndex {
pub blocknr: u64,
pub indexes: Vec<IndexEntry>,
}
impl Unpack for MetadataIndex {
fn disk_size() -> u32 {
BLOCK_SIZE as u32
}
fn unpack(i: &[u8]) -> IResult<&[u8], MetadataIndex> {
// FIXME: check the checksum
let (i, _csum) = le_u32(i)?;
let (i, _padding) = le_u32(i)?;
let (i, blocknr) = le_u64(i)?;
let (i, indexes) = nom::multi::count(IndexEntry::unpack, MAX_METADATA_BITMAPS)(i)?;
Ok((i, MetadataIndex { blocknr, indexes }))
}
}
impl Pack for MetadataIndex {
fn pack<W: WriteBytesExt>(&self, w: &mut W) -> Result<()> {
w.write_u32::<LittleEndian>(0)?; // csum
w.write_u32::<LittleEndian>(0)?; // padding
w.write_u64::<LittleEndian>(self.blocknr)?;
assert!(self.indexes.len() <= MAX_METADATA_BITMAPS);
for ie in &self.indexes {
ie.pack(w)?;
}
Ok(())
}
}
//--------------------------------
const WORDS_PER_BITMAP: usize = (BLOCK_SIZE - 16) / 8;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BitmapEntry {
Small(u8),
Overflow,
}
#[derive(Debug)]
pub struct Bitmap {
pub blocknr: u64,
pub entries: Vec<BitmapEntry>,
}
impl Unpack for Bitmap {
fn disk_size() -> u32 {
BLOCK_SIZE as u32
}
fn unpack(data: &[u8]) -> IResult<&[u8], Self> {
let (i, _csum) = le_u32(data)?;
let (i, _not_used) = le_u32(i)?;
let (mut i, blocknr) = le_u64(i)?;
let header_size = 16;
let nr_words = (BLOCK_SIZE - header_size) / 8;
let mut entries = Vec::with_capacity(nr_words * 32);
for _w in 0..nr_words {
let (tmp, mut word) = le_u64(i)?;
for _b in 0..32 {
let val = word & 0x3;
word >>= 2;
// The bits are stored with the high bit at b * 2 + 1,
// and low at b *2. So we have to interpret this val.
entries.push(match val {
0 => BitmapEntry::Small(0),
1 => BitmapEntry::Small(2),
2 => BitmapEntry::Small(1),
_ => BitmapEntry::Overflow,
});
}
i = tmp;
}
Ok((i, Bitmap { blocknr, entries }))
}
}
impl Pack for Bitmap {
fn pack<W: WriteBytesExt>(&self, out: &mut W) -> Result<()> {
use BitmapEntry::*;
out.write_u32::<LittleEndian>(0)?;
out.write_u32::<LittleEndian>(0)?;
out.write_u64::<LittleEndian>(self.blocknr)?;
for chunk in self.entries.chunks(32) {
let mut w = 0u64;
for e in chunk {
w >>= 2;
match e {
Small(0) => {}
Small(1) => {
w |= 0x2 << 62;
}
Small(2) => {
w |= 0x1 << 62;
}
Small(_) => {
return Err(anyhow!("Bad small value in bitmap entry"));
}
Overflow => {
w |= 0x3 << 62;
}
}
}
u64::pack(&w, out)?;
}
Ok(())
}
}
//--------------------------------
#[derive(Debug)]
pub struct SMRoot {
pub nr_blocks: u64,
pub nr_allocated: u64,
pub bitmap_root: u64,
pub ref_count_root: u64,
}
impl Unpack for SMRoot {
fn disk_size() -> u32 {
32
}
fn unpack(i: &[u8]) -> IResult<&[u8], Self> {
let (i, nr_blocks) = le_u64(i)?;
let (i, nr_allocated) = le_u64(i)?;
let (i, bitmap_root) = le_u64(i)?;
let (i, ref_count_root) = le_u64(i)?;
Ok((
i,
SMRoot {
nr_blocks,
nr_allocated,
bitmap_root,
ref_count_root,
},
))
}
}
pub fn unpack_root(data: &[u8]) -> Result<SMRoot> {
match SMRoot::unpack(data) {
Err(_e) => Err(anyhow!("couldn't parse SMRoot")),
Ok((_i, v)) => Ok(v),
}
}
impl Pack for SMRoot {
fn pack<W: WriteBytesExt>(&self, w: &mut W) -> Result<()> {
w.write_u64::<LittleEndian>(self.nr_blocks)?;
w.write_u64::<LittleEndian>(self.nr_allocated)?;
w.write_u64::<LittleEndian>(self.bitmap_root)?;
w.write_u64::<LittleEndian>(self.ref_count_root)?;
Ok(())
}
}
//--------------------------------
pub fn write_common(w: &mut WriteBatcher, sm: &dyn SpaceMap) -> Result<(Vec<IndexEntry>, u64)> {
use BitmapEntry::*;
let mut index_entries = Vec::new();
let mut overflow_builder: Builder<u32> = Builder::new(Box::new(NoopRC {}));
// how many bitmaps do we need?
for bm in 0..div_up(sm.get_nr_blocks()? as usize, ENTRIES_PER_BITMAP) {
let mut entries = Vec::with_capacity(ENTRIES_PER_BITMAP);
let mut first_free: Option<u32> = None;
let mut nr_free: u32 = 0;
for i in 0..ENTRIES_PER_BITMAP {
let b: u64 = ((bm * ENTRIES_PER_BITMAP) as u64) + i as u64;
if b > sm.get_nr_blocks()? {
break;
}
let rc = sm.get(b)?;
let e = match rc {
0 => {
nr_free += 1;
if first_free.is_none() {
first_free = Some(i as u32);
}
Small(0)
}
1 => Small(1),
2 => Small(2),
_ => {
overflow_builder.push_value(w, b as u64, rc)?;
Overflow
}
};
entries.push(e);
}
// allocate a new block
let b = w.alloc()?;
let mut cursor = Cursor::new(b.get_data());
// write the bitmap to it
let blocknr = b.loc;
let bitmap = Bitmap { blocknr, entries };
bitmap.pack(&mut cursor)?;
w.write(b, checksum::BT::BITMAP)?;
// Insert into the index tree
let ie = IndexEntry {
blocknr,
nr_free,
none_free_before: first_free.unwrap_or(ENTRIES_PER_BITMAP as u32),
};
index_entries.push(ie);
}
let ref_count_root = overflow_builder.complete(w)?;
Ok((index_entries, ref_count_root))
}
pub fn write_disk_sm(w: &mut WriteBatcher, sm: &dyn SpaceMap) -> Result<SMRoot> {
let (index_entries, ref_count_root) = write_common(w, sm)?;
let mut index_builder: Builder<IndexEntry> = Builder::new(Box::new(NoopRC {}));
for (i, ie) in index_entries.iter().enumerate() {
index_builder.push_value(w, i as u64, *ie)?;
}
let bitmap_root = index_builder.complete(w)?;
Ok(SMRoot {
nr_blocks: sm.get_nr_blocks()?,
nr_allocated: sm.get_nr_allocated()?,
bitmap_root,
ref_count_root,
})
}
//----------------------------
fn block_to_bitmap(b: u64) -> usize {
(b / ENTRIES_PER_BITMAP as u64) as usize
}
fn adjust_counts(w: &mut WriteBatcher, ie: &IndexEntry, allocs: &[u64]) -> Result<IndexEntry> {
use BitmapEntry::*;
let mut first_free = ie.none_free_before;
let mut nr_free = ie.nr_free - allocs.len() as u32;
// Read the bitmap
let bitmap_block = w.engine.read(ie.blocknr)?;
let (_, mut bitmap) = Bitmap::unpack(bitmap_block.get_data())?;
// Update all the entries
for a in allocs {
if first_free == *a as u32 {
first_free = *a as u32 + 1;
}
if bitmap.entries[*a as usize] == Small(0) {
nr_free -= 1;
}
bitmap.entries[*a as usize] = Small(1);
}
// Write the bitmap
let mut cur = Cursor::new(bitmap_block.get_data());
bitmap.pack(&mut cur)?;
w.write(bitmap_block, checksum::BT::BITMAP)?;
// Return the adjusted index entry
Ok (IndexEntry {
blocknr: ie.blocknr,
nr_free,
none_free_before: first_free,
})
}
pub fn write_metadata_sm(w: &mut WriteBatcher, sm: &dyn SpaceMap) -> Result<SMRoot> {
w.clear_allocations();
let (mut indexes, ref_count_root) = write_common(w, sm)?;
let bitmap_root = w.alloc()?;
// Now we need to patch up the counts for the metadata that was used for storing
// the space map itself. These ref counts all went from 0 to 1.
let allocations = w.clear_allocations();
// Sort the allocations by bitmap
let mut by_bitmap = BTreeMap::new();
for b in allocations {
let bitmap = block_to_bitmap(b);
(*by_bitmap.entry(bitmap).or_insert(Vec::new())).push(b % ENTRIES_PER_BITMAP as u64);
}
for (bitmap, allocs) in by_bitmap {
indexes[bitmap] = adjust_counts(w, &indexes[bitmap], &allocs)?;
}
// Write out the metadata index
let metadata_index = MetadataIndex {
blocknr: bitmap_root.loc,
indexes
};
let mut cur = Cursor::new(bitmap_root.get_data());
metadata_index.pack(&mut cur)?;
let loc = bitmap_root.loc;
w.write(bitmap_root, checksum::BT::INDEX)?;
Ok(SMRoot {
nr_blocks: sm.get_nr_blocks()?,
nr_allocated: sm.get_nr_allocated()?,
bitmap_root: loc,
ref_count_root,
})
}
//--------------------------------

View File

@ -1,3 +1,5 @@
use anyhow::Result;
use byteorder::WriteBytesExt;
use nom::{number::complete::*, IResult};
use std::fmt;
@ -31,6 +33,13 @@ impl Unpack for BlockTime {
}
}
impl Pack for BlockTime {
fn pack<W: WriteBytesExt>(&self, data: &mut W) -> Result<()> {
let bt: u64 = (self.block << 24) | self.time as u64;
bt.pack(data)
}
}
impl fmt::Display for BlockTime {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} @ {}", self.block, self.time)

View File

@ -12,6 +12,7 @@ use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine};
use crate::pdata::btree::{self, *};
use crate::pdata::btree_walker::*;
use crate::pdata::space_map::*;
use crate::pdata::space_map_disk::*;
use crate::pdata::unpack::*;
use crate::report::*;
use crate::thin::block_time::*;

View File

@ -1,7 +1,9 @@
use anyhow::Result;
use byteorder::{LittleEndian, WriteBytesExt};
use nom::{number::complete::*, IResult};
use std::fmt;
use crate::pdata::unpack::*;
use nom::{number::complete::*, IResult};
//------------------------------------------
@ -15,11 +17,11 @@ pub struct DeviceDetail {
impl fmt::Display for DeviceDetail {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "mapped = {}, trans = {}, create = {}, snap = {}",
self.mapped_blocks,
self.transaction_id,
self.creation_time,
self.snapshotted_time)?;
write!(
f,
"mapped = {}, trans = {}, create = {}, snap = {}",
self.mapped_blocks, self.transaction_id, self.creation_time, self.snapshotted_time
)?;
Ok(())
}
}
@ -47,4 +49,14 @@ impl Unpack for DeviceDetail {
}
}
impl Pack for DeviceDetail {
fn pack<W: WriteBytesExt>(&self, w: &mut W) -> Result<()> {
w.write_u64::<LittleEndian>(self.mapped_blocks)?;
w.write_u64::<LittleEndian>(self.transaction_id)?;
w.write_u32::<LittleEndian>(self.creation_time)?;
w.write_u32::<LittleEndian>(self.snapshotted_time)?;
Ok(())
}
}
//------------------------------------------

View File

@ -11,6 +11,7 @@ use crate::pdata::btree::{self, *};
use crate::pdata::btree_leaf_walker::*;
use crate::pdata::btree_walker::*;
use crate::pdata::space_map::*;
use crate::pdata::space_map_disk::*;
use crate::pdata::unpack::*;
use crate::report::*;
use crate::thin::block_time::*;
@ -287,7 +288,7 @@ fn find_shared_nodes(
// We have to get the leaves so w is consumed and the &mut on sm
// is dropped.
let leaves = w.get_leaves();
let _leaves = w.get_leaves();
let mut shared = BTreeSet::new();
{
for i in 0..sm.get_nr_blocks().unwrap() {
@ -297,6 +298,8 @@ fn find_shared_nodes(
}
}
/*
// FIXME: why?!!
// we're not interested in leaves (roots will get re-added later).
{
for i in 0..leaves.len() {
@ -305,6 +308,7 @@ fn find_shared_nodes(
}
}
}
*/
Ok((shared, sm))
}
@ -616,9 +620,11 @@ pub fn dump(opts: ThinDumpOptions) -> Result<()> {
let sb = read_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION)?;
let md = build_metadata(&ctx, &sb)?;
/*
ctx.report
.set_title("Optimising metadata to improve leaf packing");
let md = optimise_metadata(md)?;
*/
dump_metadata(&ctx, &sb, &md)
}

View File

@ -1,62 +1,203 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::fs::OpenOptions;
use std::path::Path;
use std::sync::Arc;
use crate::io_engine::*;
use crate::pdata::btree_builder::*;
use crate::pdata::space_map::*;
use crate::report::*;
use crate::thin::block_time::*;
use crate::thin::device_detail::*;
use crate::thin::superblock::*;
use crate::thin::superblock::{self, *};
use crate::thin::xml::{self, *};
use crate::write_batcher::*;
//------------------------------------------
#[derive(Default)]
struct Pass1 {
//
enum MappedSection {
Def(String),
Dev(u32),
}
impl MetadataVisitor for Pass1 {
impl std::fmt::Display for MappedSection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MappedSection::Def(name) => write!(f, "Def {}", name),
MappedSection::Dev(thin_id) => write!(f, "Device {}", thin_id),
}
}
}
struct Pass1Result {
sb: Option<xml::Superblock>,
devices: BTreeMap<u32, (DeviceDetail, Vec<NodeSummary>)>,
}
struct Pass1<'a> {
w: &'a mut WriteBatcher,
current_dev: Option<DeviceDetail>,
sub_trees: BTreeMap<String, Vec<NodeSummary>>,
// The builder for the current shared sub tree or device
map: Option<(MappedSection, NodeBuilder<BlockTime>)>,
result: Pass1Result,
}
impl<'a> Pass1<'a> {
fn new(w: &'a mut WriteBatcher) -> Self {
Pass1 {
w,
current_dev: None,
sub_trees: BTreeMap::new(),
map: None,
result: Pass1Result {
sb: None,
devices: BTreeMap::new(),
},
}
}
fn get_result(self) -> Pass1Result {
self.result
}
fn begin_section(&mut self, section: MappedSection) -> Result<Visit> {
if let Some((outer, _)) = self.map.as_ref() {
let msg = format!(
"Nested subtrees are not allowed '{}' within '{}'",
section, outer
);
return Err(anyhow!(msg));
}
let value_rc = Box::new(NoopRC {});
let leaf_builder = NodeBuilder::new(Box::new(LeafIO {}), value_rc);
self.map = Some((section, leaf_builder));
Ok(Visit::Continue)
}
fn end_section(&mut self) -> Result<(MappedSection, Vec<NodeSummary>)> {
let mut current = None;
std::mem::swap(&mut self.map, &mut current);
if let Some((name, nodes)) = current {
Ok((name, nodes.complete(self.w)?))
} else {
let msg = format!("Unbalanced </def> tag");
Err(anyhow!(msg))
}
}
}
impl<'a> MetadataVisitor for Pass1<'a> {
fn superblock_b(&mut self, sb: &xml::Superblock) -> Result<Visit> {
todo!();
self.result.sb = Some(sb.clone());
Ok(Visit::Continue)
}
fn superblock_e(&mut self) -> Result<Visit> {
todo!();
Ok(Visit::Continue)
}
fn def_shared_b(&mut self, name: &str) -> Result<Visit> {
todo!();
self.begin_section(MappedSection::Def(name.to_string()))
}
fn def_shared_e(&mut self) -> Result<Visit> {
todo!();
if let (MappedSection::Def(name), nodes) = self.end_section()? {
self.sub_trees.insert(name, nodes);
Ok(Visit::Continue)
} else {
Err(anyhow!("unexpected </def>"))
}
}
fn device_b(&mut self, d: &Device) -> Result<Visit> {
todo!();
self.current_dev = Some(DeviceDetail {
mapped_blocks: d.mapped_blocks,
transaction_id: d.transaction,
creation_time: d.creation_time as u32,
snapshotted_time: d.snap_time as u32,
});
self.begin_section(MappedSection::Dev(d.dev_id))
}
fn device_e(&mut self) -> Result<Visit> {
todo!();
if let Some(detail) = self.current_dev.take() {
if let (MappedSection::Dev(thin_id), nodes) = self.end_section()? {
self.result.devices.insert(thin_id, (detail, nodes));
Ok(Visit::Continue)
} else {
Err(anyhow!("internal error, couldn't find device details"))
}
} else {
Err(anyhow!("unexpected </device>"))
}
}
fn map(&mut self, m: &Map) -> Result<Visit> {
todo!();
if let Some((_name, _builder)) = self.map.as_mut() {
for i in 0..m.len {
let bt = BlockTime {
block: m.data_begin + i,
time: m.time,
};
let (_, builder) = self.map.as_mut().unwrap();
builder.push_value(self.w, m.thin_begin + i, bt)?;
}
Ok(Visit::Continue)
} else {
let msg = format!("Mapping tags must appear within a <def> or <device> tag.");
Err(anyhow!(msg))
}
}
fn ref_shared(&mut self, name: &str) -> Result<Visit> {
todo!();
if self.current_dev.is_none() {
return Err(anyhow!(
"<ref> tags may only occur within <device> sections."
));
}
if let Some(leaves) = self.sub_trees.get(name) {
// We could be in a <def> or <device>
if let Some((_name, builder)) = self.map.as_mut() {
builder.push_nodes(self.w, leaves)?;
} else {
let msg = format!(
"<ref name=\"{}\"> tag must be within either a <def> or <device> section",
name
);
return Err(anyhow!(msg));
}
Ok(Visit::Continue)
} else {
let msg = format!("Couldn't find sub tree '{}'.", name);
Err(anyhow!(msg))
}
}
fn eof(&mut self) -> Result<Visit> {
todo!();
// FIXME: build the rest of the device trees
Ok(Visit::Continue)
}
}
//------------------------------------------
/*
/// Writes a data space map to disk. Returns the space map root that needs
/// to be written to the superblock.
fn build_data_sm(batcher: WriteBatcher, sm: Box<dyn SpaceMap>) -> Result<Vec<u8>> {
}
*/
//------------------------------------------
pub struct ThinRestoreOptions<'a> {
@ -66,6 +207,29 @@ pub struct ThinRestoreOptions<'a> {
pub report: Arc<Report>,
}
struct Context {
report: Arc<Report>,
engine: Arc<dyn IoEngine + Send + Sync>,
}
const MAX_CONCURRENT_IO: u32 = 1024;
fn new_context(opts: &ThinRestoreOptions) -> Result<Context> {
let engine: Arc<dyn IoEngine + Send + Sync>;
if opts.async_io {
engine = Arc::new(AsyncIoEngine::new(opts.output, MAX_CONCURRENT_IO, true)?);
} else {
let nr_threads = std::cmp::max(8, num_cpus::get() * 2);
engine = Arc::new(SyncIoEngine::new(opts.output, nr_threads, true)?);
}
Ok(Context {
report: opts.report.clone(),
engine,
})
}
//------------------------------------------
pub fn restore(opts: ThinRestoreOptions) -> Result<()> {
@ -74,8 +238,66 @@ pub fn restore(opts: ThinRestoreOptions) -> Result<()> {
.write(false)
.open(opts.input)?;
let mut pass = Pass1::default();
let ctx = new_context(&opts)?;
let max_count = u32::MAX;
let sm = core_sm(ctx.engine.get_nr_blocks(), max_count);
let mut w = WriteBatcher::new(ctx.engine.clone(), sm.clone(), ctx.engine.get_batch_size());
let mut pass = Pass1::new(&mut w);
xml::read(input, &mut pass)?;
let pass = pass.get_result();
// Build the device details tree.
let mut details_builder: Builder<DeviceDetail> = Builder::new(Box::new(NoopRC {}));
for (thin_id, (detail, _)) in &pass.devices {
details_builder.push_value(&mut w, *thin_id as u64, *detail)?;
}
let details_root = details_builder.complete(&mut w)?;
// Build the individual mapping trees that make up the bottom layer.
let mut devs: BTreeMap<u32, u64> = BTreeMap::new();
for (thin_id, (_, nodes)) in &pass.devices {
ctx.report
.info(&format!("building btree for device {}", thin_id));
let mut builder: Builder<BlockTime> = Builder::new(Box::new(NoopRC {}));
builder.push_leaves(&mut w, nodes)?;
let root = builder.complete(&mut w)?;
devs.insert(*thin_id, root);
}
// Build the top level mapping tree
let mut builder: Builder<u64> = Builder::new(Box::new(NoopRC {}));
for (thin_id, root) in devs {
builder.push_value(&mut w, thin_id as u64, root)?;
}
let mapping_root = builder.complete(&mut w)?;
// Build data space map
// FIXME: I think we need to decrement the shared leaves
// Build metadata space map
// Write the superblock
if let Some(xml_sb) = pass.sb {
let sb = superblock::Superblock {
flags: SuperblockFlags { needs_check: false },
block: SUPERBLOCK_LOCATION,
version: 2,
time: xml_sb.time as u32,
transaction_id: xml_sb.transaction,
metadata_snap: 0,
data_sm_root: vec![0; SPACE_MAP_ROOT_SIZE],
metadata_sm_root: vec![0; SPACE_MAP_ROOT_SIZE],
mapping_root,
details_root,
data_block_size: xml_sb.data_block_size,
nr_metadata_blocks: ctx.engine.get_nr_blocks(),
};
write_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION, &sb)?;
} else {
return Err(anyhow!("No superblock found in xml file"));
}
Ok(())
}

View File

@ -1,10 +1,18 @@
use crate::io_engine::*;
use anyhow::{anyhow, Result};
use byteorder::{LittleEndian, WriteBytesExt};
use nom::{bytes::complete::*, number::complete::*, IResult};
use std::fmt;
use std::io::Cursor;
use crate::io_engine::*;
use crate::checksum::*;
//----------------------------------------
pub const MAGIC: u64 = 27022010;
pub const SUPERBLOCK_LOCATION: u64 = 0;
//const UUID_SIZE: usize = 16;
const SPACE_MAP_ROOT_SIZE: usize = 128;
const UUID_SIZE: usize = 16;
pub const SPACE_MAP_ROOT_SIZE: usize = 128;
#[derive(Debug, Clone)]
pub struct SuperblockFlags {
@ -35,36 +43,9 @@ pub struct Superblock {
pub mapping_root: u64,
pub details_root: u64,
pub data_block_size: u32,
pub nr_metadata_blocks: u64,
}
/*
pub enum CheckSeverity {
Fatal,
NonFatal,
}
pub trait CheckError {
fn severity(&self) -> CheckSeverity;
fn block(&self) -> u64;
fn sub_errors(&self) -> Vec<Box<dyn CheckError>>;
}
enum ErrorType {
BadChecksum,
BadBlockType(&'static str),
BadBlock(u64),
BadVersion(u32),
MetadataSnapOutOfBounds(u64),
MappingRootOutOfBounds(u64),
DetailsRootOutOfBounds(u64),
}
struct SuperblockError {
severity: CheckSeverity,
kind: ErrorType,
}
*/
fn unpack(data: &[u8]) -> IResult<&[u8], Superblock> {
let (i, _csum) = le_u32(data)?;
let (i, flags) = le_u32(i)?;
@ -81,7 +62,7 @@ fn unpack(data: &[u8]) -> IResult<&[u8], Superblock> {
let (i, details_root) = le_u64(i)?;
let (i, data_block_size) = le_u32(i)?;
let (i, _metadata_block_size) = le_u32(i)?;
let (i, _metadata_nr_blocks) = le_u64(i)?;
let (i, nr_metadata_blocks) = le_u64(i)?;
Ok((
i,
@ -100,6 +81,7 @@ fn unpack(data: &[u8]) -> IResult<&[u8], Superblock> {
mapping_root,
details_root,
data_block_size,
nr_metadata_blocks,
},
))
}
@ -115,3 +97,51 @@ pub fn read_superblock(engine: &dyn IoEngine, loc: u64) -> Result<Superblock> {
}
//------------------------------
fn pack_superblock<W: WriteBytesExt>(sb: &Superblock, w: &mut W) -> Result<()> {
// checksum, which we don't know yet
w.write_u32::<LittleEndian>(0)?;
// flags
if sb.flags.needs_check {
w.write_u32::<LittleEndian>(0x1)?;
} else {
w.write_u32::<LittleEndian>(0)?;
}
w.write_u64::<LittleEndian>(sb.block)?;
w.write_all(&vec![0; UUID_SIZE])?;
w.write_u64::<LittleEndian>(MAGIC)?;
w.write_u32::<LittleEndian>(sb.version)?;
w.write_u32::<LittleEndian>(sb.time)?;
w.write_u64::<LittleEndian>(sb.transaction_id)?;
w.write_u64::<LittleEndian>(sb.metadata_snap)?;
w.write_all(&vec![0; SPACE_MAP_ROOT_SIZE])?; // data sm root
w.write_all(&vec![0; SPACE_MAP_ROOT_SIZE])?; // metadata sm root
w.write_u64::<LittleEndian>(sb.mapping_root)?;
w.write_u64::<LittleEndian>(sb.details_root)?;
w.write_u32::<LittleEndian>(sb.data_block_size)?;
w.write_u32::<LittleEndian>(BLOCK_SIZE as u32)?;
w.write_u64::<LittleEndian>(sb.nr_metadata_blocks)?;
Ok(())
}
pub fn write_superblock(engine: &dyn IoEngine, _loc: u64, sb: &Superblock) -> Result<()> {
let b = Block::zeroed(SUPERBLOCK_LOCATION);
// pack the superblock
{
let mut cursor = Cursor::new(b.get_data());
pack_superblock(sb, &mut cursor)?;
}
// calculate the checksum
write_checksum(b.get_data(), BT::SUPERBLOCK)?;
// write
engine.write(&b)?;
Ok(())
}
//------------------------------

View File

@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use std::{borrow::Cow, fmt::Display, io::prelude::*, io::BufReader, io::Write};
use quick_xml::events::attributes::Attribute;
@ -46,9 +46,11 @@ pub trait MetadataVisitor {
fn superblock_b(&mut self, sb: &Superblock) -> Result<Visit>;
fn superblock_e(&mut self) -> Result<Visit>;
// Defines a shared sub tree. May only contain a 'map' (no 'ref' allowed).
fn def_shared_b(&mut self, name: &str) -> Result<Visit>;
fn def_shared_e(&mut self) -> Result<Visit>;
// A device contains a number of 'map' or 'ref' items.
fn device_b(&mut self, d: &Device) -> Result<Visit>;
fn device_e(&mut self) -> Result<Visit>;
@ -207,8 +209,9 @@ fn bad_attr<T>(_tag: &str, _attr: &[u8]) -> Result<T> {
todo!();
}
fn missing_attr<T>(_tag: &str, _attr: &str) -> Result<T> {
todo!();
fn missing_attr<T>(tag: &str, attr: &str) -> Result<T> {
let msg = format!("missing attribute '{}' for tag '{}", attr, tag);
Err(anyhow!(msg))
}
fn check_attr<T>(tag: &str, name: &str, maybe_v: Option<T>) -> Result<T> {
@ -257,6 +260,24 @@ fn parse_superblock(e: &BytesStart) -> Result<Superblock> {
})
}
fn parse_def(e: &BytesStart, tag: &str) -> Result<String> {
let mut name: Option<String> = None;
for a in e.attributes() {
let kv = a.unwrap();
match kv.key {
b"name" => {
name = Some(string_val(&kv));
},
_ => {
return bad_attr(tag, kv.key)
}
}
}
Ok(name.unwrap())
}
fn parse_device(e: &BytesStart) -> Result<Device> {
let mut dev_id: Option<u32> = None;
let mut mapped_blocks: Option<u64> = None;
@ -348,16 +369,19 @@ where
Ok(Event::Start(ref e)) => match e.name() {
b"superblock" => visitor.superblock_b(&parse_superblock(e)?),
b"device" => visitor.device_b(&parse_device(e)?),
b"def" => visitor.def_shared_b(&parse_def(e, "def")?),
_ => todo!(),
},
Ok(Event::End(ref e)) => match e.name() {
b"superblock" => visitor.superblock_e(),
b"device" => visitor.device_e(),
b"def" => visitor.def_shared_e(),
_ => todo!(),
},
Ok(Event::Empty(ref e)) => match e.name() {
b"single_mapping" => visitor.map(&parse_single_map(e)?),
b"range_mapping" => visitor.map(&parse_range_map(e)?),
b"ref" => visitor.ref_shared(&parse_def(e, "ref")?),
_ => todo!(),
},
Ok(Event::Text(_)) => Ok(Visit::Continue),

View File

@ -1,4 +1,5 @@
use anyhow::{anyhow, Result};
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};
use crate::checksum;
@ -10,10 +11,13 @@ use crate::pdata::space_map::*;
#[derive(Clone)]
pub struct WriteBatcher {
pub engine: Arc<dyn IoEngine + Send + Sync>,
// FIXME: this doesn't need to be in a mutex
pub sm: Arc<Mutex<dyn SpaceMap>>,
batch_size: usize,
queue: Vec<Block>,
allocations: BTreeSet<u64>,
}
impl WriteBatcher {
@ -27,10 +31,11 @@ impl WriteBatcher {
sm,
batch_size,
queue: Vec::with_capacity(batch_size),
allocations: BTreeSet::new(),
}
}
pub fn alloc(&mut self) -> Result<u64> {
pub fn alloc(&mut self) -> Result<Block> {
let mut sm = self.sm.lock().unwrap();
let b = sm.alloc()?;
@ -38,23 +43,37 @@ impl WriteBatcher {
return Err(anyhow!("out of metadata space"));
}
Ok(b.unwrap())
Ok(Block::new(b.unwrap()))
}
pub fn clear_allocations(&mut self) -> BTreeSet<u64> {
let mut tmp = BTreeSet::new();
std::mem::swap(&mut tmp, &mut self.allocations);
tmp
}
pub fn write(&mut self, b: Block, kind: checksum::BT) -> Result<()> {
checksum::write_checksum(&mut b.get_data(), kind)?;
if self.queue.len() == self.batch_size {
self.flush()?;
let mut tmp = Vec::new();
std::mem::swap(&mut tmp, &mut self.queue);
self.flush_(tmp)?;
}
self.queue.push(b);
Ok(())
}
pub fn flush_(&mut self, queue: Vec<Block>) -> Result<()> {
self.engine.write_many(&queue)?;
Ok(())
}
pub fn flush(&mut self) -> Result<()> {
self.engine.write_many(&self.queue)?;
self.queue.clear();
let mut tmp = Vec::new();
std::mem::swap(&mut tmp, &mut self.queue);
self.flush_(tmp)?;
Ok(())
}
}