refactor: new start_decompression API for more flexible buffer handling

This commit is contained in:
Marcus Holland-Moritz 2025-03-31 12:01:46 +02:00
parent 1aef650477
commit 39950e085e
19 changed files with 258 additions and 228 deletions

View File

@ -61,6 +61,7 @@ add_library(
$<IF:${DWARFS_GIT_BUILD},${CMAKE_CURRENT_BINARY_DIR},${CMAKE_CURRENT_SOURCE_DIR}>/src/version.cpp $<IF:${DWARFS_GIT_BUILD},${CMAKE_CURRENT_BINARY_DIR},${CMAKE_CURRENT_SOURCE_DIR}>/src/version.cpp
src/compression/base.cpp
src/compression/null.cpp src/compression/null.cpp
src/compression/zstd.cpp src/compression/zstd.cpp
$<$<BOOL:${LIBLZMA_FOUND}>:src/compression/lzma.cpp> $<$<BOOL:${LIBLZMA_FOUND}>:src/compression/lzma.cpp>

View File

@ -35,9 +35,9 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include <dwarfs/byte_buffer.h>
#include <dwarfs/compression.h> #include <dwarfs/compression.h>
#include <dwarfs/compression_constraints.h> #include <dwarfs/compression_constraints.h>
#include <dwarfs/vector_byte_buffer.h>
namespace dwarfs { namespace dwarfs {
@ -109,8 +109,9 @@ class block_compressor {
class block_decompressor { class block_decompressor {
public: public:
block_decompressor(compression_type type, std::span<uint8_t const> data, block_decompressor(compression_type type, std::span<uint8_t const> data);
mutable_byte_buffer target);
shared_byte_buffer start_decompression(mutable_byte_buffer target);
bool decompress_frame(size_t frame_size = BUFSIZ) { bool decompress_frame(size_t frame_size = BUFSIZ) {
return impl_->decompress_frame(frame_size); return impl_->decompress_frame(frame_size);
@ -123,17 +124,13 @@ class block_decompressor {
std::optional<std::string> metadata() const { return impl_->metadata(); } std::optional<std::string> metadata() const { return impl_->metadata(); }
static shared_byte_buffer static shared_byte_buffer
decompress(compression_type type, std::span<uint8_t const> data) { decompress(compression_type type, std::span<uint8_t const> data);
auto target = vector_byte_buffer::create();
block_decompressor bd(type, data, target);
bd.decompress_frame(bd.uncompressed_size());
return target.share();
}
class impl { class impl {
public: public:
virtual ~impl() = default; virtual ~impl() = default;
virtual void start_decompression(mutable_byte_buffer target) = 0;
virtual bool decompress_frame(size_t frame_size) = 0; virtual bool decompress_frame(size_t frame_size) = 0;
virtual size_t uncompressed_size() const = 0; virtual size_t uncompressed_size() const = 0;
virtual std::optional<std::string> metadata() const = 0; virtual std::optional<std::string> metadata() const = 0;
@ -160,8 +157,7 @@ class compression_factory : public compression_info {
virtual std::unique_ptr<block_compressor::impl> virtual std::unique_ptr<block_compressor::impl>
make_compressor(option_map& om) const = 0; make_compressor(option_map& om) const = 0;
virtual std::unique_ptr<block_decompressor::impl> virtual std::unique_ptr<block_decompressor::impl>
make_decompressor(std::span<uint8_t const> data, make_decompressor(std::span<uint8_t const> data) const = 0;
mutable_byte_buffer target) const = 0;
}; };
namespace detail { namespace detail {
@ -178,8 +174,7 @@ class compression_registry {
std::unique_ptr<block_compressor::impl> std::unique_ptr<block_compressor::impl>
make_compressor(std::string_view spec) const; make_compressor(std::string_view spec) const;
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
make_decompressor(compression_type type, std::span<uint8_t const> data, make_decompressor(compression_type type, std::span<uint8_t const> data) const;
mutable_byte_buffer target) const;
void for_each_algorithm( void for_each_algorithm(
std::function<void(compression_type, compression_info const&)> const& fn) std::function<void(compression_type, compression_info const&)> const& fn)

View File

@ -125,10 +125,14 @@ class mutable_byte_buffer {
public: public:
using value_type = uint8_t; using value_type = uint8_t;
mutable_byte_buffer() = default;
explicit mutable_byte_buffer( explicit mutable_byte_buffer(
std::shared_ptr<mutable_byte_buffer_interface> bb) std::shared_ptr<mutable_byte_buffer_interface> bb)
: bb_{std::move(bb)} {} : bb_{std::move(bb)} {}
explicit operator bool() const noexcept { return static_cast<bool>(bb_); }
uint8_t const* data() const { return bb_->data(); } uint8_t const* data() const { return bb_->data(); }
uint8_t* data() { return bb_->mutable_data(); } uint8_t* data() { return bb_->mutable_data(); }

View File

@ -31,6 +31,7 @@ class vector_byte_buffer {
public: public:
static mutable_byte_buffer create(); static mutable_byte_buffer create();
static mutable_byte_buffer create(size_t size); static mutable_byte_buffer create(size_t size);
static mutable_byte_buffer create_reserve(size_t size);
static mutable_byte_buffer create(std::string_view data); static mutable_byte_buffer create(std::string_view data);
static mutable_byte_buffer create(std::span<uint8_t const> data); static mutable_byte_buffer create(std::span<uint8_t const> data);
static mutable_byte_buffer create(std::vector<uint8_t>&& data); static mutable_byte_buffer create(std::vector<uint8_t>&& data);

View File

@ -33,6 +33,7 @@
#include <dwarfs/error.h> #include <dwarfs/error.h>
#include <dwarfs/fstypes.h> #include <dwarfs/fstypes.h>
#include <dwarfs/option_map.h> #include <dwarfs/option_map.h>
#include <dwarfs/vector_byte_buffer.h>
namespace dwarfs { namespace dwarfs {
@ -41,10 +42,25 @@ block_compressor::block_compressor(std::string const& spec) {
} }
block_decompressor::block_decompressor(compression_type type, block_decompressor::block_decompressor(compression_type type,
std::span<uint8_t const> data, std::span<uint8_t const> data) {
mutable_byte_buffer target) { impl_ = compression_registry::instance().make_decompressor(type, data);
impl_ = compression_registry::instance().make_decompressor(type, data, }
std::move(target));
/**
 * One-shot convenience: decompress a complete block into a fresh buffer.
 *
 * Builds a decompressor for `type` over `data`, creates a vector-backed
 * buffer with capacity for the whole uncompressed block, starts
 * decompression into it and requests a single frame spanning the entire
 * uncompressed size.
 *
 * \param type  Compression algorithm the block was encoded with.
 * \param data  The compressed block.
 *
 * \return A shared handle to the buffer the data was decompressed into.
 */
shared_byte_buffer
block_decompressor::decompress(compression_type type,
                               std::span<uint8_t const> data) {
  block_decompressor decompressor(type, data);
  auto const total_size = decompressor.uncompressed_size();

  auto output = vector_byte_buffer::create_reserve(total_size);
  decompressor.start_decompression(output);

  // Ask for everything at once; the frame result is intentionally unused.
  decompressor.decompress_frame(total_size);

  return output.share();
}
// Begins decompression into `target` and returns a shared handle to it.
// `target` is taken by value; the implementation receives a copy of the
// handle, so both refer to the same underlying buffer object.
shared_byte_buffer
block_decompressor::start_decompression(mutable_byte_buffer target) {
// Let the codec-specific implementation attach the buffer first.
impl_->start_decompression(target);
// NOTE(review): presumably pins the buffer's storage so that the shared
// view returned below stays valid while frames are appended — confirm
// against mutable_byte_buffer::freeze_location().
target.freeze_location();
return target.share();
} }
compression_registry& compression_registry::instance() { compression_registry& compression_registry::instance() {
@ -92,8 +108,7 @@ compression_registry::make_compressor(std::string_view spec) const {
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
compression_registry::make_decompressor(compression_type type, compression_registry::make_decompressor(compression_type type,
std::span<uint8_t const> data, std::span<uint8_t const> data) const {
mutable_byte_buffer target) const {
auto fit = factories_.find(type); auto fit = factories_.find(type);
if (fit == factories_.end()) { if (fit == factories_.end()) {
@ -101,7 +116,7 @@ compression_registry::make_decompressor(compression_type type,
"unsupported compression type: " + get_compression_name(type)); "unsupported compression type: " + get_compression_name(type));
} }
return fit->second->make_decompressor(data, std::move(target)); return fit->second->make_decompressor(data);
} }
void compression_registry::for_each_algorithm( void compression_registry::for_each_algorithm(

50
src/compression/base.cpp Normal file
View File

@ -0,0 +1,50 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include <fmt/format.h>
#include <dwarfs/error.h>
#include "base.h"
namespace dwarfs {
/**
 * Attach the output buffer and pre-allocate room for the decompressed block.
 *
 * May only be called while no target buffer is attached. Capacity for
 * uncompressed_size() bytes is reserved on `target` *before* it is stored in
 * `decompressed_`, so a failed reservation leaves this object unchanged
 * (still "not started").
 *
 * \param target  Buffer that will receive the decompressed data. Must be a
 *                non-null buffer; this is enforced with DWARFS_CHECK.
 *
 * Raises runtime_error (via DWARFS_THROW) if reserving the required capacity
 * fails with std::bad_alloc.
 */
void block_decompressor_base::start_decompression(mutable_byte_buffer target) {
  DWARFS_CHECK(!decompressed_, "decompression already started");
  // A default-constructed buffer has no backing object; reserve() would
  // dereference a null pointer, so reject it up front.
  DWARFS_CHECK(target, "target buffer must not be null");

  auto const size = this->uncompressed_size();

  try {
    // Reserve on the local handle first: if this throws, `decompressed_`
    // is still empty and the decompressor is not marked as started.
    target.reserve(size);
  } catch (std::bad_alloc const&) {
    DWARFS_THROW(
        runtime_error,
        fmt::format("could not reserve {} bytes for decompressed block", size));
  }

  decompressed_ = std::move(target);
}
// Default implementation: reports no metadata by returning an empty
// optional.
std::optional<std::string> block_decompressor_base::metadata() const {
  return {};
}
} // namespace dwarfs

35
src/compression/base.h Normal file
View File

@ -0,0 +1,35 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include <dwarfs/block_compressor.h>
namespace dwarfs {
// Common base for block decompressor implementations. It owns the target
// buffer handle and supplies default start_decompression() and metadata()
// implementations; the remaining block_decompressor::impl members are not
// overridden here.
class block_decompressor_base : public block_decompressor::impl {
public:
// Stores `target` in `decompressed_` and reserves uncompressed_size() bytes
// in it. Checks that no target has been attached yet.
void start_decompression(mutable_byte_buffer target) override;
// Default: returns std::nullopt.
std::optional<std::string> metadata() const override;
protected:
// Output buffer; default-constructed (tests false) until
// start_decompression() has stored a target in it.
mutable_byte_buffer decompressed_;
};
} // namespace dwarfs

View File

@ -26,13 +26,14 @@
#include <fmt/format.h> #include <fmt/format.h>
#include <dwarfs/block_compressor.h>
#include <dwarfs/error.h> #include <dwarfs/error.h>
#include <dwarfs/fstypes.h> #include <dwarfs/fstypes.h>
#include <dwarfs/option_map.h> #include <dwarfs/option_map.h>
#include <dwarfs/varint.h> #include <dwarfs/varint.h>
#include <dwarfs/vector_byte_buffer.h> #include <dwarfs/vector_byte_buffer.h>
#include "base.h"
namespace dwarfs { namespace dwarfs {
namespace { namespace {
@ -87,14 +88,12 @@ class brotli_block_compressor final : public block_compressor::impl {
uint32_t const window_bits_; uint32_t const window_bits_;
}; };
class brotli_block_decompressor final : public block_decompressor::impl { class brotli_block_decompressor final : public block_decompressor_base {
public: public:
brotli_block_decompressor(std::span<uint8_t const> data, brotli_block_decompressor(std::span<uint8_t const> data)
mutable_byte_buffer target) : uncompressed_size_{varint::decode(data)}
: decompressed_{std::move(target)} , brotli_data_{data.data()}
, uncompressed_size_{varint::decode(data)} , brotli_size_{data.size()}
, data_{data.data()}
, size_{data.size()}
, decoder_{::BrotliDecoderCreateInstance(nullptr, nullptr, nullptr), , decoder_{::BrotliDecoderCreateInstance(nullptr, nullptr, nullptr),
&::BrotliDecoderDestroyInstance} { &::BrotliDecoderDestroyInstance} {
if (!decoder_) { if (!decoder_) {
@ -104,21 +103,13 @@ class brotli_block_decompressor final : public block_decompressor::impl {
BROTLI_DECODER_PARAM_LARGE_WINDOW, 1)) { BROTLI_DECODER_PARAM_LARGE_WINDOW, 1)) {
DWARFS_THROW(runtime_error, "could not set brotli decoder parameter"); DWARFS_THROW(runtime_error, "could not set brotli decoder parameter");
} }
try {
decompressed_.reserve(uncompressed_size_);
} catch (std::bad_alloc const&) {
DWARFS_THROW(
runtime_error,
fmt::format("could not reserve {} bytes for decompressed block",
uncompressed_size_));
}
} }
compression_type type() const override { return compression_type::BROTLI; } compression_type type() const override { return compression_type::BROTLI; }
std::optional<std::string> metadata() const override { return std::nullopt; }
bool decompress_frame(size_t frame_size) override { bool decompress_frame(size_t frame_size) override {
DWARFS_CHECK(decompressed_, "decompression not started");
size_t pos = decompressed_.size(); size_t pos = decompressed_.size();
if (pos + frame_size > uncompressed_size_) { if (pos + frame_size > uncompressed_size_) {
@ -132,8 +123,9 @@ class brotli_block_decompressor final : public block_decompressor::impl {
decompressed_.resize(pos + frame_size); decompressed_.resize(pos + frame_size);
uint8_t* next_out = decompressed_.data() + pos; uint8_t* next_out = decompressed_.data() + pos;
auto res = ::BrotliDecoderDecompressStream(decoder_.get(), &size_, &data_, auto res = ::BrotliDecoderDecompressStream(decoder_.get(), &brotli_size_,
&frame_size, &next_out, nullptr); &brotli_data_, &frame_size,
&next_out, nullptr);
if (res == BROTLI_DECODER_RESULT_ERROR) { if (res == BROTLI_DECODER_RESULT_ERROR) {
DWARFS_THROW(runtime_error, DWARFS_THROW(runtime_error,
@ -153,10 +145,9 @@ class brotli_block_decompressor final : public block_decompressor::impl {
::BrotliDecoderGetErrorCode(decoder_.get())); ::BrotliDecoderGetErrorCode(decoder_.get()));
} }
mutable_byte_buffer decompressed_;
size_t const uncompressed_size_; size_t const uncompressed_size_;
uint8_t const* data_; uint8_t const* brotli_data_;
size_t size_; size_t brotli_size_;
std::unique_ptr<BrotliDecoderState, decltype(BrotliDecoderDestroyInstance)*> std::unique_ptr<BrotliDecoderState, decltype(BrotliDecoderDestroyInstance)*>
decoder_; decoder_;
}; };
@ -199,9 +190,8 @@ class brotli_compression_factory : public compression_factory {
} }
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
make_decompressor(std::span<uint8_t const> data, make_decompressor(std::span<uint8_t const> data) const override {
mutable_byte_buffer target) const override { return std::make_unique<brotli_block_decompressor>(data);
return std::make_unique<brotli_block_decompressor>(data, std::move(target));
} }
private: private:

View File

@ -32,7 +32,6 @@
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <dwarfs/block_compressor.h>
#include <dwarfs/compression.h> #include <dwarfs/compression.h>
#include <dwarfs/error.h> #include <dwarfs/error.h>
#include <dwarfs/option_map.h> #include <dwarfs/option_map.h>
@ -42,6 +41,8 @@
#include <dwarfs/gen-cpp2/compression_types.h> #include <dwarfs/gen-cpp2/compression_types.h>
#include "base.h"
namespace dwarfs { namespace dwarfs {
namespace { namespace {
@ -98,10 +99,9 @@ class dwarfs_flac_stream_encoder final : public FLAC::Encoder::Stream {
class dwarfs_flac_stream_decoder final : public FLAC::Decoder::Stream { class dwarfs_flac_stream_decoder final : public FLAC::Decoder::Stream {
public: public:
dwarfs_flac_stream_decoder( dwarfs_flac_stream_decoder(
mutable_byte_buffer target, std::span<uint8_t const> data, std::span<uint8_t const> data,
thrift::compression::flac_block_header const& header) thrift::compression::flac_block_header const& header)
: target_{std::move(target)} : data_{data}
, data_{data}
, header_{header} , header_{header}
, bytes_per_sample_{(header_.flags().value() & kBytesPerSampleMask) + 1} , bytes_per_sample_{(header_.flags().value() & kBytesPerSampleMask) + 1}
, xfm_{header_.flags().value() & kFlagBigEndian , xfm_{header_.flags().value() & kFlagBigEndian
@ -115,6 +115,11 @@ class dwarfs_flac_stream_decoder final : public FLAC::Decoder::Stream {
: pcm_sample_padding::Msb, : pcm_sample_padding::Msb,
bytes_per_sample_, header_.bits_per_sample().value()} {} bytes_per_sample_, header_.bits_per_sample().value()} {}
void set_target(mutable_byte_buffer target) {
DWARFS_CHECK(!target_, "target buffer already set");
target_ = std::move(target);
}
// NOLINTBEGIN(cppcoreguidelines-avoid-c-arrays) // NOLINTBEGIN(cppcoreguidelines-avoid-c-arrays)
::FLAC__StreamDecoderReadStatus ::FLAC__StreamDecoderReadStatus
read_callback(FLAC__byte buffer[], size_t* bytes) override { read_callback(FLAC__byte buffer[], size_t* bytes) override {
@ -147,6 +152,8 @@ class dwarfs_flac_stream_decoder final : public FLAC::Decoder::Stream {
} }
} }
DWARFS_CHECK(target_, "target buffer not set");
auto pos = target_.size(); auto pos = target_.size();
size_t size = channels * samples * bytes_per_sample_; size_t size = channels * samples * bytes_per_sample_;
@ -384,15 +391,12 @@ class flac_block_compressor final : public block_compressor::impl {
bool const exhaustive_; bool const exhaustive_;
}; };
class flac_block_decompressor final : public block_decompressor::impl { class flac_block_decompressor final : public block_decompressor_base {
public: public:
flac_block_decompressor(std::span<uint8_t const> data, flac_block_decompressor(std::span<uint8_t const> data)
mutable_byte_buffer target) : uncompressed_size_{varint::decode(data)}
: decompressed_{std::move(target)}
, uncompressed_size_{varint::decode(data)}
, header_{decode_header(data)} , header_{decode_header(data)}
, decoder_{std::make_unique<dwarfs_flac_stream_decoder>(decompressed_, , decoder_{std::make_unique<dwarfs_flac_stream_decoder>(data, header_)} {
data, header_)} {
decoder_->set_md5_checking(false); decoder_->set_md5_checking(false);
decoder_->set_metadata_ignore_all(); decoder_->set_metadata_ignore_all();
@ -402,16 +406,11 @@ class flac_block_decompressor final : public block_decompressor::impl {
fmt::format("[FLAC] could not initialize decoder: {}", fmt::format("[FLAC] could not initialize decoder: {}",
FLAC__StreamDecoderInitStatusString[status])); FLAC__StreamDecoderInitStatusString[status]));
} }
}
try { void start_decompression(mutable_byte_buffer target) override {
decompressed_.reserve(uncompressed_size_); block_decompressor_base::start_decompression(std::move(target));
} catch (std::bad_alloc const&) { decoder_->set_target(decompressed_);
DWARFS_THROW(
runtime_error,
fmt::format(
"[FLAC] could not reserve {} bytes for decompressed block",
uncompressed_size_));
}
} }
compression_type type() const override { return compression_type::FLAC; } compression_type type() const override { return compression_type::FLAC; }
@ -430,6 +429,8 @@ class flac_block_decompressor final : public block_decompressor::impl {
} }
bool decompress_frame(size_t frame_size) override { bool decompress_frame(size_t frame_size) override {
DWARFS_CHECK(decompressed_, "decompression not started");
size_t pos = decompressed_.size(); size_t pos = decompressed_.size();
if (pos + frame_size > uncompressed_size_) { if (pos + frame_size > uncompressed_size_) {
@ -471,7 +472,6 @@ class flac_block_decompressor final : public block_decompressor::impl {
return hdr; return hdr;
} }
mutable_byte_buffer decompressed_;
size_t const uncompressed_size_; size_t const uncompressed_size_;
thrift::compression::flac_block_header const header_; thrift::compression::flac_block_header const header_;
std::unique_ptr<dwarfs_flac_stream_decoder> decoder_; std::unique_ptr<dwarfs_flac_stream_decoder> decoder_;
@ -508,9 +508,8 @@ class flac_compression_factory : public compression_factory {
} }
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
make_decompressor(std::span<uint8_t const> data, make_decompressor(std::span<uint8_t const> data) const override {
mutable_byte_buffer target) const override { return std::make_unique<flac_block_decompressor>(data);
return std::make_unique<flac_block_decompressor>(data, std::move(target));
} }
private: private:

View File

@ -24,13 +24,14 @@
#include <fmt/format.h> #include <fmt/format.h>
#include <dwarfs/block_compressor.h>
#include <dwarfs/conv.h> #include <dwarfs/conv.h>
#include <dwarfs/error.h> #include <dwarfs/error.h>
#include <dwarfs/fstypes.h> #include <dwarfs/fstypes.h>
#include <dwarfs/option_map.h> #include <dwarfs/option_map.h>
#include <dwarfs/vector_byte_buffer.h> #include <dwarfs/vector_byte_buffer.h>
#include "base.h"
namespace dwarfs { namespace dwarfs {
namespace { namespace {
@ -107,28 +108,17 @@ class lz4_block_compressor final : public block_compressor::impl {
int const level_; int const level_;
}; };
class lz4_block_decompressor final : public block_decompressor::impl { class lz4_block_decompressor final : public block_decompressor_base {
public: public:
lz4_block_decompressor(std::span<uint8_t const> data, lz4_block_decompressor(std::span<uint8_t const> data)
mutable_byte_buffer target) : data_(data.subspan(sizeof(uint32_t)))
: decompressed_(std::move(target)) , uncompressed_size_(get_uncompressed_size(data.data())) {}
, data_(data.subspan(sizeof(uint32_t)))
, uncompressed_size_(get_uncompressed_size(data.data())) {
try {
decompressed_.reserve(uncompressed_size_);
} catch (std::bad_alloc const&) {
DWARFS_THROW(
runtime_error,
fmt::format("could not reserve {} bytes for decompressed block",
uncompressed_size_));
}
}
compression_type type() const override { return compression_type::LZ4; } compression_type type() const override { return compression_type::LZ4; }
std::optional<std::string> metadata() const override { return std::nullopt; }
bool decompress_frame(size_t) override { bool decompress_frame(size_t) override {
DWARFS_CHECK(decompressed_, "decompression not started");
if (!error_.empty()) { if (!error_.empty()) {
DWARFS_THROW(runtime_error, error_); DWARFS_THROW(runtime_error, error_);
} }
@ -158,7 +148,6 @@ class lz4_block_decompressor final : public block_decompressor::impl {
return size; return size;
} }
mutable_byte_buffer decompressed_;
std::span<uint8_t const> data_; std::span<uint8_t const> data_;
size_t const uncompressed_size_; size_t const uncompressed_size_;
std::string error_; std::string error_;
@ -188,9 +177,8 @@ class lz4_compression_factory : public compression_factory {
} }
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
make_decompressor(std::span<uint8_t const> data, make_decompressor(std::span<uint8_t const> data) const override {
mutable_byte_buffer target) const override { return std::make_unique<lz4_block_decompressor>(data);
return std::make_unique<lz4_block_decompressor>(data, std::move(target));
} }
private: private:
@ -225,9 +213,8 @@ class lz4hc_compression_factory : public compression_factory {
} }
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
make_decompressor(std::span<uint8_t const> data, make_decompressor(std::span<uint8_t const> data) const override {
mutable_byte_buffer target) const override { return std::make_unique<lz4_block_decompressor>(data);
return std::make_unique<lz4_block_decompressor>(data, std::move(target));
} }
private: private:

View File

@ -31,7 +31,6 @@
#include <range/v3/view/join.hpp> #include <range/v3/view/join.hpp>
#include <range/v3/view/map.hpp> #include <range/v3/view/map.hpp>
#include <dwarfs/block_compressor.h>
#include <dwarfs/error.h> #include <dwarfs/error.h>
#include <dwarfs/fstypes.h> #include <dwarfs/fstypes.h>
#include <dwarfs/option_map.h> #include <dwarfs/option_map.h>
@ -39,6 +38,8 @@
#include <dwarfs/types.h> #include <dwarfs/types.h>
#include <dwarfs/vector_byte_buffer.h> #include <dwarfs/vector_byte_buffer.h>
#include "base.h"
namespace dwarfs { namespace dwarfs {
namespace { namespace {
@ -260,12 +261,10 @@ lzma_block_compressor::compress(shared_byte_buffer const& data,
return best; return best;
} }
class lzma_block_decompressor final : public block_decompressor::impl { class lzma_block_decompressor final : public block_decompressor_base {
public: public:
lzma_block_decompressor(std::span<uint8_t const> data, lzma_block_decompressor(std::span<uint8_t const> data)
mutable_byte_buffer target)
: stream_(LZMA_STREAM_INIT) : stream_(LZMA_STREAM_INIT)
, decompressed_(std::move(target))
, uncompressed_size_(get_uncompressed_size(data.data(), data.size())) { , uncompressed_size_(get_uncompressed_size(data.data(), data.size())) {
stream_.next_in = data.data(); stream_.next_in = data.data();
stream_.avail_in = data.size(); stream_.avail_in = data.size();
@ -274,23 +273,15 @@ class lzma_block_decompressor final : public block_decompressor::impl {
DWARFS_THROW(runtime_error, fmt::format("lzma_stream_decoder: {}", DWARFS_THROW(runtime_error, fmt::format("lzma_stream_decoder: {}",
lzma_error_string(ret))); lzma_error_string(ret)));
} }
try {
decompressed_.reserve(uncompressed_size_);
} catch (std::bad_alloc const&) {
DWARFS_THROW(
runtime_error,
fmt::format("could not reserve {} bytes for decompressed block",
uncompressed_size_));
}
} }
~lzma_block_decompressor() override { lzma_end(&stream_); } ~lzma_block_decompressor() override { lzma_end(&stream_); }
compression_type type() const override { return compression_type::LZMA; } compression_type type() const override { return compression_type::LZMA; }
std::optional<std::string> metadata() const override { return std::nullopt; }
bool decompress_frame(size_t frame_size) override { bool decompress_frame(size_t frame_size) override {
DWARFS_CHECK(decompressed_, "decompression not started");
if (!error_.empty()) { if (!error_.empty()) {
DWARFS_THROW(runtime_error, error_); DWARFS_THROW(runtime_error, error_);
} }
@ -333,7 +324,6 @@ class lzma_block_decompressor final : public block_decompressor::impl {
static size_t get_uncompressed_size(uint8_t const* data, size_t size); static size_t get_uncompressed_size(uint8_t const* data, size_t size);
lzma_stream stream_; lzma_stream stream_;
mutable_byte_buffer decompressed_;
size_t const uncompressed_size_; size_t const uncompressed_size_;
std::string error_; std::string error_;
}; };
@ -426,9 +416,8 @@ class lzma_compression_factory : public compression_factory {
} }
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
make_decompressor(std::span<uint8_t const> data, make_decompressor(std::span<uint8_t const> data) const override {
mutable_byte_buffer target) const override { return std::make_unique<lzma_block_decompressor>(data);
return std::make_unique<lzma_block_decompressor>(data, std::move(target));
} }
private: private:

View File

@ -23,11 +23,12 @@
#include <fmt/format.h> #include <fmt/format.h>
#include <dwarfs/block_compressor.h>
#include <dwarfs/error.h> #include <dwarfs/error.h>
#include <dwarfs/fstypes.h> #include <dwarfs/fstypes.h>
#include <dwarfs/option_map.h> #include <dwarfs/option_map.h>
#include "base.h"
namespace dwarfs { namespace dwarfs {
namespace { namespace {
@ -58,28 +59,16 @@ class null_block_compressor final : public block_compressor::impl {
} }
}; };
class null_block_decompressor final : public block_decompressor::impl { class null_block_decompressor final : public block_decompressor_base {
public: public:
null_block_decompressor(std::span<uint8_t const> data, null_block_decompressor(std::span<uint8_t const> data)
mutable_byte_buffer target) : data_(data) {}
: decompressed_(std::move(target))
, data_(data) {
// TODO: we shouldn't have to copy this to memory at all...
try {
decompressed_.reserve(data.size());
} catch (std::bad_alloc const&) {
DWARFS_THROW(
runtime_error,
fmt::format("could not reserve {} bytes for decompressed block",
data.size()));
}
}
compression_type type() const override { return compression_type::NONE; } compression_type type() const override { return compression_type::NONE; }
std::optional<std::string> metadata() const override { return std::nullopt; }
bool decompress_frame(size_t frame_size) override { bool decompress_frame(size_t frame_size) override {
DWARFS_CHECK(decompressed_, "decompression not started");
if (decompressed_.size() + frame_size > data_.size()) { if (decompressed_.size() + frame_size > data_.size()) {
frame_size = data_.size() - decompressed_.size(); frame_size = data_.size() - decompressed_.size();
} }
@ -98,7 +87,6 @@ class null_block_decompressor final : public block_decompressor::impl {
size_t uncompressed_size() const override { return data_.size(); } size_t uncompressed_size() const override { return data_.size(); }
private: private:
mutable_byte_buffer decompressed_;
std::span<uint8_t const> data_; std::span<uint8_t const> data_;
}; };
@ -122,9 +110,8 @@ class null_compression_factory : public compression_factory {
} }
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
make_decompressor(std::span<uint8_t const> data, make_decompressor(std::span<uint8_t const> data) const override {
mutable_byte_buffer target) const override { return std::make_unique<null_block_decompressor>(data);
return std::make_unique<null_block_decompressor>(data, std::move(target));
} }
private: private:

View File

@ -27,14 +27,16 @@
#include <ricepp/ricepp.h> #include <ricepp/ricepp.h>
#include <dwarfs/block_compressor.h>
#include <dwarfs/compression.h> #include <dwarfs/compression.h>
#include <dwarfs/error.h> #include <dwarfs/error.h>
#include <dwarfs/option_map.h> #include <dwarfs/option_map.h>
#include <dwarfs/varint.h> #include <dwarfs/varint.h>
#include <dwarfs/vector_byte_buffer.h>
#include <dwarfs/gen-cpp2/compression_types.h> #include <dwarfs/gen-cpp2/compression_types.h>
#include "base.h"
namespace dwarfs { namespace dwarfs {
namespace { namespace {
@ -165,12 +167,10 @@ class ricepp_block_compressor final : public block_compressor::impl {
size_t const block_size_; size_t const block_size_;
}; };
class ricepp_block_decompressor final : public block_decompressor::impl { class ricepp_block_decompressor final : public block_decompressor_base {
public: public:
ricepp_block_decompressor(std::span<uint8_t const> data, ricepp_block_decompressor(std::span<uint8_t const> data)
mutable_byte_buffer target) : uncompressed_size_{varint::decode(data)}
: decompressed_{std::move(target)}
, uncompressed_size_{varint::decode(data)}
, header_{decode_header(data)} , header_{decode_header(data)}
, data_{data} , data_{data}
, codec_{ricepp::create_codec<uint16_t>( , codec_{ricepp::create_codec<uint16_t>(
@ -184,16 +184,6 @@ class ricepp_block_decompressor final : public block_decompressor::impl {
fmt::format("[RICEPP] unsupported bytes per sample: {}", fmt::format("[RICEPP] unsupported bytes per sample: {}",
header_.bytes_per_sample().value())); header_.bytes_per_sample().value()));
} }
try {
decompressed_.reserve(uncompressed_size_);
} catch (std::bad_alloc const&) {
DWARFS_THROW(
runtime_error,
fmt::format(
"[RICEPP] could not reserve {} bytes for decompressed block",
uncompressed_size_));
}
} }
compression_type type() const override { return compression_type::RICEPP; } compression_type type() const override { return compression_type::RICEPP; }
@ -209,6 +199,8 @@ class ricepp_block_decompressor final : public block_decompressor::impl {
} }
bool decompress_frame(size_t) override { bool decompress_frame(size_t) override {
DWARFS_CHECK(decompressed_, "decompression not started");
if (!codec_) { if (!codec_) {
return false; return false;
} }
@ -243,7 +235,6 @@ class ricepp_block_decompressor final : public block_decompressor::impl {
return hdr; return hdr;
} }
mutable_byte_buffer decompressed_;
size_t const uncompressed_size_; size_t const uncompressed_size_;
thrift::compression::ricepp_block_header const header_; thrift::compression::ricepp_block_header const header_;
std::span<uint8_t const> data_; std::span<uint8_t const> data_;
@ -277,9 +268,8 @@ class ricepp_compression_factory : public compression_factory {
} }
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
make_decompressor(std::span<uint8_t const> data, make_decompressor(std::span<uint8_t const> data) const override {
mutable_byte_buffer target) const override { return std::make_unique<ricepp_block_decompressor>(data);
return std::make_unique<ricepp_block_decompressor>(data, std::move(target));
} }
private: private:

View File

@ -25,13 +25,14 @@
#include <fmt/format.h> #include <fmt/format.h>
#include <dwarfs/block_compressor.h>
#include <dwarfs/error.h> #include <dwarfs/error.h>
#include <dwarfs/fstypes.h> #include <dwarfs/fstypes.h>
#include <dwarfs/option_map.h> #include <dwarfs/option_map.h>
#include <dwarfs/vector_byte_buffer.h> #include <dwarfs/vector_byte_buffer.h>
#include <dwarfs/zstd_context_manager.h> #include <dwarfs/zstd_context_manager.h>
#include "base.h"
#if ZSTD_VERSION_MAJOR > 1 || \ #if ZSTD_VERSION_MAJOR > 1 || \
(ZSTD_VERSION_MAJOR == 1 && ZSTD_VERSION_MINOR >= 4) (ZSTD_VERSION_MAJOR == 1 && ZSTD_VERSION_MINOR >= 4)
#define ZSTD_MIN_LEVEL ZSTD_minCLevel() #define ZSTD_MIN_LEVEL ZSTD_minCLevel()
@ -108,12 +109,10 @@ zstd_block_compressor::compress(shared_byte_buffer const& data,
return compressed.share(); return compressed.share();
} }
class zstd_block_decompressor final : public block_decompressor::impl { class zstd_block_decompressor final : public block_decompressor_base {
public: public:
zstd_block_decompressor(std::span<uint8_t const> data, zstd_block_decompressor(std::span<uint8_t const> data)
mutable_byte_buffer target) : data_(data)
: decompressed_(std::move(target))
, data_(data)
, uncompressed_size_(ZSTD_getFrameContentSize(data.data(), data.size())) { , uncompressed_size_(ZSTD_getFrameContentSize(data.data(), data.size())) {
switch (uncompressed_size_) { switch (uncompressed_size_) {
case ZSTD_CONTENTSIZE_UNKNOWN: case ZSTD_CONTENTSIZE_UNKNOWN:
@ -127,22 +126,13 @@ class zstd_block_decompressor final : public block_decompressor::impl {
default: default:
break; break;
} }
try {
decompressed_.reserve(uncompressed_size_);
} catch (std::bad_alloc const&) {
DWARFS_THROW(
runtime_error,
fmt::format("could not reserve {} bytes for decompressed block",
uncompressed_size_));
}
} }
compression_type type() const override { return compression_type::ZSTD; } compression_type type() const override { return compression_type::ZSTD; }
std::optional<std::string> metadata() const override { return std::nullopt; }
bool decompress_frame(size_t /*frame_size*/) override { bool decompress_frame(size_t /*frame_size*/) override {
DWARFS_CHECK(decompressed_, "decompression not started");
if (!error_.empty()) { if (!error_.empty()) {
DWARFS_THROW(runtime_error, error_); DWARFS_THROW(runtime_error, error_);
} }
@ -163,7 +153,6 @@ class zstd_block_decompressor final : public block_decompressor::impl {
size_t uncompressed_size() const override { return uncompressed_size_; } size_t uncompressed_size() const override { return uncompressed_size_; }
private: private:
mutable_byte_buffer decompressed_;
std::span<uint8_t const> data_; std::span<uint8_t const> data_;
unsigned long long const uncompressed_size_; unsigned long long const uncompressed_size_;
std::string error_; std::string error_;
@ -198,9 +187,8 @@ class zstd_compression_factory : public compression_factory {
} }
std::unique_ptr<block_decompressor::impl> std::unique_ptr<block_decompressor::impl>
make_decompressor(std::span<uint8_t const> data, make_decompressor(std::span<uint8_t const> data) const override {
mutable_byte_buffer target) const override { return std::make_unique<zstd_block_decompressor>(data);
return std::make_unique<zstd_block_decompressor>(data, std::move(target));
} }
private: private:

View File

@ -103,9 +103,7 @@ block_decompressor get_block_decompressor(mmif& mm, fs_section const& sec) {
fmt::format("attempt to access damaged {} section", sec.name())); fmt::format("attempt to access damaged {} section", sec.name()));
} }
auto tmp = vector_byte_buffer::create(); return {sec.compression(), sec.data(mm)};
auto span = sec.data(mm);
return {sec.compression(), span, tmp};
} }
std::optional<block_decompressor> std::optional<block_decompressor>

View File

@ -47,9 +47,9 @@ class cached_block_ final : public cached_block {
cached_block_(logger& lgr, fs_section const& b, std::shared_ptr<mmif> mm, cached_block_(logger& lgr, fs_section const& b, std::shared_ptr<mmif> mm,
bool release, bool disable_integrity_check) bool release, bool disable_integrity_check)
: data_{vector_byte_buffer::create()} : decompressor_{std::make_unique<block_decompressor>(
, decompressor_{std::make_unique<block_decompressor>( b.compression(), mm->span<uint8_t>(b.start(), b.length()))}
b.compression(), mm->span<uint8_t>(b.start(), b.length()), data_)} , data_{decompressor_->start_decompression(vector_byte_buffer::create())}
, mm_(std::move(mm)) , mm_(std::move(mm))
, section_(b) , section_(b)
, LOG_PROXY_INIT(lgr) , LOG_PROXY_INIT(lgr)
@ -101,11 +101,6 @@ class cached_block_ final : public cached_block {
try_release(); try_release();
} }
if (pos == 0) {
// Freeze the location of the data buffer
data_.freeze_location();
}
pos = data_.size(); pos = data_.size();
range_end_.store(pos, std::memory_order_release); range_end_.store(pos, std::memory_order_release);
} }
@ -149,8 +144,8 @@ class cached_block_ final : public cached_block {
} }
std::atomic<size_t> range_end_{0}; std::atomic<size_t> range_end_{0};
mutable_byte_buffer data_;
std::unique_ptr<block_decompressor> decompressor_; std::unique_ptr<block_decompressor> decompressor_;
shared_byte_buffer data_;
std::shared_ptr<mmif> mm_; std::shared_ptr<mmif> mm_;
fs_section section_; fs_section section_;
LOG_PROXY_DECL(LoggerPolicy); LOG_PROXY_DECL(LoggerPolicy);

View File

@ -116,6 +116,12 @@ mutable_byte_buffer vector_byte_buffer::create(size_t size) {
return mutable_byte_buffer{std::make_shared<vector_byte_buffer_impl>(size)}; return mutable_byte_buffer{std::make_shared<vector_byte_buffer_impl>(size)};
} }
/**
 * Create a vector-backed buffer and call reserve() on it before handing it out.
 *
 * A default-constructed vector_byte_buffer_impl is allocated, reserve(size)
 * is invoked on it, and the implementation is then wrapped in a
 * mutable_byte_buffer.
 *
 * NOTE(review): presumably this only pre-allocates capacity and leaves the
 * logical size untouched, unlike create(size_t), which passes the size to the
 * implementation's constructor -- confirm against vector_byte_buffer_impl.
 *
 * @param size  value forwarded unchanged to vector_byte_buffer_impl::reserve()
 * @return      mutable_byte_buffer owning the newly created implementation
 */
mutable_byte_buffer vector_byte_buffer::create_reserve(size_t size) {
  auto impl = std::make_shared<vector_byte_buffer_impl>();
  impl->reserve(size);
  return mutable_byte_buffer{std::move(impl)};
}
mutable_byte_buffer vector_byte_buffer::create(std::string_view data) { mutable_byte_buffer vector_byte_buffer::create(std::string_view data) {
return mutable_byte_buffer{std::make_shared<vector_byte_buffer_impl>(data)}; return mutable_byte_buffer{std::make_shared<vector_byte_buffer_impl>(data)};
} }

View File

@ -385,46 +385,46 @@ class rewritten_fsblock : public fsblock::impl {
std::promise<void> prom; std::promise<void> prom;
future_ = prom.get_future(); future_ = prom.get_future();
wg.add_job( wg.add_job([this, prom = std::move(prom),
[this, prom = std::move(prom), meta = std::move(meta)]() mutable { meta = std::move(meta)]() mutable {
try { try {
shared_byte_buffer block; shared_byte_buffer block;
{ {
// TODO: we don't have to do this for uncompressed blocks // TODO: we don't have to do this for uncompressed blocks
auto buffer = vector_byte_buffer::create(); block_decompressor bd(data_comp_type_, data_);
block_decompressor bd(data_comp_type_, data_, buffer); auto buffer = bd.start_decompression(vector_byte_buffer::create());
bd.decompress_frame(bd.uncompressed_size()); bd.decompress_frame(bd.uncompressed_size());
if (!meta) { if (!meta) {
meta = bd.metadata(); meta = bd.metadata();
}
pctx_->bytes_in += buffer.size(); // TODO: data_.size()?
try {
if (meta) {
block = bc_.compress(buffer.share(), *meta);
} else {
block = bc_.compress(buffer.share());
}
} catch (bad_compression_ratio_error const&) {
comp_type_ = compression_type::NONE;
}
}
pctx_->bytes_out += block.size();
{
std::lock_guard lock(mx_);
block_data_.emplace(std::move(block));
}
prom.set_value();
} catch (...) {
prom.set_exception(std::current_exception());
} }
});
pctx_->bytes_in += buffer.size(); // TODO: data_.size()?
try {
if (meta) {
block = bc_.compress(buffer, *meta);
} else {
block = bc_.compress(buffer);
}
} catch (bad_compression_ratio_error const&) {
comp_type_ = compression_type::NONE;
}
}
pctx_->bytes_out += block.size();
{
std::lock_guard lock(mx_);
block_data_.emplace(std::move(block));
}
prom.set_value();
} catch (...) {
prom.set_exception(std::current_exception());
}
});
} }
void wait_until_compressed() override { future_.get(); } void wait_until_compressed() override { future_.get(); }
@ -894,8 +894,7 @@ void filesystem_writer_<LoggerPolicy>::check_block_compression(
if (auto reqstr = bc->metadata_requirements(); !reqstr.empty()) { if (auto reqstr = bc->metadata_requirements(); !reqstr.empty()) {
auto req = compression_metadata_requirements<nlohmann::json>{reqstr}; auto req = compression_metadata_requirements<nlohmann::json>{reqstr};
auto tmp = vector_byte_buffer::create(); block_decompressor bd(compression, data);
block_decompressor bd(compression, data, tmp);
try { try {
req.check(bd.metadata()); req.check(bd.metadata());

View File

@ -28,6 +28,7 @@
#include <dwarfs/block_compressor.h> #include <dwarfs/block_compressor.h>
#include <dwarfs/pcm_sample_transformer.h> #include <dwarfs/pcm_sample_transformer.h>
#include <dwarfs/vector_byte_buffer.h>
using namespace dwarfs; using namespace dwarfs;