Parallelize checksum computation in filesystem writer

This commit is contained in:
Marcus Holland-Moritz 2021-04-05 12:47:52 +02:00
parent 713b18709d
commit ef722f02c4

View File

@ -47,11 +47,11 @@ namespace {
class fsblock {
public:
fsblock(logger& lgr, section_type type, const block_compressor& bc,
std::shared_ptr<block_data>&& data);
fsblock(section_type type, block_compressor const& bc,
std::shared_ptr<block_data>&& data, uint32_t number);
fsblock(section_type type, compression_type compression,
folly::ByteRange data);
folly::ByteRange data, uint32_t number);
void compress(worker_group& wg) { impl_->compress(wg); }
void wait_until_compressed() { impl_->wait_until_compressed(); }
@ -60,6 +60,8 @@ class fsblock {
folly::ByteRange data() const { return impl_->data(); }
size_t uncompressed_size() const { return impl_->uncompressed_size(); }
size_t size() const { return impl_->size(); }
uint32_t number() const { return impl_->number(); }
section_header_v2 const& header() const { return impl_->header(); }
class impl {
public:
@ -72,112 +74,97 @@ class fsblock {
virtual folly::ByteRange data() const = 0;
virtual size_t uncompressed_size() const = 0;
virtual size_t size() const = 0;
virtual uint32_t number() const = 0;
virtual section_header_v2 const& header() const = 0;
};
static void
build_section_header(section_header_v2& sh, fsblock::impl const& fsb);
private:
std::unique_ptr<impl> impl_;
};
template <typename LoggerPolicy>
class raw_fsblock : public fsblock::impl {
private:
class state {
public:
state(std::shared_ptr<block_data>&& data, logger& lgr)
: compressed_(false)
, data_(std::move(data))
, LOG_PROXY_INIT(lgr) {}
public:
raw_fsblock(section_type type, const block_compressor& bc,
std::shared_ptr<block_data>&& data, uint32_t number)
: type_{type}
, bc_{bc}
, uncompressed_size_{data->size()}
, data_{std::move(data)}
, number_{number} {}
void compress(const block_compressor& bc) {
std::shared_ptr<block_data> tmp;
void compress(worker_group& wg) override {
std::promise<void> prom;
future_ = prom.get_future();
{
auto td = LOG_TIMED_TRACE;
tmp = std::make_shared<block_data>(bc.compress(data_->vec()));
td << "block compression finished";
}
wg.add_job([this, prom = std::move(prom)]() mutable {
auto tmp = std::make_shared<block_data>(bc_.compress(data_->vec()));
{
std::lock_guard lock(mx_);
data_.swap(tmp);
compressed_ = true;
}
cond_.notify_one();
}
tmp.reset();
void wait() {
std::unique_lock lock(mx_);
cond_.wait(lock, [&]() -> bool { return compressed_; });
}
fsblock::build_section_header(header_, *this);
std::vector<uint8_t> const& data() const { return data_->vec(); }
size_t size() const {
std::lock_guard lock(mx_);
return data_->size();
}
private:
mutable std::mutex mx_;
std::condition_variable cond_;
std::atomic<bool> compressed_;
std::shared_ptr<block_data> data_;
LOG_PROXY_DECL(LoggerPolicy);
};
public:
raw_fsblock(logger& lgr, section_type type, const block_compressor& bc,
std::shared_ptr<block_data>&& data)
: type_(type)
, bc_(bc)
, uncompressed_size_(data->size())
, state_(std::make_shared<state>(std::move(data), lgr))
, LOG_PROXY_INIT(lgr) {}
void compress(worker_group& wg) override {
LOG_TRACE << "block queued for compression";
std::shared_ptr<state> s = state_;
wg.add_job([&, s] {
LOG_TRACE << "block compression started";
s->compress(bc_);
prom.set_value();
});
}
void wait_until_compressed() override { state_->wait(); }
void wait_until_compressed() override { future_.wait(); }
section_type type() const override { return type_; }
compression_type compression() const override { return bc_.type(); }
folly::ByteRange data() const override { return state_->data(); }
folly::ByteRange data() const override { return data_->vec(); }
size_t uncompressed_size() const override { return uncompressed_size_; }
size_t size() const override { return state_->size(); }
size_t size() const override {
std::lock_guard lock(mx_);
return data_->size();
}
uint32_t number() const override { return number_; }
section_header_v2 const& header() const override { return header_; }
private:
const section_type type_;
block_compressor const& bc_;
const size_t uncompressed_size_;
std::shared_ptr<state> state_;
LOG_PROXY_DECL(LoggerPolicy);
mutable std::mutex mx_;
std::shared_ptr<block_data> data_;
std::future<void> future_;
uint32_t const number_;
section_header_v2 header_;
};
class compressed_fsblock : public fsblock::impl {
public:
compressed_fsblock(section_type type, compression_type compression,
folly::ByteRange range)
: type_(type)
, compression_(compression)
, range_(range) {}
folly::ByteRange range, uint32_t number)
: type_{type}
, compression_{compression}
, range_{range}
, number_{number} {}
void compress(worker_group&) override {}
void wait_until_compressed() override {}
void compress(worker_group& wg) override {
std::promise<void> prom;
future_ = prom.get_future();
wg.add_job([this, prom = std::move(prom)]() mutable {
fsblock::build_section_header(header_, *this);
prom.set_value();
});
}
void wait_until_compressed() override { future_.wait(); }
section_type type() const override { return type_; }
compression_type compression() const override { return compression_; }
@ -187,20 +174,52 @@ class compressed_fsblock : public fsblock::impl {
size_t uncompressed_size() const override { return range_.size(); }
size_t size() const override { return range_.size(); }
uint32_t number() const override { return number_; }
section_header_v2 const& header() const override { return header_; }
private:
const section_type type_;
const compression_type compression_;
section_type const type_;
compression_type const compression_;
folly::ByteRange range_;
std::future<void> future_;
uint32_t const number_;
section_header_v2 header_;
};
fsblock::fsblock(logger& lgr, section_type type, const block_compressor& bc,
std::shared_ptr<block_data>&& data)
: impl_(make_unique_logging_object<impl, raw_fsblock, logger_policies>(
lgr, type, bc, std::move(data))) {}
fsblock::fsblock(section_type type, block_compressor const& bc,
std::shared_ptr<block_data>&& data, uint32_t number)
: impl_(std::make_unique<raw_fsblock>(type, bc, std::move(data), number)) {}
fsblock::fsblock(section_type type, compression_type compression,
folly::ByteRange data)
: impl_(std::make_unique<compressed_fsblock>(type, compression, data)) {}
folly::ByteRange data, uint32_t number)
: impl_(std::make_unique<compressed_fsblock>(type, compression, data,
number)) {}
void fsblock::build_section_header(section_header_v2& sh,
fsblock::impl const& fsb) {
auto range = fsb.data();
::memcpy(&sh.magic[0], "DWARFS", 6);
sh.major = MAJOR_VERSION;
sh.minor = MINOR_VERSION;
sh.number = fsb.number();
sh.type = static_cast<uint16_t>(fsb.type());
sh.compression = static_cast<uint16_t>(fsb.compression());
sh.length = range.size();
checksum xxh(checksum::algorithm::XXH3_64);
xxh.update(&sh.number,
sizeof(section_header_v2) - offsetof(section_header_v2, number));
xxh.update(range.data(), range.size());
DWARFS_CHECK(xxh.finalize(&sh.xxh3_64), "XXH3-64 checksum failed");
checksum sha(checksum::algorithm::SHA2_512_256);
sha.update(&sh.xxh3_64,
sizeof(section_header_v2) - offsetof(section_header_v2, xxh3_64));
sha.update(range.data(), range.size());
DWARFS_CHECK(sha.finalize(&sh.sha2_512_256), "SHA512/256 checksum failed");
}
template <typename LoggerPolicy>
class filesystem_writer_ final : public filesystem_writer::impl {
@ -226,8 +245,7 @@ class filesystem_writer_ final : public filesystem_writer::impl {
private:
void write_section(section_type type, std::shared_ptr<block_data>&& data,
block_compressor const& bc);
void write(section_type type, compression_type compression,
folly::ByteRange range);
void write(fsblock const& fsb);
void write(const char* data, size_t size);
template <typename T>
void write(const T& obj);
@ -321,7 +339,7 @@ void filesystem_writer_<LoggerPolicy>::writer_thread() {
<< size_with_unit(fsb->uncompressed_size()) << " to "
<< size_with_unit(fsb->size());
write(fsb->type(), fsb->compression(), fsb->data());
write(*fsb);
}
}
@ -355,34 +373,11 @@ void filesystem_writer_<LoggerPolicy>::write(folly::ByteRange range) {
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write(section_type type,
compression_type compression,
folly::ByteRange range) {
section_header_v2 sh;
::memcpy(&sh.magic[0], "DWARFS", 6);
sh.major = MAJOR_VERSION;
sh.minor = MINOR_VERSION;
sh.number = section_number_++;
sh.type = static_cast<uint16_t>(type);
sh.compression = static_cast<uint16_t>(compression);
sh.length = range.size();
void filesystem_writer_<LoggerPolicy>::write(fsblock const& fsb) {
write(fsb.header());
write(fsb.data());
checksum xxh(checksum::algorithm::XXH3_64);
xxh.update(&sh.number,
sizeof(section_header_v2) - offsetof(section_header_v2, number));
xxh.update(range.data(), range.size());
DWARFS_CHECK(xxh.finalize(&sh.xxh3_64), "XXH3-64 checksum failed");
checksum sha(checksum::algorithm::SHA2_512_256);
sha.update(&sh.xxh3_64,
sizeof(section_header_v2) - offsetof(section_header_v2, xxh3_64));
sha.update(range.data(), range.size());
DWARFS_CHECK(sha.finalize(&sh.sha2_512_256), "SHA512/256 checksum failed");
write(sh);
write(range);
if (type == section_type::BLOCK) {
if (fsb.type() == section_type::BLOCK) {
prog_.blocks_written++;
}
}
@ -400,7 +395,7 @@ void filesystem_writer_<LoggerPolicy>::write_section(
}
auto fsb =
std::make_unique<fsblock>(LOG_GET_LOGGER, type, bc, std::move(data));
std::make_unique<fsblock>(type, bc, std::move(data), section_number_++);
fsb->compress(wg_);
@ -415,7 +410,10 @@ void filesystem_writer_<LoggerPolicy>::write_section(
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_compressed_section(
section_type type, compression_type compression, folly::ByteRange data) {
auto fsb = std::make_unique<fsblock>(type, compression, data);
auto fsb =
std::make_unique<fsblock>(type, compression, data, section_number_++);
fsb->compress(wg_);
{
std::lock_guard lock(mx_);