From 423c7ea3e84ef54c16168d0d72496dff12da4d16 Mon Sep 17 00:00:00 2001
From: Marcus Holland-Moritz
Date: Tue, 23 Mar 2021 17:44:49 +0100
Subject: [PATCH] Reuse zstd contexts (appears to give some speedup)

---
 include/dwarfs/block_compressor.h |  12 +--
 src/dwarfs/block_compressor.cpp   | 185 ++++++++++++++++--------------
 2 files changed, 98 insertions(+), 99 deletions(-)

diff --git a/include/dwarfs/block_compressor.h b/include/dwarfs/block_compressor.h
index b53f68a8..ba71d4dc 100644
--- a/include/dwarfs/block_compressor.h
+++ b/include/dwarfs/block_compressor.h
@@ -40,7 +40,7 @@ enum class compression_type : uint8_t {
 
 class block_compressor {
  public:
-  block_compressor(const std::string& spec, size_t block_size = 0);
+  block_compressor(const std::string& spec);
 
   block_compressor(const block_compressor& bc)
       : impl_(bc.impl_->clone()) {}
@@ -56,12 +56,6 @@ class block_compressor {
     return impl_->compress(std::move(data));
   }
 
-  void append(const uint8_t* data, size_t size, bool last) {
-    impl_->append(data, size, last);
-  }
-
-  std::vector<uint8_t> move_data() { return impl_->move_data(); }
-
   compression_type type() const { return impl_->type(); }
 
   class impl {
@@ -70,15 +64,11 @@ class block_compressor {
 
     virtual std::unique_ptr<impl> clone() const = 0;
 
-    // TODO: obsolete
     virtual std::vector<uint8_t>
     compress(const std::vector<uint8_t>& data) const = 0;
     virtual std::vector<uint8_t>
     compress(std::vector<uint8_t>&& data) const = 0;
 
-    virtual void append(const uint8_t* data, size_t size, bool last) = 0;
-    virtual std::vector<uint8_t> move_data() = 0;
-
     virtual compression_type type() const = 0;
   };
 
diff --git a/src/dwarfs/block_compressor.cpp b/src/dwarfs/block_compressor.cpp
index 0d7d867f..32c450fc 100644
--- a/src/dwarfs/block_compressor.cpp
+++ b/src/dwarfs/block_compressor.cpp
@@ -132,8 +132,7 @@ std::unordered_map<lzma_ret, char const*> const lzma_error_desc{
 class lzma_block_compressor final : public block_compressor::impl {
  public:
   lzma_block_compressor(unsigned level, bool extreme,
-                        const std::string& binary_mode, unsigned dict_size,
-                        size_t block_size);
+                        const std::string& binary_mode, unsigned dict_size);
   lzma_block_compressor(const lzma_block_compressor& rhs) = default;
 
   std::unique_ptr<block_compressor::impl> clone() const override {
@@ -146,10 +145,6 @@ class lzma_block_compressor final : public block_compressor::impl {
     return compress(data);
   }
 
-  void append(const uint8_t* data, size_t size, bool last) override;
-
-  std::vector<uint8_t> move_data() override { return std::move(data_); }
-
   compression_type type() const override { return compression_type::LZMA; }
 
  private:
@@ -188,8 +183,6 @@ class lzma_block_compressor final : public block_compressor::impl {
 
   lzma_options_lzma opt_lzma_;
   std::array<lzma_filter, 3> filters_;
-  lzma_stream stream_;
-  std::vector<uint8_t> data_;
 };
 #endif
 
@@ -211,25 +204,13 @@ class null_block_compressor final : public block_compressor::impl {
     return std::move(data);
   }
 
-  void append(const uint8_t* data, size_t size, bool) override {
-    data_.insert(data_.end(), data, data + size);
-  }
-
-  std::vector<uint8_t> move_data() override { return std::move(data_); }
-
   compression_type type() const override { return compression_type::NONE; }
-
- private:
-  std::vector<uint8_t> data_;
 };
 
 #ifdef DWARFS_HAVE_LIBLZMA
 lzma_block_compressor::lzma_block_compressor(unsigned level, bool extreme,
                                              const std::string& binary_mode,
-                                             unsigned dict_size,
-                                             size_t block_size)
-    : stream_(LZMA_STREAM_INIT)
-    , data_(block_size > 0 ? lzma_stream_buffer_bound(block_size) : 0) {
+                                             unsigned dict_size) {
   if (lzma_lzma_preset(&opt_lzma_, get_preset(level, extreme))) {
     DWARFS_THROW(runtime_error, "unsupported preset, possibly a bug");
   }
@@ -244,45 +225,6 @@ lzma_block_compressor::lzma_block_compressor(unsigned level, bool extreme,
   filters_[1].options = &opt_lzma_;
   filters_[2].id = LZMA_VLI_UNKNOWN;
   filters_[2].options = NULL;
-
-  if (block_size > 0) {
-    auto* filters = &filters_[filters_[0].id == LZMA_VLI_UNKNOWN];
-
-    if (lzma_stream_encoder(&stream_, filters, LZMA_CHECK_CRC64)) {
-      DWARFS_THROW(runtime_error, "lzma_stream_encoder");
-    }
-
-    stream_.next_out = data_.data();
-    stream_.avail_out = data_.size();
-  }
-}
-
-void lzma_block_compressor::append(const uint8_t* data, size_t size,
-                                   bool last) {
-  const lzma_action action = last ? LZMA_FINISH : LZMA_RUN;
-
-  stream_.next_in = data;
-  stream_.avail_in = size;
-
-  lzma_ret ret = lzma_code(&stream_, action);
-
-  if (ret == LZMA_STREAM_END) {
-    if (!last) {
-      DWARFS_THROW(runtime_error, "LZMA: unexpected stream end");
-    }
-  } else {
-    if (auto it = lzma_error_desc.find(ret); it != lzma_error_desc.end()) {
-      DWARFS_THROW(runtime_error, fmt::format("LZMA error: {}", it->second));
-    } else {
-      DWARFS_THROW(runtime_error, fmt::format("LZMA: unknown error {}", ret));
-    }
-  }
-
-  if (last) {
-    data_.resize(data_.size() - stream_.avail_out);
-    data_.shrink_to_fit();
-    lzma_end(&stream_);
-  }
 }
 
 std::vector<uint8_t>
@@ -387,18 +329,9 @@ class lz4_block_compressor final : public block_compressor::impl {
     return compress(data);
   }
 
-  void append(const uint8_t* data, size_t size, bool) override {
-    data_.insert(data_.end(), data, data + size);
-  }
-
-  std::vector<uint8_t> move_data() override {
-    return compress(std::move(data_));
-  }
-
   compression_type type() const override { return compression_type::LZ4; }
 
  private:
-  std::vector<uint8_t> data_;
   const int level_;
 };
 #endif
@@ -407,14 +340,15 @@ class lz4_block_compressor final : public block_compressor::impl {
 class zstd_block_compressor final : public block_compressor::impl {
  public:
   explicit zstd_block_compressor(int level)
-      : ctx_(ZSTD_createCCtx())
+      : ctxmgr_(get_context_manager())
       , level_(level) {}
 
+  // A copy shares the context manager of its source. clone() goes through
+  // this constructor and compress() dereferences ctxmgr_, so it must never
+  // be left empty in a copied compressor.
   zstd_block_compressor(const zstd_block_compressor& rhs)
-      : ctx_(ZSTD_createCCtx())
+      : ctxmgr_(rhs.ctxmgr_)
       , level_(rhs.level_) {}
-
-  ~zstd_block_compressor() override { ZSTD_freeCCtx(ctx_); }
 
   std::unique_ptr<block_compressor::impl> clone() const override {
     return std::make_unique<zstd_block_compressor>(*this);
@@ -427,34 +361,110 @@ class zstd_block_compressor final : public block_compressor::impl {
     return compress(data);
   }
 
-  void append(const uint8_t* data, size_t size, bool) override {
-    data_.insert(data_.end(), data, data + size);
-  }
-
-  std::vector<uint8_t> move_data() override {
-    return compress(std::move(data_));
-  }
-
   compression_type type() const override { return compression_type::ZSTD; }
 
  private:
-  ZSTD_CCtx* ctx_;
-  std::vector<uint8_t> data_;
+  class scoped_context;
+
+  // Pool of reusable ZSTD compression contexts, guarded by a mutex.
+  class context_manager {
+   public:
+    context_manager() = default;
+
+    ~context_manager() {
+      for (auto ctx : ctx_) {
+        ZSTD_freeCCtx(ctx);
+      }
+    }
+
+   private:
+    friend class scoped_context;
+
+    // Hands out a pooled context, creating a new one if the pool is empty.
+    ZSTD_CCtx* acquire() {
+      std::lock_guard<std::mutex> lock(mx_);
+      if (ctx_.empty()) {
+        return ZSTD_createCCtx();
+      }
+      auto ctx = ctx_.back();
+      ctx_.pop_back();
+      return ctx;
+    }
+
+    // Returns a context to the pool for later reuse.
+    void release(ZSTD_CCtx* ctx) {
+      std::lock_guard<std::mutex> lock(mx_);
+      ctx_.push_back(ctx);
+    }
+
+    std::mutex mx_;
+    std::vector<ZSTD_CCtx*> ctx_;
+  };
+
+  // Borrows a context from the pool for the lifetime of the object.
+  class scoped_context {
+   public:
+    scoped_context(context_manager& mgr)
+        : mgr_{&mgr}
+        , ctx_{mgr_->acquire()} {}
+    ~scoped_context() { mgr_->release(ctx_); }
+
+    // Neither copyable nor movable: a memberwise copy or move would leave
+    // two objects releasing the same context back into the pool.
+    scoped_context(scoped_context const&) = delete;
+    scoped_context(scoped_context&&) = delete;
+    scoped_context& operator=(scoped_context const&) = delete;
+    scoped_context& operator=(scoped_context&&) = delete;
+
+    ZSTD_CCtx* get() const { return ctx_; }
+
+   private:
+    context_manager* mgr_;
+    ZSTD_CCtx* ctx_;
+  };
+
+  // Returns the shared context manager, creating a new one if no compressor
+  // currently holds one (only a weak reference is kept here).
+  static std::shared_ptr<context_manager> get_context_manager() {
+    std::lock_guard<std::mutex> lock(s_mx);
+    if (auto mgr = s_ctxmgr.lock()) {
+      return mgr;
+    }
+    auto mgr = std::make_shared<context_manager>();
+    s_ctxmgr = mgr;
+    return mgr;
+  }
+
+  static std::mutex s_mx;
+  static std::weak_ptr<context_manager> s_ctxmgr;
+
+  std::shared_ptr<context_manager> ctxmgr_;
   const int level_;
 };
 
+std::mutex zstd_block_compressor::s_mx;
+std::weak_ptr<zstd_block_compressor::context_manager>
+    zstd_block_compressor::s_ctxmgr;
+
 std::vector<uint8_t>
 zstd_block_compressor::compress(const std::vector<uint8_t>& data) const {
   std::vector<uint8_t> compressed(ZSTD_compressBound(data.size()));
-  auto size = ZSTD_compress(compressed.data(), compressed.size(), data.data(),
-                            data.size(), level_);
+  scoped_context ctx(*ctxmgr_);
+  auto size = ZSTD_compressCCtx(ctx.get(), compressed.data(), compressed.size(),
+                                data.data(), data.size(), level_);
+  // On failure the return value is an error code, not a size; it must not be
+  // passed on to resize().
+  if (ZSTD_isError(size)) {
+    DWARFS_THROW(runtime_error,
+                 fmt::format("ZSTD: {}", ZSTD_getErrorName(size)));
+  }
   compressed.resize(size);
   compressed.shrink_to_fit();
   return compressed;
 }
 #endif
 
-block_compressor::block_compressor(const std::string& spec, size_t block_size) {
+block_compressor::block_compressor(const std::string& spec) {
   option_map om(spec);
 
   if (om.choice() == "null") {
@@ -463,8 +473,7 @@ block_compressor::block_compressor(const std::string& spec, size_t block_size) {
   } else if (om.choice() == "lzma") {
     impl_ = std::make_unique<lzma_block_compressor>(
         om.get<unsigned>("level", 9u), om.get<bool>("extreme", false),
-        om.get<std::string>("binary"), om.get<unsigned>("dict_size", 0u),
-        block_size);
+        om.get<std::string>("binary"), om.get<unsigned>("dict_size", 0u));
 #endif
 #ifdef DWARFS_HAVE_LIBLZ4
   } else if (om.choice() == "lz4") {