[block-cache] Fix some bugs in the copier

This commit is contained in:
Joe Thornber 2016-06-14 16:27:17 +01:00
parent 0f778a0a38
commit a124b7ce26
4 changed files with 188 additions and 58 deletions

View File

@ -11,12 +11,12 @@ using namespace std;
copier::copier(io_engine &engine, copier::copier(io_engine &engine,
string const &src, string const &dest, string const &src, string const &dest,
sector_t block_size, size_t mem) sector_t block_size, size_t mem)
: pool_(block_size * 512, mem), : pool_(block_size * 512, mem, PAGE_SIZE),
block_size_(block_size), block_size_(block_size),
nr_blocks_(mem / block_size), nr_blocks_(mem / block_size),
engine_(engine), engine_(engine),
src_handle_(engine_.open_file(src, io_engine::READ_ONLY)), src_handle_(engine_.open_file(src, io_engine::M_READ_ONLY)),
dest_handle_(engine_.open_file(dest, io_engine::READ_WRITE)), dest_handle_(engine_.open_file(dest, io_engine::M_READ_WRITE)),
genkey_count_(0) genkey_count_(0)
{ {
} }
@ -30,14 +30,14 @@ copier::~copier()
void void
copier::issue(copy_op const &op) copier::issue(copy_op const &op)
{ {
auto data = pool_.alloc(); void *data;
if (!data) {
wait_();
data = pool_.alloc();
if (!data) while (!(data = pool_.alloc())) {
// Shouldn't get here wait_();
throw runtime_error("couldn't allocate buffer");
// data may still not be present because the wait_ could
// have completed a read and issued the corresponding
// write.
} }
copy_job job(op, data); copy_job job(op, data);
@ -45,7 +45,7 @@ copier::issue(copy_op const &op)
unsigned key = genkey(); // used as context for the io_engine unsigned key = genkey(); // used as context for the io_engine
auto r = engine_.issue_io(src_handle_, auto r = engine_.issue_io(src_handle_,
io_engine::READ, io_engine::D_READ,
to_sector(op.src_b), to_sector(op.src_b),
to_sector(op.src_e), to_sector(op.src_e),
data, data,
@ -53,6 +53,7 @@ copier::issue(copy_op const &op)
if (r) if (r)
jobs_.insert(make_pair(key, job)); jobs_.insert(make_pair(key, job));
else else
complete(job); complete(job);
} }
@ -66,23 +67,74 @@ copier::nr_pending() const
boost::optional<copy_op> boost::optional<copy_op>
copier::wait() copier::wait()
{ {
while (!jobs_.empty() && complete_.empty()) if (complete_.empty())
wait_(); wait_();
return wait_complete();
}
boost::optional<copy_op>
copier::wait(unsigned &micro)
{
if (complete_.empty()) if (complete_.empty())
wait_(micro);
return wait_complete();
}
bool
copier::pending() const
{
return !jobs_.empty();
}
boost::optional<copy_op>
copier::wait_complete()
{
if (complete_.empty()) {
return optional<copy_op>(); return optional<copy_op>();
else { } else {
auto op = complete_.front(); auto op = complete_.front();
complete_.pop_front(); complete_.pop_front();
return optional<copy_op>(op); return optional<copy_op>(op);
} }
} }
void
copier::wait_(unsigned &micro)
{
optional<io_engine::wait_result> mp;
if (!pending())
return;
bool completed = false;
while (pending() && !completed) {
mp = engine_.wait(micro);
if (mp)
completed = wait_successful(*mp);
if (!micro)
break;
}
}
void void
copier::wait_() copier::wait_()
{ {
auto p = engine_.wait(); bool completed = false;
while (pending() && !completed) {
auto mp = engine_.wait();
if (mp)
completed = wait_successful(*mp);
}
}
bool
copier::wait_successful(io_engine::wait_result const &p)
{
auto it = jobs_.find(p.second); auto it = jobs_.find(p.second);
if (it == jobs_.end()) if (it == jobs_.end())
throw runtime_error("Internal error. Lost track of copy job."); throw runtime_error("Internal error. Lost track of copy job.");
@ -92,26 +144,29 @@ copier::wait_()
// IO was unsuccessful // IO was unsuccessful
complete(j); complete(j);
jobs_.erase(it); jobs_.erase(it);
return; return true;
} }
// IO was successful // IO was successful
if (!j.op.read_complete) { if (!j.op.read_complete) {
j.op.read_complete = true; j.op.read_complete = true;
if (!engine_.issue_io(dest_handle_, if (!engine_.issue_io(dest_handle_,
io_engine::WRITE, io_engine::D_WRITE,
to_sector(j.op.dest_b), to_sector(j.op.dest_b),
to_sector(j.op.dest_b + (j.op.src_e - j.op.src_b)), to_sector(j.op.dest_b + (j.op.src_e - j.op.src_b)),
j.data, j.data,
it->first)) { it->first)) {
complete(j); complete(j);
jobs_.erase(it); jobs_.erase(it);
return true;
} }
return false;
} else { } else {
j.op.write_complete = true; j.op.write_complete = true;
complete(j); complete(j);
jobs_.erase(it); jobs_.erase(it);
return true;
} }
} }

