Introduce a buffer class that has its own allocator which makes sure

the buffer is 512 byte aligned.

Open metadata devices with O_DIRECT to let us work with live metadata.
This commit is contained in:
Joe Thornber 2012-05-17 13:05:26 +01:00
parent e25c211591
commit 2598648e62
13 changed files with 99 additions and 56 deletions

67
block.h
View File

@ -30,6 +30,7 @@
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <string.h> #include <string.h>
#include <malloc.h>
//---------------------------------------------------------------- //----------------------------------------------------------------
@ -39,12 +40,47 @@ namespace persistent_data {
typedef uint64_t block_address; typedef uint64_t block_address;
template <uint32_t BlockSize> template <uint32_t BlockSize = MD_BLOCK_SIZE, uint32_t Alignment = 512>
class buffer : private boost::noncopyable {
public:
unsigned char &operator[](unsigned index) {
if (index >= BlockSize)
throw std::runtime_error("buffer index out of bounds");
return data_[index];
}
unsigned char const &operator[](unsigned index) const {
if (index >= BlockSize)
throw std::runtime_error("buffer index out of bounds");
return data_[index];
}
unsigned char *raw() {
return data_;
}
unsigned char const *raw() const {
return data_;
}
static void *operator new(size_t s) {
return ::memalign(Alignment, s);
}
static void operator delete(void *p) {
free(p);
}
private:
unsigned char data_[BlockSize];
};
template <uint32_t BlockSize = MD_BLOCK_SIZE>
class block_io : private boost::noncopyable { class block_io : private boost::noncopyable {
public: public:
typedef boost::shared_ptr<block_io> ptr; typedef boost::shared_ptr<block_io> ptr;
typedef unsigned char buffer[BlockSize];
typedef unsigned char const const_buffer[BlockSize];
block_io(std::string const &path, block_address nr_blocks, bool writeable = false); block_io(std::string const &path, block_address nr_blocks, bool writeable = false);
~block_io(); ~block_io();
@ -53,8 +89,8 @@ namespace persistent_data {
return nr_blocks_; return nr_blocks_;
} }
void read_buffer(block_address location, buffer &buf) const; void read_buffer(block_address location, buffer<BlockSize> &buf) const;
void write_buffer(block_address location, const_buffer &buf); void write_buffer(block_address location, buffer<BlockSize> const &buf);
private: private:
int fd_; int fd_;
@ -72,23 +108,20 @@ namespace persistent_data {
unsigned max_concurrent_locks, unsigned max_concurrent_locks,
bool writeable = false); bool writeable = false);
typedef unsigned char buffer[BlockSize];
typedef unsigned char const const_buffer[BlockSize];
class validator { class validator {
public: public:
typedef boost::shared_ptr<validator> ptr; typedef boost::shared_ptr<validator> ptr;
virtual ~validator() {} virtual ~validator() {}
virtual void check(const_buffer &b, block_address location) const = 0; virtual void check(buffer<BlockSize> const &b, block_address location) const = 0;
virtual void prepare(buffer &b, block_address location) const = 0; virtual void prepare(buffer<BlockSize> &b, block_address location) const = 0;
}; };
class noop_validator : public validator { class noop_validator : public validator {
public: public:
void check(const_buffer &b, block_address location) const {} void check(buffer<BlockSize> const &b, block_address location) const {}
void prepare(buffer &b, block_address location) const {} void prepare(buffer<BlockSize> &b, block_address location) const {}
}; };
enum block_type { enum block_type {
@ -96,7 +129,7 @@ namespace persistent_data {
BT_NORMAL BT_NORMAL
}; };
struct block { struct block : private boost::noncopyable {
typedef boost::shared_ptr<block> ptr; typedef boost::shared_ptr<block> ptr;
block(typename block_io<BlockSize>::ptr io, block(typename block_io<BlockSize>::ptr io,
@ -118,12 +151,12 @@ namespace persistent_data {
typename block_io<BlockSize>::ptr io_; typename block_io<BlockSize>::ptr io_;
block_address location_; block_address location_;
buffer data_; std::auto_ptr<buffer<BlockSize> > data_;
typename validator::ptr validator_; typename validator::ptr validator_;
block_type bt_; block_type bt_;
bool dirty_; bool dirty_;
}; };
typedef typename block::ptr block_ptr; typedef typename block::ptr block_ptr; // FIXME: remove
class read_ref { class read_ref {
public: public:
@ -135,7 +168,7 @@ namespace persistent_data {
read_ref const &operator =(read_ref const &rhs); read_ref const &operator =(read_ref const &rhs);
block_address get_location() const; block_address get_location() const;
const_buffer &data() const; buffer<BlockSize> const &data() const;
protected: protected:
block_manager<BlockSize> const &bm_; block_manager<BlockSize> const &bm_;
@ -151,7 +184,7 @@ namespace persistent_data {
typename block::ptr b); typename block::ptr b);
using read_ref::data; using read_ref::data;
buffer &data(); buffer<BlockSize> &data();
}; };
// Locking methods // Locking methods

View File

@ -40,7 +40,7 @@ block_io<BlockSize>::block_io(std::string const &path, block_address nr_blocks,
writeable_(writeable) writeable_(writeable)
{ {
// fd_ = ::open(path.c_str(), writeable ? (O_RDWR | O_CREAT) : O_RDONLY, 0666); // fd_ = ::open(path.c_str(), writeable ? (O_RDWR | O_CREAT) : O_RDONLY, 0666);
fd_ = ::open(path.c_str(), writeable ? O_RDWR : O_RDONLY, 0666); fd_ = ::open(path.c_str(), O_DIRECT | O_SYNC | (writeable ? O_RDWR : O_RDONLY), 0666);
if (fd_ < 0) if (fd_ < 0)
throw std::runtime_error("couldn't open file"); throw std::runtime_error("couldn't open file");
} }
@ -53,7 +53,7 @@ block_io<BlockSize>::~block_io()
template <uint32_t BlockSize> template <uint32_t BlockSize>
void void
block_io<BlockSize>::read_buffer(block_address location, buffer &buffer) const block_io<BlockSize>::read_buffer(block_address location, buffer<BlockSize> &buffer) const
{ {
off_t r; off_t r;
r = ::lseek(fd_, BlockSize * location, SEEK_SET); r = ::lseek(fd_, BlockSize * location, SEEK_SET);
@ -62,7 +62,7 @@ block_io<BlockSize>::read_buffer(block_address location, buffer &buffer) const
ssize_t n; ssize_t n;
size_t remaining = BlockSize; size_t remaining = BlockSize;
unsigned char *buf = buffer; unsigned char *buf = buffer.raw();
do { do {
n = ::read(fd_, buf, remaining); n = ::read(fd_, buf, remaining);
if (n > 0) { if (n > 0) {
@ -77,7 +77,7 @@ block_io<BlockSize>::read_buffer(block_address location, buffer &buffer) const
template <uint32_t BlockSize> template <uint32_t BlockSize>
void void
block_io<BlockSize>::write_buffer(block_address location, const_buffer &buffer) block_io<BlockSize>::write_buffer(block_address location, buffer<BlockSize> const &buffer)
{ {
off_t r; off_t r;
r = ::lseek(fd_, BlockSize * location, SEEK_SET); r = ::lseek(fd_, BlockSize * location, SEEK_SET);
@ -86,7 +86,7 @@ block_io<BlockSize>::write_buffer(block_address location, const_buffer &buffer)
ssize_t n; ssize_t n;
size_t remaining = BlockSize; size_t remaining = BlockSize;
unsigned char const *buf = buffer; unsigned char const *buf = buffer.raw();
do { do {
n = ::write(fd_, buf, remaining); n = ::write(fd_, buf, remaining);
if (n > 0) { if (n > 0) {
@ -118,16 +118,17 @@ block_manager<BlockSize>::block::block(typename block_io<BlockSize>::ptr io,
bool zero) bool zero)
: io_(io), : io_(io),
location_(location), location_(location),
data_(new buffer<BlockSize>()),
validator_(v), validator_(v),
bt_(bt), bt_(bt),
dirty_(false) dirty_(false)
{ {
if (zero) { if (zero) {
memset(&data_, 0, sizeof(data_)); memset(data_->raw(), 0, BlockSize);
dirty_ = true; dirty_ = true;
} else { } else {
io_->read_buffer(location_, data_); io_->read_buffer(location_, *data_);
validator_->check(data_, location_); validator_->check(*data_, location_);
} }
} }
@ -142,8 +143,8 @@ void
block_manager<BlockSize>::block::flush() block_manager<BlockSize>::block::flush()
{ {
if (dirty_) { if (dirty_) {
validator_->prepare(data_, location_); validator_->prepare(*data_, location_);
io_->write_buffer(location_, data_); io_->write_buffer(location_, *data_);
} }
} }
@ -203,10 +204,10 @@ block_manager<BlockSize>::read_ref::get_location() const
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
typename block_manager<BlockSize>::const_buffer & buffer<BlockSize> const &
block_manager<BlockSize>::read_ref::data() const block_manager<BlockSize>::read_ref::data() const
{ {
return block_->data_; return *block_->data_;
} }
//-------------------------------- //--------------------------------
@ -220,10 +221,10 @@ block_manager<BlockSize>::write_ref::write_ref(block_manager<BlockSize> const &b
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
typename block_manager<BlockSize>::buffer & buffer<BlockSize> &
block_manager<BlockSize>::write_ref::data() block_manager<BlockSize>::write_ref::data()
{ {
return read_ref::block_->data_; return *read_ref::block_->data_;
} }
//---------------------------------------------------------------- //----------------------------------------------------------------
@ -285,7 +286,7 @@ block_manager<BlockSize>::write_lock_zero(block_address location,
boost::optional<block_ptr> cached_block = cache_.get(location); boost::optional<block_ptr> cached_block = cache_.get(location);
if (cached_block) { if (cached_block) {
(*cached_block)->check_write_lockable(); (*cached_block)->check_write_lockable();
memset(&(*cached_block)->data_, 0, BlockSize); memset((*cached_block)->data_->raw(), 0, BlockSize);
return write_ref(*this, *cached_block); return write_ref(*this, *cached_block);
} }
@ -326,7 +327,7 @@ block_manager<BlockSize>::superblock_zero(block_address location,
if (cached_block) { if (cached_block) {
(*cached_block)->check_write_lockable(); (*cached_block)->check_write_lockable();
memset(&(*cached_block)->data_, 0, BlockSize); memset((*cached_block)->data_->raw(), 0, BlockSize); // FIXME: add a zero method to buffer
(*cached_block)->validator_ = v; (*cached_block)->validator_ = v;
return write_ref(*this, *cached_block); return write_ref(*this, *cached_block);
} }

View File

@ -176,7 +176,7 @@ namespace persistent_data {
return node_ref<ValueTraits>( return node_ref<ValueTraits>(
b.get_location(), b.get_location(),
reinterpret_cast<disk_node *>( reinterpret_cast<disk_node *>(
const_cast<unsigned char *>(b.data()))); const_cast<unsigned char *>(b.data().raw())));
} }
template <typename ValueTraits> template <typename ValueTraits>
@ -186,7 +186,7 @@ namespace persistent_data {
return node_ref<ValueTraits>( return node_ref<ValueTraits>(
b.get_location(), b.get_location(),
reinterpret_cast<disk_node *>( reinterpret_cast<disk_node *>(
const_cast<unsigned char *>(b.data()))); const_cast<unsigned char *>(b.data().raw())));
} }
class ro_spine : private noncopyable { class ro_spine : private noncopyable {

View File

@ -33,7 +33,7 @@ using namespace std;
namespace { namespace {
struct btree_node_validator : public block_manager<>::validator { struct btree_node_validator : public block_manager<>::validator {
virtual void check(block_manager<>::const_buffer &b, block_address location) const { virtual void check(buffer<> const &b, block_address location) const {
disk_node const *data = reinterpret_cast<disk_node const *>(&b); disk_node const *data = reinterpret_cast<disk_node const *>(&b);
node_header const *n = &data->header; node_header const *n = &data->header;
crc32c sum(BTREE_CSUM_XOR); crc32c sum(BTREE_CSUM_XOR);
@ -45,7 +45,7 @@ namespace {
throw checksum_error("bad block nr in btree node"); throw checksum_error("bad block nr in btree node");
} }
virtual void prepare(block_manager<>::buffer &b, block_address location) const { virtual void prepare(buffer<> &b, block_address location) const {
disk_node *data = reinterpret_cast<disk_node *>(&b); disk_node *data = reinterpret_cast<disk_node *>(&b);
node_header *n = &data->header; node_header *n = &data->header;
n->blocknr = to_disk<base::__le64, uint64_t>(location); n->blocknr = to_disk<base::__le64, uint64_t>(location);

View File

@ -40,7 +40,7 @@ namespace {
uint32_t const SUPERBLOCK_CSUM_SEED = 160774; uint32_t const SUPERBLOCK_CSUM_SEED = 160774;
struct superblock_validator : public block_manager<>::validator { struct superblock_validator : public block_manager<>::validator {
virtual void check(block_manager<>::const_buffer &b, block_address location) const { virtual void check(buffer<> const &b, block_address location) const {
superblock_disk const *sbd = reinterpret_cast<superblock_disk const *>(&b); superblock_disk const *sbd = reinterpret_cast<superblock_disk const *>(&b);
crc32c sum(SUPERBLOCK_CSUM_SEED); crc32c sum(SUPERBLOCK_CSUM_SEED);
sum.append(&sbd->flags_, MD_BLOCK_SIZE - sizeof(uint32_t)); sum.append(&sbd->flags_, MD_BLOCK_SIZE - sizeof(uint32_t));
@ -48,7 +48,7 @@ namespace {
throw checksum_error("bad checksum in superblock"); throw checksum_error("bad checksum in superblock");
} }
virtual void prepare(block_manager<>::buffer &b, block_address location) const { virtual void prepare(buffer<> &b, block_address location) const {
superblock_disk *sbd = reinterpret_cast<superblock_disk *>(&b); superblock_disk *sbd = reinterpret_cast<superblock_disk *>(&b);
crc32c sum(SUPERBLOCK_CSUM_SEED); crc32c sum(SUPERBLOCK_CSUM_SEED);
sum.append(&sbd->flags_, MD_BLOCK_SIZE - sizeof(uint32_t)); sum.append(&sbd->flags_, MD_BLOCK_SIZE - sizeof(uint32_t));
@ -127,6 +127,10 @@ metadata::metadata(std::string const &dev_path, open_type ot,
case OPEN: case OPEN:
tm_ = open_tm(dev_path, false); tm_ = open_tm(dev_path, false);
sb_ = read_superblock(tm_->get_bm()); sb_ = read_superblock(tm_->get_bm());
if (sb_.version_ != 1)
throw runtime_error("unknown metadata version");
metadata_sm_ = open_metadata_sm(tm_, &sb_.metadata_space_map_root_); metadata_sm_ = open_metadata_sm(tm_, &sb_.metadata_space_map_root_);
tm_->set_sm(metadata_sm_); tm_->set_sm(metadata_sm_);
@ -161,10 +165,10 @@ metadata::metadata(std::string const &dev_path, open_type ot,
} }
} }
metadata::metadata(std::string const &dev_path, block_address held_root) metadata::metadata(std::string const &dev_path, block_address metadata_snap)
{ {
tm_ = open_tm(dev_path, false); tm_ = open_tm(dev_path, false);
sb_ = read_superblock(tm_->get_bm(), held_root); sb_ = read_superblock(tm_->get_bm(), metadata_snap);
// We don't open the metadata sm for a held root // We don't open the metadata sm for a held root
//metadata_sm_ = open_metadata_sm(tm_, &sb_.metadata_space_map_root_); //metadata_sm_ = open_metadata_sm(tm_, &sb_.metadata_space_map_root_);
tm_->set_sm(metadata_sm_); tm_->set_sm(metadata_sm_);
@ -208,7 +212,7 @@ metadata::commit()
superblock_validator v; superblock_validator v;
write_ref superblock = tm_->get_bm()->superblock_zero(SUPERBLOCK_LOCATION, write_ref superblock = tm_->get_bm()->superblock_zero(SUPERBLOCK_LOCATION,
mk_validator(new superblock_validator())); mk_validator(new superblock_validator()));
superblock_disk *disk = reinterpret_cast<superblock_disk *>(superblock.data()); superblock_disk *disk = reinterpret_cast<superblock_disk *>(superblock.data().raw());
superblock_traits::pack(sb_, *disk); superblock_traits::pack(sb_, *disk);
} }

View File

@ -151,7 +151,7 @@ namespace thin_provisioning {
sector_t data_block_size = 128, sector_t data_block_size = 128,
block_address nr_data_blocks = 0); // Only used if CREATE block_address nr_data_blocks = 0); // Only used if CREATE
metadata(std::string const &dev_path, block_address held_root); metadata(std::string const &dev_path, block_address metadata_snap);
void commit(); void commit();

View File

@ -171,11 +171,11 @@ thin_provisioning::metadata_check(metadata::ptr md)
block_counter metadata_counter, data_counter; block_counter metadata_counter, data_counter;
if (md->sb_.held_root_) { if (md->sb_.metadata_snap_) {
block_manager<>::ptr bm = md->tm_->get_bm(); block_manager<>::ptr bm = md->tm_->get_bm();
block_address root = md->sb_.held_root_; block_address root = md->sb_.metadata_snap_;
metadata_counter.inc(root); metadata_counter.inc(root);

View File

@ -55,7 +55,7 @@ superblock_traits::unpack(superblock_disk const &disk, superblock &value)
value.time_ = to_cpu<uint32_t>(disk.time_); value.time_ = to_cpu<uint32_t>(disk.time_);
value.trans_id_ = to_cpu<uint64_t>(disk.trans_id_); value.trans_id_ = to_cpu<uint64_t>(disk.trans_id_);
value.held_root_ = to_cpu<uint64_t>(disk.held_root_); value.metadata_snap_ = to_cpu<uint64_t>(disk.metadata_snap_);
::memcpy(value.data_space_map_root_, ::memcpy(value.data_space_map_root_,
disk.data_space_map_root_, disk.data_space_map_root_,
@ -88,7 +88,7 @@ superblock_traits::pack(superblock const &value, superblock_disk &disk)
disk.time_ = to_disk<__le32>(value.time_); disk.time_ = to_disk<__le32>(value.time_);
disk.trans_id_ = to_disk<__le64>(value.trans_id_); disk.trans_id_ = to_disk<__le64>(value.trans_id_);
disk.held_root_ = to_disk<__le64>(value.held_root_); disk.metadata_snap_ = to_disk<__le64>(value.metadata_snap_);
::memcpy(disk.data_space_map_root_, ::memcpy(disk.data_space_map_root_,
value.data_space_map_root_, value.data_space_map_root_,

View File

@ -66,7 +66,7 @@ namespace thin_provisioning {
__le64 trans_id_; __le64 trans_id_;
/* root for userspace's transaction (for migration and friends) */ /* root for userspace's transaction (for migration and friends) */
__le64 held_root_; __le64 metadata_snap_;
__u8 data_space_map_root_[SPACE_MAP_ROOT_SIZE]; __u8 data_space_map_root_[SPACE_MAP_ROOT_SIZE];
__u8 metadata_space_map_root_[SPACE_MAP_ROOT_SIZE]; __u8 metadata_space_map_root_[SPACE_MAP_ROOT_SIZE];
@ -99,7 +99,7 @@ namespace thin_provisioning {
uint64_t trans_id_; uint64_t trans_id_;
/* root for userspace's transaction (for migration and friends) */ /* root for userspace's transaction (for migration and friends) */
uint64_t held_root_; uint64_t metadata_snap_;
unsigned char data_space_map_root_[SPACE_MAP_ROOT_SIZE]; unsigned char data_space_map_root_[SPACE_MAP_ROOT_SIZE];
unsigned char metadata_space_map_root_[SPACE_MAP_ROOT_SIZE]; unsigned char metadata_space_map_root_[SPACE_MAP_ROOT_SIZE];

View File

@ -186,10 +186,15 @@ namespace {
void void
thin_provisioning::metadata_dump(metadata::ptr md, emitter::ptr e, bool repair) thin_provisioning::metadata_dump(metadata::ptr md, emitter::ptr e, bool repair)
{ {
optional<uint64_t> md_snap = md->sb_.metadata_snap_ ?
optional<uint64_t>(md->sb_.metadata_snap_) :
optional<uint64_t>();
e->begin_superblock("", md->sb_.time_, e->begin_superblock("", md->sb_.time_,
md->sb_.trans_id_, md->sb_.trans_id_,
md->sb_.data_block_size_, md->sb_.data_block_size_,
md->data_sm_->get_nr_blocks()); md->data_sm_->get_nr_blocks(),
md_snap);
details_extractor::ptr de(new details_extractor); details_extractor::ptr de(new details_extractor);

View File

@ -38,7 +38,7 @@ namespace {
uint64_t const BITMAP_CSUM_XOR = 240779; uint64_t const BITMAP_CSUM_XOR = 240779;
struct bitmap_block_validator : public block_manager<>::validator { struct bitmap_block_validator : public block_manager<>::validator {
virtual void check(block_manager<>::const_buffer &b, block_address location) const { virtual void check(buffer<> const &b, block_address location) const {
bitmap_header const *data = reinterpret_cast<bitmap_header const *>(&b); bitmap_header const *data = reinterpret_cast<bitmap_header const *>(&b);
crc32c sum(BITMAP_CSUM_XOR); crc32c sum(BITMAP_CSUM_XOR);
sum.append(&data->not_used, MD_BLOCK_SIZE - sizeof(uint32_t)); sum.append(&data->not_used, MD_BLOCK_SIZE - sizeof(uint32_t));
@ -49,7 +49,7 @@ namespace {
throw checksum_error("bad block nr in space map bitmap"); throw checksum_error("bad block nr in space map bitmap");
} }
virtual void prepare(block_manager<>::buffer &b, block_address location) const { virtual void prepare(buffer<> &b, block_address location) const {
bitmap_header *data = reinterpret_cast<bitmap_header *>(&b); bitmap_header *data = reinterpret_cast<bitmap_header *>(&b);
data->blocknr = to_disk<base::__le64, uint64_t>(location); data->blocknr = to_disk<base::__le64, uint64_t>(location);
@ -70,7 +70,7 @@ namespace {
// FIXME: factor out the common code in these validators // FIXME: factor out the common code in these validators
struct index_block_validator : public block_manager<>::validator { struct index_block_validator : public block_manager<>::validator {
virtual void check(block_manager<>::const_buffer &b, block_address location) const { virtual void check(buffer<> const &b, block_address location) const {
metadata_index const *mi = reinterpret_cast<metadata_index const *>(&b); metadata_index const *mi = reinterpret_cast<metadata_index const *>(&b);
crc32c sum(INDEX_CSUM_XOR); crc32c sum(INDEX_CSUM_XOR);
sum.append(&mi->padding_, MD_BLOCK_SIZE - sizeof(uint32_t)); sum.append(&mi->padding_, MD_BLOCK_SIZE - sizeof(uint32_t));
@ -81,7 +81,7 @@ namespace {
throw checksum_error("bad block nr in metadata index block"); throw checksum_error("bad block nr in metadata index block");
} }
virtual void prepare(block_manager<>::buffer &b, block_address location) const { virtual void prepare(buffer<> &b, block_address location) const {
metadata_index *mi = reinterpret_cast<metadata_index *>(&b); metadata_index *mi = reinterpret_cast<metadata_index *>(&b);
mi->blocknr_ = to_disk<base::__le64, uint64_t>(location); mi->blocknr_ = to_disk<base::__le64, uint64_t>(location);

View File

@ -193,7 +193,7 @@ namespace {
field(f, "version", sb.version_); field(f, "version", sb.version_);
field(f, "time", sb.time_); field(f, "time", sb.time_);
field(f, "trans id", sb.trans_id_); field(f, "trans id", sb.trans_id_);
field(f, "held root", sb.held_root_); field(f, "metadata snap", sb.metadata_snap_);
field(f, "data mapping root", sb.data_mapping_root_); field(f, "data mapping root", sb.data_mapping_root_);
field(f, "device details root", sb.device_details_root_); field(f, "device details root", sb.device_details_root_);
field(f, "data block size", sb.data_block_size_); field(f, "data block size", sb.data_block_size_);

View File

@ -73,7 +73,7 @@ transaction_manager::shadow(block_address orig, validator v)
throw runtime_error("couldn't allocate new block"); throw runtime_error("couldn't allocate new block");
write_ref dest = bm_->write_lock_zero(*mb, v); write_ref dest = bm_->write_lock_zero(*mb, v);
::memcpy(dest.data(), src.data(), MD_BLOCK_SIZE); ::memcpy(dest.data().raw(), src.data().raw(), MD_BLOCK_SIZE); // FIXME: use buffer copy method
ref_t count = sm_->get_count(orig); ref_t count = sm_->get_count(orig);
if (count == 0) if (count == 0)