[caching] cache_writeback

This commit is contained in:
Joe Thornber 2016-06-14 16:29:37 +01:00
parent b77ba14a2a
commit 8989bb0f32

View File

@ -1,3 +1,4 @@
#include "base/progress_monitor.h"
#include "persistent-data/file_utils.h"
#include "block-cache/copier.h"
#include "caching/commands.h"
@ -18,28 +19,99 @@ using namespace std;
//----------------------------------------------------------------
namespace {
template <typename T> T safe_div(T const n, T const d, T const def) {
return (d == T()) ? def : (n / d);
}
//--------------------------------
struct flags {
flags()
: cache_size(1024 * 1024 * 128) {
: cache_size(4 * 1024 * 1024),
sort_buffers(16 * 1024),
list_failed_blocks(false),
update_metadata(true) {
}
// The sort buffers have a dramatic effect on the
// performance. We give up 10% of the general buffer space
// for them.
void calc_sort_buffer_size() {
size_t sbs = cache_size / 10;
cache_size = cache_size - sbs;
sort_buffers = sbs / sizeof(copy_op);
}
using maybe_string = boost::optional<string>;
size_t cache_size;
unsigned sort_buffers;
maybe_string metadata_dev;
maybe_string origin_dev;
maybe_string fast_dev;
bool list_failed_blocks;
bool update_metadata;
};
//--------------------------------
class copy_batch {
public:
copy_batch(unsigned nr)
: max_(nr),
count_(0),
ops_(nr) {
}
bool space() const {
return count_ < max_;
}
void push_op(copy_op const &op) {
if (!space())
throw runtime_error("copy_batch out of space");
ops_[count_++] = op;
}
void reset() {
count_ = 0;
}
vector<copy_op>::iterator begin() {
return ops_.begin();
}
vector<copy_op>::iterator end() {
return ops_.begin() + count_;
}
private:
unsigned max_;
unsigned count_;
vector<copy_op> ops_;
};
class copy_visitor : public mapping_visitor {
public:
copy_visitor(copier &c, bool only_dirty)
copy_visitor(copier &c, unsigned sort_buffer, bool only_dirty,
bool list_failed_blocks,
progress_monitor &monitor, unsigned cache_blocks)
: copier_(c),
block_size_(c.get_block_size()),
only_dirty_(only_dirty) {
only_dirty_(only_dirty),
list_failed_blocks_(list_failed_blocks),
batch_(sort_buffer),
monitor_(monitor),
cache_blocks_(cache_blocks) {
}
virtual void visit(block_address cblock, mapping const &m) {
stats_.blocks_scanned = cblock;
update_monitor();
if (!(m.flags_ & M_VALID))
return;
@ -52,41 +124,122 @@ namespace {
cop.dest_b = m.oblock_;
// blocks
copier_.issue(cop);
stats_.blocks_needed++;
batch_.push_op(cop);
if (!batch_.space())
issue();
}
stats_.blocks_issued++;
void issue() {
auto compare_dest = [](copy_op const &lhs, copy_op const &rhs) {
return lhs.dest_b < rhs.dest_b;
};
sort(batch_.begin(), batch_.end(), compare_dest);
#if 0
if (sectors_copied < block_size_) {
stats_.blocks_failed++;
stats_.sectors_failed += block_size_ - sectors_copied;
auto e = batch_.end();
for (auto it = batch_.begin(); it != e; ++it) {
copier_.issue(*it);
stats_.blocks_issued++;
update_monitor();
check_for_completed_copies();
}
#endif
check_for_completed_copies();
batch_.reset();
}
void check_for_completed_copies(bool block = false) {
optional<copy_op> mop;
do {
if (block)
mop = copier_.wait();
else {
unsigned micro = 0;
mop = copier_.wait(micro);
}
if (mop) {
inc_completed(*mop);
if (!mop->success()) {
failed_blocks_.insert(*mop);
failed_cblocks_.insert(mop->src_b);
}
}
} while (mop);
}
void complete() {
issue();
while (copier_.nr_pending())
check_for_completed_copies(true);
monitor_.update_percent(100);
cerr << "\n";
}
void inc_completed(copy_op const &op) {
stats_.blocks_completed++;
update_monitor();
}
void update_monitor() {
static unsigned call_count = 0;
if (call_count++ % 128)
return;
uint64_t scanned = stats_.blocks_scanned * 100 / cache_blocks_;
uint64_t copied = safe_div<block_address>(stats_.blocks_completed * 100,
stats_.blocks_needed, 100ull);
uint64_t percent = min<uint64_t>(scanned, copied);
monitor_.update_percent(percent);
}
struct copy_stats {
copy_stats()
: blocks_issued(0),
blocks_failed(0),
sectors_failed(0) {
: blocks_scanned(0),
blocks_needed(0),
blocks_issued(0),
blocks_completed(0),
blocks_failed(0) {
}
block_address blocks_scanned;
block_address blocks_needed;
block_address blocks_issued;
block_address blocks_completed;
block_address blocks_failed;
block_address sectors_failed;
};
copy_stats const &get_stats() const {
return stats_;
}
set<block_address> failed_writebacks() const {
return failed_cblocks_;
}
private:
copier &copier_;
unsigned block_size_;
bool only_dirty_;
bool list_failed_blocks_;
copy_stats stats_;
copy_batch batch_;
progress_monitor &monitor_;
unsigned cache_blocks_;
set<copy_op> failed_blocks_;
set<block_address> failed_cblocks_;
};
//--------------------------------
using namespace mapping_array_damage;
class ignore_damage_visitor : public damage_visitor {
@ -117,25 +270,65 @@ namespace {
return md.sb_.flags.get_flag(superblock_flags::CLEAN_SHUTDOWN);
}
void update_metadata(metadata &md, set<block_address> const &failed_writebacks) {
cout << "Updating metadata ... ";
cout.flush();
auto &mappings = md.mappings_;
for (block_address cblock = 0; cblock < mappings->get_nr_entries(); cblock++) {
auto m = mappings->get(cblock);
if (!(m.flags_ & M_VALID))
continue;
if (!(m.flags_ & M_DIRTY))
continue;
if (failed_writebacks.count(cblock))
continue;
m.flags_ &= ~M_DIRTY;
cerr << "clearing dirty flag for block " << cblock << "\n";
mappings->set(cblock, m);
}
md.commit(true);
cout << "done\n";
cout.flush();
}
int writeback_(flags const &f) {
block_manager<>::ptr bm = open_bm(*f.metadata_dev, block_manager<>::READ_ONLY);
block_manager<>::ptr bm = open_bm(*f.metadata_dev, block_manager<>::READ_WRITE);
metadata md(bm, metadata::OPEN);
aio_engine engine(f.cache_size / md.sb_.data_block_size);
// FIXME: we're going to have to copy runs to get the through put with small block sizes
unsigned max_ios = f.cache_size / (md.sb_.data_block_size << SECTOR_SHIFT);
aio_engine engine(max_ios);
copier c(engine, *f.fast_dev, *f.origin_dev,
md.sb_.data_block_size, f.cache_size);
copy_visitor cv(c, clean_shutdown(md));
auto bar = create_progress_bar("Copying data");
copy_visitor cv(c, f.sort_buffers, clean_shutdown(md), f.list_failed_blocks,
*bar, md.sb_.cache_blocks);
ignore_damage_visitor dv;
walk_mapping_array(*md.mappings_, cv, dv);
cv.complete();
auto stats = cv.get_stats();
cout << stats.blocks_issued << " copies issued\n"
<< stats.blocks_failed << " copies failed\n";
cout << stats.blocks_issued - stats.blocks_failed << "/"
<< stats.blocks_issued << " blocks successfully copied.\n";
if (stats.blocks_failed)
cout << stats.sectors_failed << " sectors were not copied\n";
cout << stats.blocks_failed << " blocks were not copied\n";
if (dv.was_corruption())
if (dv.was_corruption()) {
cout << "Metadata corruption was found, some data may not have been copied.\n";
if (f.update_metadata)
cout << "Unable to update metadata.\n";
} else if (f.update_metadata)
update_metadata(md, cv.failed_writebacks());
return (stats.blocks_failed || dv.was_corruption()) ? 1 : 0;
}
@ -169,6 +362,9 @@ cache_writeback_cmd::usage(std::ostream &out) const
<< "\t\t--metadata-device <dev>\n"
<< "\t\t--origin-device <dev>\n"
<< "\t\t--fast-device <dev>\n"
<< "\t\t--buffer-size-meg <size>\n"
<< "\t\t--list-failed-blocks\n"
<< "\t\t--no-metadata-update\n"
<< "Options:\n"
<< " {-h|--help}\n"
<< " {-V|--version}" << endl;
@ -184,6 +380,9 @@ cache_writeback_cmd::run(int argc, char **argv)
{ "metadata-device", required_argument, NULL, 0 },
{ "origin-device", required_argument, NULL, 1 },
{ "fast-device", required_argument, NULL, 2 },
{ "buffer-size-meg", required_argument, NULL, 3 },
{ "list-failed-blocks", no_argument, NULL, 4 },
{ "no-metadata-update", no_argument, NULL, 5 },
{ "help", no_argument, NULL, 'h'},
{ "version", no_argument, NULL, 'V'},
{ NULL, no_argument, NULL, 0 }
@ -203,6 +402,18 @@ cache_writeback_cmd::run(int argc, char **argv)
fs.fast_dev = optarg;
break;
case 3:
fs.cache_size = parse_uint64(optarg, "buffer size") * 1024 * 1024;
break;
case 4:
fs.list_failed_blocks = true;
break;
case 5:
fs.update_metadata = false;
break;
case 'h':
usage(cout);
return 0;
@ -217,6 +428,8 @@ cache_writeback_cmd::run(int argc, char **argv)
}
}
fs.calc_sort_buffer_size();
if (argc != optind) {
usage(cerr);
return 1;
@ -241,7 +454,6 @@ cache_writeback_cmd::run(int argc, char **argv)
}
return writeback(fs);
}
//----------------------------------------------------------------