Reuse zstd contexts (appears to give some speedup)

This commit is contained in:
Marcus Holland-Moritz 2021-03-23 17:44:49 +01:00
parent a2b373165f
commit 423c7ea3e8
2 changed files with 81 additions and 100 deletions

View File

@ -40,7 +40,7 @@ enum class compression_type : uint8_t {
class block_compressor {
public:
block_compressor(const std::string& spec, size_t block_size = 0);
block_compressor(const std::string& spec);
block_compressor(const block_compressor& bc)
: impl_(bc.impl_->clone()) {}
@ -56,12 +56,6 @@ class block_compressor {
return impl_->compress(std::move(data));
}
void append(const uint8_t* data, size_t size, bool last) {
impl_->append(data, size, last);
}
std::vector<uint8_t> move_data() { return impl_->move_data(); }
compression_type type() const { return impl_->type(); }
class impl {
@ -70,15 +64,11 @@ class block_compressor {
virtual std::unique_ptr<impl> clone() const = 0;
// TODO: obsolete
virtual std::vector<uint8_t>
compress(const std::vector<uint8_t>& data) const = 0;
virtual std::vector<uint8_t>
compress(std::vector<uint8_t>&& data) const = 0;
virtual void append(const uint8_t* data, size_t size, bool last) = 0;
virtual std::vector<uint8_t> move_data() = 0;
virtual compression_type type() const = 0;
};

View File

@ -132,8 +132,7 @@ std::unordered_map<lzma_ret, char const*> const lzma_error_desc{
class lzma_block_compressor final : public block_compressor::impl {
public:
lzma_block_compressor(unsigned level, bool extreme,
const std::string& binary_mode, unsigned dict_size,
size_t block_size);
const std::string& binary_mode, unsigned dict_size);
lzma_block_compressor(const lzma_block_compressor& rhs) = default;
std::unique_ptr<block_compressor::impl> clone() const override {
@ -146,10 +145,6 @@ class lzma_block_compressor final : public block_compressor::impl {
return compress(data);
}
void append(const uint8_t* data, size_t size, bool last) override;
std::vector<uint8_t> move_data() override { return std::move(data_); }
compression_type type() const override { return compression_type::LZMA; }
private:
@ -188,8 +183,6 @@ class lzma_block_compressor final : public block_compressor::impl {
lzma_options_lzma opt_lzma_;
std::array<lzma_filter, 3> filters_;
lzma_stream stream_;
std::vector<uint8_t> data_;
};
#endif
@ -211,25 +204,13 @@ class null_block_compressor final : public block_compressor::impl {
return std::move(data);
}
void append(const uint8_t* data, size_t size, bool) override {
data_.insert(data_.end(), data, data + size);
}
std::vector<uint8_t> move_data() override { return std::move(data_); }
compression_type type() const override { return compression_type::NONE; }
private:
std::vector<uint8_t> data_;
};
#ifdef DWARFS_HAVE_LIBLZMA
lzma_block_compressor::lzma_block_compressor(unsigned level, bool extreme,
const std::string& binary_mode,
unsigned dict_size,
size_t block_size)
: stream_(LZMA_STREAM_INIT)
, data_(block_size > 0 ? lzma_stream_buffer_bound(block_size) : 0) {
unsigned dict_size) {
if (lzma_lzma_preset(&opt_lzma_, get_preset(level, extreme))) {
DWARFS_THROW(runtime_error, "unsupported preset, possibly a bug");
}
@ -244,45 +225,6 @@ lzma_block_compressor::lzma_block_compressor(unsigned level, bool extreme,
filters_[1].options = &opt_lzma_;
filters_[2].id = LZMA_VLI_UNKNOWN;
filters_[2].options = NULL;
if (block_size > 0) {
auto* filters = &filters_[filters_[0].id == LZMA_VLI_UNKNOWN];
if (lzma_stream_encoder(&stream_, filters, LZMA_CHECK_CRC64)) {
DWARFS_THROW(runtime_error, "lzma_stream_encoder");
}
stream_.next_out = data_.data();
stream_.avail_out = data_.size();
}
}
void lzma_block_compressor::append(const uint8_t* data, size_t size,
bool last) {
const lzma_action action = last ? LZMA_FINISH : LZMA_RUN;
stream_.next_in = data;
stream_.avail_in = size;
lzma_ret ret = lzma_code(&stream_, action);
if (ret == LZMA_STREAM_END) {
if (!last) {
DWARFS_THROW(runtime_error, "LZMA: unexpected stream end");
}
} else {
if (auto it = lzma_error_desc.find(ret); it != lzma_error_desc.end()) {
DWARFS_THROW(runtime_error, fmt::format("LZMA error: {}", it->second));
} else {
DWARFS_THROW(runtime_error, fmt::format("LZMA: unknown error {}", ret));
}
}
if (last) {
data_.resize(data_.size() - stream_.avail_out);
data_.shrink_to_fit();
lzma_end(&stream_);
}
}
std::vector<uint8_t>
@ -387,18 +329,9 @@ class lz4_block_compressor final : public block_compressor::impl {
return compress(data);
}
void append(const uint8_t* data, size_t size, bool) override {
data_.insert(data_.end(), data, data + size);
}
std::vector<uint8_t> move_data() override {
return compress(std::move(data_));
}
compression_type type() const override { return compression_type::LZ4; }
private:
std::vector<uint8_t> data_;
const int level_;
};
#endif
@ -407,14 +340,11 @@ class lz4_block_compressor final : public block_compressor::impl {
class zstd_block_compressor final : public block_compressor::impl {
public:
explicit zstd_block_compressor(int level)
: ctx_(ZSTD_createCCtx())
: ctxmgr_(get_context_manager())
, level_(level) {}
zstd_block_compressor(const zstd_block_compressor& rhs)
: ctx_(ZSTD_createCCtx())
, level_(rhs.level_) {}
~zstd_block_compressor() override { ZSTD_freeCCtx(ctx_); }
: level_(rhs.level_) {}
std::unique_ptr<block_compressor::impl> clone() const override {
return std::make_unique<zstd_block_compressor>(*this);
@ -427,34 +357,96 @@ class zstd_block_compressor final : public block_compressor::impl {
return compress(data);
}
void append(const uint8_t* data, size_t size, bool) override {
data_.insert(data_.end(), data, data + size);
}
std::vector<uint8_t> move_data() override {
return compress(std::move(data_));
}
compression_type type() const override { return compression_type::ZSTD; }
private:
ZSTD_CCtx* ctx_;
std::vector<uint8_t> data_;
class scoped_context;
class context_manager {
public:
context_manager() = default;
~context_manager() {
for (auto ctx : ctx_) {
ZSTD_freeCCtx(ctx);
}
}
private:
friend class scoped_context;
ZSTD_CCtx* acquire() {
std::lock_guard lock(mx_);
if (ctx_.empty()) {
return ZSTD_createCCtx();
}
auto ctx = ctx_.back();
ctx_.pop_back();
return ctx;
}
void release(ZSTD_CCtx* ctx) {
std::lock_guard lock(mx_);
ctx_.push_back(ctx);
}
std::mutex mx_;
std::vector<ZSTD_CCtx*> ctx_;
};
class scoped_context {
public:
scoped_context(context_manager& mgr)
: mgr_{&mgr}
, ctx_{mgr_->acquire()} {}
~scoped_context() { mgr_->release(ctx_); }
scoped_context(scoped_context const&) = delete;
scoped_context(scoped_context&&) = default;
scoped_context& operator=(scoped_context const&) = delete;
scoped_context& operator=(scoped_context&&) = default;
ZSTD_CCtx* get() const { return ctx_; }
private:
context_manager* mgr_;
ZSTD_CCtx* ctx_;
};
static std::shared_ptr<context_manager> get_context_manager() {
std::lock_guard lock(s_mx);
if (auto mgr = s_ctxmgr.lock()) {
return mgr;
}
auto mgr = std::make_shared<context_manager>();
s_ctxmgr = mgr;
return mgr;
}
static std::mutex s_mx;
static std::weak_ptr<context_manager> s_ctxmgr;
std::shared_ptr<context_manager> ctxmgr_;
const int level_;
};
std::mutex zstd_block_compressor::s_mx;
std::weak_ptr<zstd_block_compressor::context_manager>
zstd_block_compressor::s_ctxmgr;
std::vector<uint8_t>
zstd_block_compressor::compress(const std::vector<uint8_t>& data) const {
std::vector<uint8_t> compressed(ZSTD_compressBound(data.size()));
auto size = ZSTD_compress(compressed.data(), compressed.size(), data.data(),
data.size(), level_);
scoped_context ctx(*ctxmgr_);
auto size = ZSTD_compressCCtx(ctx.get(), compressed.data(), compressed.size(),
data.data(), data.size(), level_);
compressed.resize(size);
compressed.shrink_to_fit();
return compressed;
}
#endif
block_compressor::block_compressor(const std::string& spec, size_t block_size) {
block_compressor::block_compressor(const std::string& spec) {
option_map om(spec);
if (om.choice() == "null") {
@ -463,8 +455,7 @@ block_compressor::block_compressor(const std::string& spec, size_t block_size) {
} else if (om.choice() == "lzma") {
impl_ = std::make_unique<lzma_block_compressor>(
om.get<unsigned>("level", 9u), om.get<bool>("extreme", false),
om.get<std::string>("binary"), om.get<unsigned>("dict_size", 0u),
block_size);
om.get<std::string>("binary"), om.get<unsigned>("dict_size", 0u));
#endif
#ifdef DWARFS_HAVE_LIBLZ4
} else if (om.choice() == "lz4") {