From ef722f02c4cec0130c5e96c824513fe2b1117178 Mon Sep 17 00:00:00 2001 From: Marcus Holland-Moritz Date: Mon, 5 Apr 2021 12:47:52 +0200 Subject: [PATCH] Parallelize checksum computation in filesystem writer --- src/dwarfs/filesystem_writer.cpp | 218 +++++++++++++++---------------- 1 file changed, 108 insertions(+), 110 deletions(-) diff --git a/src/dwarfs/filesystem_writer.cpp b/src/dwarfs/filesystem_writer.cpp index 43717390..e9b4ecba 100644 --- a/src/dwarfs/filesystem_writer.cpp +++ b/src/dwarfs/filesystem_writer.cpp @@ -47,11 +47,11 @@ namespace { class fsblock { public: - fsblock(logger& lgr, section_type type, const block_compressor& bc, - std::shared_ptr&& data); + fsblock(section_type type, block_compressor const& bc, + std::shared_ptr&& data, uint32_t number); fsblock(section_type type, compression_type compression, - folly::ByteRange data); + folly::ByteRange data, uint32_t number); void compress(worker_group& wg) { impl_->compress(wg); } void wait_until_compressed() { impl_->wait_until_compressed(); } @@ -60,6 +60,8 @@ class fsblock { folly::ByteRange data() const { return impl_->data(); } size_t uncompressed_size() const { return impl_->uncompressed_size(); } size_t size() const { return impl_->size(); } + uint32_t number() const { return impl_->number(); } + section_header_v2 const& header() const { return impl_->header(); } class impl { public: @@ -72,112 +74,97 @@ class fsblock { virtual folly::ByteRange data() const = 0; virtual size_t uncompressed_size() const = 0; virtual size_t size() const = 0; + virtual uint32_t number() const = 0; + virtual section_header_v2 const& header() const = 0; }; + static void + build_section_header(section_header_v2& sh, fsblock::impl const& fsb); + private: std::unique_ptr impl_; }; -template class raw_fsblock : public fsblock::impl { - private: - class state { - public: - state(std::shared_ptr&& data, logger& lgr) - : compressed_(false) - , data_(std::move(data)) - , LOG_PROXY_INIT(lgr) {} + public: + raw_fsblock(section_type type, const block_compressor& bc, + std::shared_ptr&& data, uint32_t number) + : type_{type} + , bc_{bc} + , uncompressed_size_{data->size()} + , data_{std::move(data)} + , number_{number} {} - void compress(const block_compressor& bc) { - std::shared_ptr tmp; + void compress(worker_group& wg) override { + std::promise prom; + future_ = prom.get_future(); - { - auto td = LOG_TIMED_TRACE; - - tmp = std::make_shared(bc.compress(data_->vec())); - - td << "block compression finished"; - } + wg.add_job([this, prom = std::move(prom)]() mutable { + auto tmp = std::make_shared(bc_.compress(data_->vec())); { std::lock_guard lock(mx_); data_.swap(tmp); - compressed_ = true; } - cond_.notify_one(); - } + tmp.reset(); - void wait() { - std::unique_lock lock(mx_); - cond_.wait(lock, [&]() -> bool { return compressed_; }); - } + fsblock::build_section_header(header_, *this); - std::vector const& data() const { return data_->vec(); } - - size_t size() const { - std::lock_guard lock(mx_); - return data_->size(); - } - - private: - mutable std::mutex mx_; - std::condition_variable cond_; - std::atomic compressed_; - std::shared_ptr data_; - LOG_PROXY_DECL(LoggerPolicy); - }; - - public: - raw_fsblock(logger& lgr, section_type type, const block_compressor& bc, - std::shared_ptr&& data) - : type_(type) - , bc_(bc) - , uncompressed_size_(data->size()) - , state_(std::make_shared(std::move(data), lgr)) - , LOG_PROXY_INIT(lgr) {} - - void compress(worker_group& wg) override { - LOG_TRACE << "block queued for compression"; - - std::shared_ptr s = state_; - - wg.add_job([&, s] { - LOG_TRACE << "block compression started"; - s->compress(bc_); + prom.set_value(); }); } - void wait_until_compressed() override { state_->wait(); } + void wait_until_compressed() override { future_.wait(); } section_type type() const override { return type_; } compression_type compression() const override { return bc_.type(); } - folly::ByteRange data() const override { return state_->data(); } + folly::ByteRange data() const override { return data_->vec(); } size_t uncompressed_size() const override { return uncompressed_size_; } - size_t size() const override { return state_->size(); } + size_t size() const override { + std::lock_guard lock(mx_); + return data_->size(); + } + + uint32_t number() const override { return number_; } + + section_header_v2 const& header() const override { return header_; } private: const section_type type_; block_compressor const& bc_; const size_t uncompressed_size_; - std::shared_ptr state_; - LOG_PROXY_DECL(LoggerPolicy); + mutable std::mutex mx_; + std::shared_ptr data_; + std::future future_; + uint32_t const number_; + section_header_v2 header_; }; class compressed_fsblock : public fsblock::impl { public: compressed_fsblock(section_type type, compression_type compression, - folly::ByteRange range) - : type_(type) - , compression_(compression) - , range_(range) {} + folly::ByteRange range, uint32_t number) + : type_{type} + , compression_{compression} + , range_{range} + , number_{number} {} - void compress(worker_group&) override {} - void wait_until_compressed() override {} + void compress(worker_group& wg) override { + std::promise prom; + future_ = prom.get_future(); + + wg.add_job([this, prom = std::move(prom)]() mutable { + fsblock::build_section_header(header_, *this); + prom.set_value(); + }); + } + + void wait_until_compressed() override { future_.wait(); } section_type type() const override { return type_; } compression_type compression() const override { return compression_; } @@ -187,20 +174,52 @@ class compressed_fsblock : public fsblock::impl { size_t uncompressed_size() const override { return range_.size(); } size_t size() const override { return range_.size(); } + uint32_t number() const override { return number_; } + + section_header_v2 const& header() const override { return header_; } + private: - const section_type type_; - const compression_type compression_; + section_type const type_; + compression_type const compression_; folly::ByteRange range_; + std::future future_; + uint32_t const number_; + section_header_v2 header_; }; -fsblock::fsblock(logger& lgr, section_type type, const block_compressor& bc, - std::shared_ptr&& data) - : impl_(make_unique_logging_object( - lgr, type, bc, std::move(data))) {} +fsblock::fsblock(section_type type, block_compressor const& bc, + std::shared_ptr&& data, uint32_t number) + : impl_(std::make_unique(type, bc, std::move(data), number)) {} fsblock::fsblock(section_type type, compression_type compression, - folly::ByteRange data) - : impl_(std::make_unique(type, compression, data)) {} + folly::ByteRange data, uint32_t number) + : impl_(std::make_unique(type, compression, data, + number)) {} + +void fsblock::build_section_header(section_header_v2& sh, + fsblock::impl const& fsb) { + auto range = fsb.data(); + + ::memcpy(&sh.magic[0], "DWARFS", 6); + sh.major = MAJOR_VERSION; + sh.minor = MINOR_VERSION; + sh.number = fsb.number(); + sh.type = static_cast(fsb.type()); + sh.compression = static_cast(fsb.compression()); + sh.length = range.size(); + + checksum xxh(checksum::algorithm::XXH3_64); + xxh.update(&sh.number, + sizeof(section_header_v2) - offsetof(section_header_v2, number)); + xxh.update(range.data(), range.size()); + DWARFS_CHECK(xxh.finalize(&sh.xxh3_64), "XXH3-64 checksum failed"); + + checksum sha(checksum::algorithm::SHA2_512_256); + sha.update(&sh.xxh3_64, + sizeof(section_header_v2) - offsetof(section_header_v2, xxh3_64)); + sha.update(range.data(), range.size()); + DWARFS_CHECK(sha.finalize(&sh.sha2_512_256), "SHA512/256 checksum failed"); +} template class filesystem_writer_ final : public filesystem_writer::impl { @@ -226,8 +245,7 @@ class filesystem_writer_ final : public filesystem_writer::impl { private: void write_section(section_type type, std::shared_ptr&& data, block_compressor const& bc); - void write(section_type type, compression_type compression, - folly::ByteRange range); + void write(fsblock const& fsb); void write(const char* data, size_t size); template void write(const T& obj); @@ -321,7 +339,7 @@ void filesystem_writer_::writer_thread() { << size_with_unit(fsb->uncompressed_size()) << " to " << size_with_unit(fsb->size()); - write(fsb->type(), fsb->compression(), fsb->data()); + write(*fsb); } } @@ -355,34 +373,11 @@ void filesystem_writer_::write(folly::ByteRange range) { } template -void filesystem_writer_::write(section_type type, - compression_type compression, - folly::ByteRange range) { - section_header_v2 sh; - ::memcpy(&sh.magic[0], "DWARFS", 6); - sh.major = MAJOR_VERSION; - sh.minor = MINOR_VERSION; - sh.number = section_number_++; - sh.type = static_cast(type); - sh.compression = static_cast(compression); - sh.length = range.size(); +void filesystem_writer_::write(fsblock const& fsb) { + write(fsb.header()); + write(fsb.data()); - checksum xxh(checksum::algorithm::XXH3_64); - xxh.update(&sh.number, - sizeof(section_header_v2) - offsetof(section_header_v2, number)); - xxh.update(range.data(), range.size()); - DWARFS_CHECK(xxh.finalize(&sh.xxh3_64), "XXH3-64 checksum failed"); - - checksum sha(checksum::algorithm::SHA2_512_256); - sha.update(&sh.xxh3_64, - sizeof(section_header_v2) - offsetof(section_header_v2, xxh3_64)); - sha.update(range.data(), range.size()); - DWARFS_CHECK(sha.finalize(&sh.sha2_512_256), "SHA512/256 checksum failed"); - - write(sh); - write(range); - - if (type == section_type::BLOCK) { + if (fsb.type() == section_type::BLOCK) { prog_.blocks_written++; } } @@ -400,7 +395,7 @@ void filesystem_writer_::write_section( } auto fsb = - std::make_unique(LOG_GET_LOGGER, type, bc, std::move(data)); + std::make_unique(type, bc, std::move(data), section_number_++); fsb->compress(wg_); @@ -415,7 +410,10 @@ void filesystem_writer_::write_section( template void filesystem_writer_::write_compressed_section( section_type type, compression_type compression, folly::ByteRange data) { - auto fsb = std::make_unique(type, compression, data); + auto fsb = + std::make_unique(type, compression, data, section_number_++); + + fsb->compress(wg_); { std::lock_guard lock(mx_);