View File

@ -32,6 +32,10 @@ namespace bcache {
write_complete(false) { write_complete(false) {
} }
bool operator <(copy_op const &rhs) const {
return dest_b < rhs.dest_b;
}
bool success() const { bool success() const {
return read_complete && write_complete; return read_complete && write_complete;
} }
@ -69,8 +73,13 @@ namespace bcache {
unsigned nr_pending() const; unsigned nr_pending() const;
boost::optional<copy_op> wait(); boost::optional<copy_op> wait();
boost::optional<copy_op> wait(unsigned &micro);
private: private:
bool pending() const;
bool wait_successful(io_engine::wait_result const &p);
boost::optional<copy_op> wait_complete();
void wait_(unsigned &micro);
void wait_(); void wait_();
void complete(copy_job const &j); void complete(copy_job const &j);

View File

@ -20,7 +20,6 @@
#include "block-cache/copier.h" #include "block-cache/copier.h"
#include "test_utils.h" #include "test_utils.h"
#include <fcntl.h> #include <fcntl.h>
using namespace boost; using namespace boost;
@ -31,18 +30,32 @@ using namespace testing;
//---------------------------------------------------------------- //----------------------------------------------------------------
namespace { namespace {
unsigned const BLOCK_SIZE = 64u;
using wait_result = io_engine::wait_result;
ostream &operator <<(ostream &out, wait_result const &wr) {
out << "wait_result[" << wr.first << ", " << wr.second << "]";
return out;
}
ostream &operator <<(ostream &out, optional<wait_result> const &mwr) {
if (mwr) {
out << "Just[wait_result[" << mwr->first << ", " << mwr->second << "]]";
} else
out << "Nothing";
return out;
}
class io_engine_mock : public io_engine { class io_engine_mock : public io_engine {
public: public:
MOCK_METHOD3(open_file, handle(string const &, mode, sharing)); MOCK_METHOD3(open_file, handle(string const &, mode, sharing));
MOCK_METHOD1(close_file, void(handle)); MOCK_METHOD1(close_file, void(handle));
MOCK_METHOD6(issue_io, bool(handle, dir, sector_t, sector_t, void *, unsigned)); MOCK_METHOD6(issue_io, bool(handle, dir, sector_t, sector_t, void *, unsigned));
MOCK_METHOD0(wait, wait_result()); MOCK_METHOD0(wait, optional<wait_result>());
MOCK_METHOD1(wait, optional<wait_result>(unsigned &));
}; };
unsigned const BLOCK_SIZE = 64u;
using wait_result = io_engine::wait_result;
class CopierTests : public Test { class CopierTests : public Test {
public: public:
CopierTests() CopierTests()
@ -51,9 +64,9 @@ namespace {
} }
unique_ptr<copier> make_copier() { unique_ptr<copier> make_copier() {
EXPECT_CALL(engine_, open_file(src_file_, io_engine::READ_ONLY, io_engine::EXCLUSIVE)). EXPECT_CALL(engine_, open_file(src_file_, io_engine::M_READ_ONLY, io_engine::EXCLUSIVE)).
WillOnce(Return(SRC_HANDLE)); WillOnce(Return(SRC_HANDLE));
EXPECT_CALL(engine_, open_file(dest_file_, io_engine::READ_WRITE, io_engine::EXCLUSIVE)). EXPECT_CALL(engine_, open_file(dest_file_, io_engine::M_READ_WRITE, io_engine::EXCLUSIVE)).
WillOnce(Return(DEST_HANDLE)); WillOnce(Return(DEST_HANDLE));
EXPECT_CALL(engine_, close_file(SRC_HANDLE)).Times(1); EXPECT_CALL(engine_, close_file(SRC_HANDLE)).Times(1);
@ -64,11 +77,15 @@ namespace {
BLOCK_SIZE, 1 * 1024 * 1024)); BLOCK_SIZE, 1 * 1024 * 1024));
} }
static optional<wait_result> make_wr(bool success, unsigned context) {
return optional<wait_result>(wait_result(success, context));
}
void issue_successful_op(copier &c, copy_op &op, unsigned context) { void issue_successful_op(copier &c, copy_op &op, unsigned context) {
InSequence dummy; InSequence dummy;
unsigned nr_pending = c.nr_pending(); unsigned nr_pending = c.nr_pending();
EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ,
op.src_b * BLOCK_SIZE, op.src_b * BLOCK_SIZE,
op.src_e * BLOCK_SIZE, _, context)). op.src_e * BLOCK_SIZE, _, context)).
WillOnce(Return(true)); WillOnce(Return(true));
@ -77,15 +94,15 @@ namespace {
ASSERT_TRUE(c.nr_pending() == nr_pending + 1); ASSERT_TRUE(c.nr_pending() == nr_pending + 1);
EXPECT_CALL(engine_, wait()). EXPECT_CALL(engine_, wait()).
WillOnce(Return(wait_result(true, context))); WillOnce(Return(make_wr(true, context)));
EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::WRITE, EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::D_WRITE,
op.dest_b * BLOCK_SIZE, op.dest_b * BLOCK_SIZE,
(op.dest_b + (op.src_e - op.src_b)) * BLOCK_SIZE, _, context)). (op.dest_b + (op.src_e - op.src_b)) * BLOCK_SIZE, _, context)).
WillOnce(Return(true)); WillOnce(Return(true));
EXPECT_CALL(engine_, wait()). EXPECT_CALL(engine_, wait()).
WillOnce(Return(wait_result(true, context))); WillOnce(Return(make_wr(true, context)));
auto mop = c.wait(); auto mop = c.wait();
ASSERT_EQ(c.nr_pending(), nr_pending); ASSERT_EQ(c.nr_pending(), nr_pending);
@ -124,7 +141,7 @@ TEST_F(CopierTests, unsuccessful_issue_read)
auto c = make_copier(); auto c = make_copier();
InSequence dummy; InSequence dummy;
EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, 0, BLOCK_SIZE, _, 0)). EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)).
WillOnce(Return(false)); WillOnce(Return(false));
c->issue(op1); c->issue(op1);
@ -141,13 +158,13 @@ TEST_F(CopierTests, unsuccessful_read)
auto c = make_copier(); auto c = make_copier();
InSequence dummy; InSequence dummy;
EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, 0, BLOCK_SIZE, _, 0)). EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)).
WillOnce(Return(true)); WillOnce(Return(true));
c->issue(op1); c->issue(op1);
ASSERT_EQ(c->nr_pending(), 1u); ASSERT_EQ(c->nr_pending(), 1u);
EXPECT_CALL(engine_, wait()). EXPECT_CALL(engine_, wait()).
WillOnce(Return(wait_result(false, 0u))); WillOnce(Return(make_wr(false, 0u)));
ASSERT_EQ(c->nr_pending(), 1u); ASSERT_EQ(c->nr_pending(), 1u);
auto mop = c->wait(); auto mop = c->wait();
@ -161,17 +178,17 @@ TEST_F(CopierTests, unsuccessful_issue_write)
auto c = make_copier(); auto c = make_copier();
InSequence dummy; InSequence dummy;
EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, 0, BLOCK_SIZE, _, 0)). EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)).
WillOnce(Return(true)); WillOnce(Return(true));
c->issue(op1); c->issue(op1);
ASSERT_EQ(c->nr_pending(), 1u); ASSERT_EQ(c->nr_pending(), 1u);
EXPECT_CALL(engine_, wait()). EXPECT_CALL(engine_, wait()).
WillOnce(Return(wait_result(true, 0u))); WillOnce(Return(make_wr(true, 0u)));
ASSERT_EQ(c->nr_pending(), 1u); ASSERT_EQ(c->nr_pending(), 1u);
EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::WRITE, 0, BLOCK_SIZE, _, 0)). EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::D_WRITE, 0, BLOCK_SIZE, _, 0)).
WillOnce(Return(false)); WillOnce(Return(false));
auto mop = c->wait(); auto mop = c->wait();
@ -187,19 +204,19 @@ TEST_F(CopierTests, unsuccessful_write)
auto c = make_copier(); auto c = make_copier();
InSequence dummy; InSequence dummy;
EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, 0, BLOCK_SIZE, _, 0)). EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)).
WillOnce(Return(true)); WillOnce(Return(true));
c->issue(op1); c->issue(op1);
ASSERT_EQ(c->nr_pending(), 1u); ASSERT_EQ(c->nr_pending(), 1u);
EXPECT_CALL(engine_, wait()). EXPECT_CALL(engine_, wait()).
WillOnce(Return(wait_result(true, 0u))); WillOnce(Return(make_wr(true, 0u)));
EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::WRITE, 0, BLOCK_SIZE, _, 0)). EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::D_WRITE, 0, BLOCK_SIZE, _, 0)).
WillOnce(Return(true)); WillOnce(Return(true));
EXPECT_CALL(engine_, wait()). EXPECT_CALL(engine_, wait()).
WillOnce(Return(wait_result(false, 0u))); WillOnce(Return(make_wr(false, 0u)));
auto mop = c->wait(); auto mop = c->wait();
ASSERT_EQ(c->nr_pending(), 0u); ASSERT_EQ(c->nr_pending(), 0u);
@ -225,4 +242,32 @@ TEST_F(CopierTests, copy_different_blocks)
} }
} }
TEST_F(CopierTests, wait_can_timeout)
{
copy_op op1(0, 1, 0);
auto c = make_copier();
InSequence dummy;
EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)).
WillOnce(Return(true));
c->issue(op1);
ASSERT_EQ(c->nr_pending(), 1u);
unsigned micro = 10000;
EXPECT_CALL(engine_, wait(micro)).
WillOnce(Return(make_wr(true, 0u)));
ASSERT_EQ(c->nr_pending(), 1u);
EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::D_WRITE, 0, BLOCK_SIZE, _, 0)).
WillOnce(Return(true));
EXPECT_CALL(engine_, wait(micro)).
WillOnce(DoAll(SetArgReferee<0>(0u), Return(optional<wait_result>())));
auto mop = c->wait(micro);
ASSERT_FALSE(mop);
ASSERT_EQ(c->nr_pending(), 1u);
}
//---------------------------------------------------------------- //----------------------------------------------------------------

