[block-cache] unit tests + debug io_engine and copier

This commit is contained in:
Joe Thornber
2016-06-07 11:12:27 +01:00
parent 34c039d7dc
commit a94bfea798
12 changed files with 560 additions and 150 deletions

View File

@@ -8,18 +8,25 @@ using namespace std;
//----------------------------------------------------------------
copier::copier(string const &src, string const &dest,
copier::copier(io_engine &engine,
string const &src, string const &dest,
sector_t block_size, size_t mem)
: pool_(block_size, mem),
: pool_(block_size * 512, mem),
block_size_(block_size),
nr_blocks_(mem / block_size),
engine_(nr_blocks_),
engine_(engine),
src_handle_(engine_.open_file(src, io_engine::READ_ONLY)),
dest_handle_(engine_.open_file(dest, io_engine::READ_WRITE)),
genkey_count_(0)
{
}
copier::~copier()
{
engine_.close_file(src_handle_);
engine_.close_file(dest_handle_);
}
void
copier::issue(copy_op const &op)
{
@@ -37,14 +44,17 @@ copier::issue(copy_op const &op)
job.op.read_complete = job.op.write_complete = false;
unsigned key = genkey(); // used as context for the io_engine
cerr << "data = " << data << "\n";
engine_.issue_io(src_handle_,
io_engine::READ,
to_sector(op.src_b),
to_sector(op.src_e),
data,
key);
jobs_.insert(make_pair(key, job));
auto r = engine_.issue_io(src_handle_,
io_engine::READ,
to_sector(op.src_b),
to_sector(op.src_e),
data,
key);
if (r)
jobs_.insert(make_pair(key, job));
else
complete(job);
}
unsigned
@@ -56,7 +66,7 @@ copier::nr_pending() const
boost::optional<copy_op>
copier::wait()
{
while (complete_.empty() && !jobs_.empty())
while (!jobs_.empty() && complete_.empty())
wait_();
if (complete_.empty())
@@ -77,28 +87,31 @@ copier::wait_()
if (it == jobs_.end())
throw runtime_error("Internal error. Lost track of copy job.");
copy_job j = it->second;
copy_job &j = it->second;
if (!p.first) {
// IO was unsuccessful
jobs_.erase(it);
complete(j);
jobs_.erase(it);
return;
}
// IO was successful
if (!j.op.read_complete) {
j.op.read_complete = true;
engine_.issue_io(dest_handle_,
io_engine::WRITE,
to_sector(j.op.dest_b),
to_sector(j.op.dest_b + (j.op.src_e - j.op.src_b)),
j.data,
it->first);
if (!engine_.issue_io(dest_handle_,
io_engine::WRITE,
to_sector(j.op.dest_b),
to_sector(j.op.dest_b + (j.op.src_e - j.op.src_b)),
j.data,
it->first)) {
complete(j);
jobs_.erase(it);
}
} else {
j.op.write_complete = true;
jobs_.erase(it);
complete(j);
jobs_.erase(it);
}
}

View File

@@ -15,11 +15,13 @@ namespace bcache {
struct copy_op {
copy_op()
: read_complete(false),
: src_b(0),
src_e(0),
dest_b(0),
read_complete(false),
write_complete(false) {
}
copy_op(block_address src_b_,
block_address src_e_,
block_address dest_b_)
@@ -30,6 +32,10 @@ namespace bcache {
write_complete(false) {
}
bool success() const {
return read_complete && write_complete;
}
block_address src_b, src_e;
block_address dest_b;
@@ -49,8 +55,10 @@ namespace bcache {
class copier {
public:
copier(std::string const &src, std::string const &dest,
copier(io_engine &engine,
std::string const &src, std::string const &dest,
sector_t block_size, size_t mem);
~copier();
sector_t get_block_size() const {
return block_size_;
@@ -72,7 +80,7 @@ namespace bcache {
mempool pool_;
sector_t block_size_;
unsigned nr_blocks_;
io_engine engine_;
io_engine &engine_;
io_engine::handle src_handle_;
io_engine::handle dest_handle_;
unsigned genkey_count_;

View File

@@ -12,7 +12,12 @@ using namespace bcache;
using namespace boost;
using namespace std;
#define SECTOR_SHIFT 9
//----------------------------------------------------------------
namespace {
unsigned const SECTOR_SHIFT = 9;
unsigned const PAGE_SIZE = 4096;
}
//----------------------------------------------------------------
@@ -55,23 +60,22 @@ control_block_set::context(iocb *cb) const
//----------------------------------------------------------------
io_engine::io_engine(unsigned max_io)
aio_engine::aio_engine(unsigned max_io)
: aio_context_(0),
cbs_(max_io),
events_(max_io)
cbs_(max_io)
{
int r = io_setup(max_io, &aio_context_);
if (r < 0)
throw runtime_error("io_setup failed");
}
io_engine::~io_engine()
aio_engine::~aio_engine()
{
io_destroy(aio_context_);
}
io_engine::handle
io_engine:: open_file(std::string const &path, mode m)
aio_engine::handle
aio_engine::open_file(std::string const &path, mode m)
{
int flags = (m == READ_ONLY) ? O_RDONLY : O_RDWR;
int fd = ::open(path.c_str(), O_DIRECT | flags);
@@ -87,7 +91,7 @@ io_engine:: open_file(std::string const &path, mode m)
}
void
io_engine::close_file(handle h)
aio_engine::close_file(handle h)
{
for (auto it = descriptors_.begin(); it != descriptors_.end(); ++it) {
unsigned it_h = it->get();
@@ -103,71 +107,60 @@ io_engine::close_file(handle h)
}
bool
io_engine::issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context)
aio_engine::issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context)
{
auto cb = cbs_.alloc(context);
if (reinterpret_cast<uint64_t>(data) & (PAGE_SIZE - 1))
throw runtime_error("Data passed to issue_io must be page aligned\n");
iocb *cb;
cb = cbs_.alloc(context);
if (!cb)
return false;
memset(cb, 0, sizeof(*cb));
cb->aio_fildes = static_cast<int>(h);
cb->u.c.buf = data;
cb->u.c.offset = b << SECTOR_SHIFT;
cb->u.c.nbytes = (e - b) << SECTOR_SHIFT;
cb->aio_lio_opcode = (d == READ) ? IO_CMD_PREAD : IO_CMD_PWRITE;
int r = io_submit(aio_context_, 1, &cb);
if (r != 1) {
std::ostringstream out;
out << "couldn't issue "
<< ((d == READ) ? "READ" : "WRITE")
<< " io: io_submit ";
if (r < 0)
out << "failed with " << r;
else
out << "succeeded, but queued no io";
throw std::runtime_error(out.str());
}
return true;
return r == 1;
}
std::pair<bool, io_engine::handle>
io_engine::wait()
aio_engine::wait()
{
int r;
unsigned i;
struct io_event event;
r = io_getevents(aio_context_, 1, events_.size(), &events_[0], NULL);
memset(&event, 0, sizeof(event));
r = io_getevents(aio_context_, 1, 1, &event, NULL);
if (r < 0) {
std::ostringstream out;
out << "io_getevents failed: " << r;
throw std::runtime_error(out.str());
}
for (i = 0; i < static_cast<unsigned>(r); i++) {
io_event const &e = events_[i];
iocb *cb = reinterpret_cast<iocb *>(e.obj);
unsigned context = cbs_.context(cb);
iocb *cb = reinterpret_cast<iocb *>(event.obj);
unsigned context = cbs_.context(cb);
if (event.res == cb->u.c.nbytes) {
cbs_.free(cb);
return make_pair(true, context);
if (e.res == cb->u.c.nbytes)
return make_pair(true, context);
} else if (static_cast<int>(event.res) < 0) {
cbs_.free(cb);
return make_pair(false, context);
else {
std::ostringstream out;
out << "io failed"
<< ", e.res = " << e.res
<< ", e.res2 = " << e.res2
<< ", offset = " << cb->u.c.offset
<< ", nbytes = " << cb->u.c.nbytes;
return make_pair(false, context);
}
} else {
cbs_.free(cb);
return make_pair(false, context);
}
// shouldn't get here
return make_pair(false, 0);
}

View File

@@ -5,16 +5,50 @@
#include <boost/optional.hpp>
#include <ctype.h>
#include <libaio.h>
#include <set>
#include <string>
#include <libaio.h>
//----------------------------------------------------------------
namespace bcache {
using sector_t = uint64_t;
//----------------
// Virtual base class to aid unit testing
class io_engine {
public:
enum mode {
READ_ONLY,
READ_WRITE
};
enum dir {
READ,
WRITE
};
io_engine() {}
virtual ~io_engine() {}
using handle = unsigned;
virtual handle open_file(std::string const &path, mode m) = 0;
virtual void close_file(handle h) = 0;
// returns false if there are insufficient resources to
// queue the IO
virtual bool issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context) = 0;
// returns (success, context)
using wait_result = std::pair<bool, unsigned>;
virtual wait_result wait() = 0;
private:
io_engine(io_engine const &) = delete;
io_engine &operator =(io_engine const &) = delete;
};
//--------------------------------
class control_block_set {
public:
@@ -37,43 +71,32 @@ namespace bcache {
//----------------
class io_engine {
class aio_engine : public io_engine {
public:
enum mode {
READ_ONLY,
READ_WRITE
};
enum dir {
READ,
WRITE
};
// max_io is the maximum nr of concurrent ios expected
io_engine(unsigned max_io);
~io_engine();
aio_engine(unsigned max_io);
~aio_engine();
using handle = unsigned;
handle open_file(std::string const &path, mode m);
void close_file(handle h);
// FIXME: open exclusive?
virtual handle open_file(std::string const &path, mode m);
virtual void close_file(handle h);
// returns false if there are insufficient resources to
// queue the IO
bool issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context);
// Returns false if queueing the io failed
virtual bool issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context);
// returns (success, context)
std::pair<bool, unsigned> wait();
virtual wait_result wait();
private:
std::list<base::unique_fd> descriptors_;
io_context_t aio_context_;
control_block_set cbs_;
std::vector<io_event> events_;
io_engine(io_engine const &) = delete;
io_engine &operator =(io_engine const &) = delete;
aio_engine(io_engine const &) = delete;
aio_engine &operator =(io_engine const &) = delete;
};
}