View File

@ -64,8 +64,8 @@ TEST_F(IOEngineTests, empty_test)
TEST_F(IOEngineTests, open_and_close) TEST_F(IOEngineTests, open_and_close)
{ {
auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY);
auto dest_handle = engine_->open_file(dest_file_.get_path(), io_engine::READ_WRITE); auto dest_handle = engine_->open_file(dest_file_.get_path(), io_engine::M_READ_WRITE);
ASSERT_TRUE(src_handle != dest_handle); ASSERT_TRUE(src_handle != dest_handle);
engine_->close_file(src_handle); engine_->close_file(src_handle);
engine_->close_file(dest_handle); engine_->close_file(dest_handle);
@ -74,17 +74,17 @@ TEST_F(IOEngineTests, open_and_close)
TEST_F(IOEngineTests, you_can_read_a_read_only_handle) TEST_F(IOEngineTests, you_can_read_a_read_only_handle)
{ {
unsigned nr_sectors = 8; unsigned nr_sectors = 8;
auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY);
void *data = pool_.alloc(); void *data = pool_.alloc();
bool r = engine_->issue_io(src_handle, bool r = engine_->issue_io(src_handle,
io_engine::READ, io_engine::D_READ,
0, nr_sectors, 0, nr_sectors,
data, data,
123); 123);
ASSERT_TRUE(r); ASSERT_TRUE(r);
auto wr = engine_->wait(); auto wr = engine_->wait();
ASSERT_TRUE(wr.first); ASSERT_TRUE(wr->first);
ASSERT_TRUE(wr.second == 123); ASSERT_TRUE(wr->second == 123);
engine_->close_file(src_handle); engine_->close_file(src_handle);
pool_.free(data); pool_.free(data);
@ -94,10 +94,10 @@ TEST_F(IOEngineTests, you_can_read_a_read_only_handle)
TEST_F(IOEngineTests, you_cannot_write_to_a_read_only_handle) TEST_F(IOEngineTests, you_cannot_write_to_a_read_only_handle)
{ {
unsigned nr_sectors = 8; unsigned nr_sectors = 8;
auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY);
void *data = pool_.alloc(); void *data = pool_.alloc();
bool r = engine_->issue_io(src_handle, bool r = engine_->issue_io(src_handle,
io_engine::WRITE, io_engine::D_WRITE,
0, nr_sectors, 0, nr_sectors,
data, data,
0); 0);
@ -109,17 +109,17 @@ TEST_F(IOEngineTests, you_cannot_write_to_a_read_only_handle)
TEST_F(IOEngineTests, you_can_write_to_a_read_write_handle) TEST_F(IOEngineTests, you_can_write_to_a_read_write_handle)
{ {
unsigned nr_sectors = 8; unsigned nr_sectors = 8;
auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY);
void *data = pool_.alloc(); void *data = pool_.alloc();
bool r = engine_->issue_io(src_handle, bool r = engine_->issue_io(src_handle,
io_engine::READ, io_engine::D_READ,
0, nr_sectors, 0, nr_sectors,
data, data,
123); 123);
ASSERT_TRUE(r); ASSERT_TRUE(r);
auto wr = engine_->wait(); auto wr = engine_->wait();
ASSERT_TRUE(wr.first); ASSERT_TRUE(wr->first);
ASSERT_TRUE(wr.second == 123); ASSERT_TRUE(wr->second == 123);
engine_->close_file(src_handle); engine_->close_file(src_handle);
pool_.free(data); pool_.free(data);
@ -128,16 +128,16 @@ TEST_F(IOEngineTests, you_can_write_to_a_read_write_handle)
TEST_F(IOEngineTests, final_block_read_succeeds) TEST_F(IOEngineTests, final_block_read_succeeds)
{ {
unsigned nr_sectors = 8; unsigned nr_sectors = 8;
auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY);
void *data = pool_.alloc(); void *data = pool_.alloc();
bool r = engine_->issue_io(src_handle, bool r = engine_->issue_io(src_handle,
io_engine::READ, io_engine::D_READ,
meg(32) - nr_sectors, meg(32), meg(32) - nr_sectors, meg(32),
data, data,
123); 123);
ASSERT_TRUE(r); ASSERT_TRUE(r);
auto wr = engine_->wait(); auto wr = engine_->wait();
ASSERT_TRUE(wr.first); ASSERT_TRUE(wr->first);
engine_->close_file(src_handle); engine_->close_file(src_handle);
pool_.free(data); pool_.free(data);
@ -147,16 +147,16 @@ TEST_F(IOEngineTests, final_block_read_succeeds)
TEST_F(IOEngineTests, out_of_bounds_read_fails) TEST_F(IOEngineTests, out_of_bounds_read_fails)
{ {
unsigned nr_sectors = 8; unsigned nr_sectors = 8;
auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY);
void *data = pool_.alloc(); void *data = pool_.alloc();
bool r = engine_->issue_io(src_handle, bool r = engine_->issue_io(src_handle,
io_engine::READ, io_engine::D_READ,
meg(32), meg(32) + nr_sectors, meg(32), meg(32) + nr_sectors,
data, data,
123); 123);
ASSERT_TRUE(r); ASSERT_TRUE(r);
auto wr = engine_->wait(); auto wr = engine_->wait();
ASSERT_FALSE(wr.first); ASSERT_FALSE(wr->first);
engine_->close_file(src_handle); engine_->close_file(src_handle);
pool_.free(data); pool_.free(data);
@ -166,20 +166,41 @@ TEST_F(IOEngineTests, out_of_bounds_read_fails)
TEST_F(IOEngineTests, out_of_bounds_write_succeeds) TEST_F(IOEngineTests, out_of_bounds_write_succeeds)
{ {
unsigned nr_sectors = 8; unsigned nr_sectors = 8;
auto handle = engine_->open_file(dest_file_.get_path(), io_engine::READ_WRITE); auto handle = engine_->open_file(dest_file_.get_path(), io_engine::M_READ_WRITE);
void *data = pool_.alloc(); void *data = pool_.alloc();
bool r = engine_->issue_io(handle, bool r = engine_->issue_io(handle,
io_engine::WRITE, io_engine::D_WRITE,
meg(32), meg(32) + nr_sectors, meg(32), meg(32) + nr_sectors,
data, data,
123); 123);
ASSERT_TRUE(r); ASSERT_TRUE(r);
auto wr = engine_->wait(); auto wr = engine_->wait();
ASSERT_TRUE(wr.first); ASSERT_TRUE(wr->first);
engine_->close_file(handle); engine_->close_file(handle);
pool_.free(data); pool_.free(data);
} }
TEST_F(IOEngineTests, succeed_with_timeout)
{
unsigned nr_sectors = 8;
auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY);
void *data = pool_.alloc();
bool r = engine_->issue_io(src_handle,
io_engine::D_READ,
0, nr_sectors,
data,
123);
ASSERT_TRUE(r);
unsigned micro = 10;
auto wr = engine_->wait(micro);
ASSERT_TRUE(wr->first);
ASSERT_TRUE(wr->second == 123);
engine_->close_file(src_handle);
pool_.free(data);
}
//---------------------------------------------------------------- //----------------------------------------------------------------