wip: block_compressor / memory_manager

This commit is contained in:
Marcus Holland-Moritz 2025-05-27 22:38:51 +02:00
parent 08d97e79ce
commit 76aa5d40e5
10 changed files with 114 additions and 49 deletions

View File

@ -38,6 +38,8 @@
namespace dwarfs {
class memory_manager;
class bad_compression_ratio_error : public std::runtime_error {
public:
bad_compression_ratio_error()
@ -56,13 +58,15 @@ class block_compressor {
block_compressor(block_compressor&& bc) = default;
block_compressor& operator=(block_compressor&& rhs) = default;
shared_byte_buffer compress(shared_byte_buffer const& data) const {
return impl_->compress(data, nullptr);
shared_byte_buffer compress(shared_byte_buffer const& data,
memory_manager* memmgr = nullptr) const {
return impl_->compress(data, nullptr, memmgr);
}
shared_byte_buffer
compress(shared_byte_buffer const& data, std::string const& metadata) const {
return impl_->compress(data, &metadata);
compress(shared_byte_buffer const& data, std::string const& metadata,
memory_manager* memmgr = nullptr) const {
return impl_->compress(data, &metadata, memmgr);
}
compression_type type() const { return impl_->type(); }
@ -90,8 +94,9 @@ class block_compressor {
virtual std::unique_ptr<impl> clone() const = 0;
virtual shared_byte_buffer compress(shared_byte_buffer const& data,
std::string const* metadata) const = 0;
virtual shared_byte_buffer
compress(shared_byte_buffer const& data, std::string const* metadata,
memory_manager* memmgr) const = 0;
virtual compression_type type() const = 0;
virtual std::string describe() const = 0;

View File

@ -143,30 +143,7 @@ class memory_manager : public std::enable_shared_from_this<memory_manager> {
}
std::string status() const {
struct request_info {
size_t active_size{0};
size_t active_count{0};
size_t pending_size{0};
size_t pending_count{0};
};
std::unordered_map<std::string_view, request_info> requests;
{
std::lock_guard lock(mutex_);
for (auto const& [_, info] : active_) {
auto& entry = requests[info.tag];
entry.active_size += info.size;
++entry.active_count;
}
for (auto const& req : pending_) {
auto& entry = requests[req->tag];
entry.pending_size += req->size;
++entry.pending_count;
}
}
auto const requests = get_request_info();
std::vector<std::string_view> tags(requests.size());
std::ranges::transform(requests, tags.begin(),
@ -177,7 +154,7 @@ class memory_manager : public std::enable_shared_from_this<memory_manager> {
fmt::format("{}/{}", size_with_unit(used_), size_with_unit(limit_));
for (auto const& tag : tags) {
auto const& info = requests[tag];
auto const& info = requests.at(tag);
result +=
fmt::format("; {}: {} ({}) A, {} ({}) P", tag,
size_with_unit(info.active_size), info.active_count,
@ -187,7 +164,54 @@ class memory_manager : public std::enable_shared_from_this<memory_manager> {
return result;
}
// One entry of the usage breakdown returned by get_usage_info().
struct usage_info {
// Tag the entry belongs to; get_usage_info() also emits one synthetic
// entry tagged "free" for the part of the limit that is not in use.
// NOTE(review): this is a non-owning view; the lifetime of the underlying
// tag storage is not visible here and should be confirmed.
std::string_view tag;
// Sum of the active request sizes for `tag`, or `limit_ - total_used`
// for the "free" entry.
size_t size;
};
std::vector<usage_info> get_usage_info() const {
auto const requests = get_request_info();
std::vector<usage_info> usage;
size_t total_used{0};
for (auto const& [tag, info] : requests) {
total_used += info.active_size;
usage.emplace_back(usage_info{tag, info.active_size});
}
DWARFS_CHECK(total_used <= limit_,
fmt::format("Total used memory exceeds limit: {} > {}",
total_used, limit_));
usage.emplace_back(usage_info{"free", limit_ - total_used});
return usage;
}
private:
// Per-tag totals collected by get_request_info().
struct request_info {
// Sum of `size` over all entries in `active_` carrying this tag.
size_t active_size{0};
// Number of entries in `active_` carrying this tag.
size_t active_count{0};
// Sum of `size` over all entries in `pending_` carrying this tag.
size_t pending_size{0};
// Number of entries in `pending_` carrying this tag.
size_t pending_count{0};
};
/// Builds a per-tag summary of all active and pending requests.
///
/// `mutex_` is held while `active_` and `pending_` are traversed and is
/// released when the function returns. Every tag seen in either container
/// gets an entry; sizes and counts are accumulated separately for the
/// active and the pending side.
std::unordered_map<std::string_view, request_info> get_request_info() const {
  std::unordered_map<std::string_view, request_info> by_tag;

  std::lock_guard guard(mutex_);

  // Requests that have already been granted.
  for (auto const& kv : active_) {
    auto const& granted = kv.second;
    auto& totals = by_tag[granted.tag];
    totals.active_size += granted.size;
    totals.active_count += 1;
  }

  // Requests that are still waiting.
  for (auto const& waiting : pending_) {
    auto& totals = by_tag[waiting->tag];
    totals.pending_size += waiting->size;
    totals.pending_count += 1;
  }

  return by_tag;
}
void fulfill_and_unlock(std::unique_lock<std::mutex>& lock) {
std::vector<request_ptr> granted;
@ -233,7 +257,8 @@ class memory_manager : public std::enable_shared_from_this<memory_manager> {
void release_partial(size_t released_size, size_t sequence) {
std::unique_lock lock(mutex_);
if (active_.contains(sequence)) {
if (auto it = active_.find(sequence); it != active_.end()) {
it->second.size -= released_size;
used_ -= released_size;
fulfill_and_unlock(lock);
}

View File

@ -549,8 +549,9 @@ class brotli_block_compressor final : public block_compressor::impl {
return std::make_unique<brotli_block_compressor>(*this);
}
shared_byte_buffer compress(shared_byte_buffer const& data,
std::string const* /*metadata*/) const override {
shared_byte_buffer
compress(shared_byte_buffer const& data, std::string const* /*metadata*/,
memory_manager* /*memmgr*/) const override {
auto compressed = malloc_byte_buffer::create(); // TODO: make configurable
compressed.resize(varint::max_size +
::BrotliEncoderMaxCompressedSize(data.size()));

View File

@ -224,8 +224,9 @@ class flac_block_compressor final : public block_compressor::impl {
return std::make_unique<flac_block_compressor>(*this);
}
shared_byte_buffer compress(shared_byte_buffer const& data,
std::string const* metadata) const override {
shared_byte_buffer
compress(shared_byte_buffer const& data, std::string const* metadata,
memory_manager* /*memmgr*/) const override {
if (!metadata) {
DWARFS_THROW(runtime_error,
"internal error: flac compression requires metadata");

View File

@ -84,8 +84,9 @@ class lz4_block_compressor final : public block_compressor::impl {
return std::make_unique<lz4_block_compressor>(*this);
}
shared_byte_buffer compress(shared_byte_buffer const& data,
std::string const* /*metadata*/) const override {
shared_byte_buffer
compress(shared_byte_buffer const& data, std::string const* /*metadata*/,
memory_manager* /*memmgr*/) const override {
auto compressed = malloc_byte_buffer::create(); // TODO: make configurable
compressed.resize(sizeof(uint32_t) +
LZ4_compressBound(to<int>(data.size())));

View File

@ -122,8 +122,9 @@ class lzma_block_compressor final : public block_compressor::impl {
return std::make_unique<lzma_block_compressor>(*this);
}
shared_byte_buffer compress(shared_byte_buffer const& data,
std::string const* metadata) const override;
shared_byte_buffer
compress(shared_byte_buffer const& data, std::string const* metadata,
memory_manager* /*memmgr*/) const override;
compression_type type() const override { return compression_type::LZMA; }
@ -258,7 +259,8 @@ lzma_block_compressor::compress(shared_byte_buffer const& data,
shared_byte_buffer
lzma_block_compressor::compress(shared_byte_buffer const& data,
std::string const* /*metadata*/) const {
std::string const* /*metadata*/,
memory_manager* /*memmgr*/) const {
auto lzma_opts = opt_lzma_;
std::array<lzma_filter, 3> filters{{{binary_vli_, nullptr},
{LZMA_FILTER_LZMA2, &lzma_opts},

View File

@ -51,8 +51,9 @@ class null_block_compressor final : public block_compressor::impl {
return std::make_unique<null_block_compressor>(*this);
}
shared_byte_buffer compress(shared_byte_buffer const& data,
std::string const* /*metadata*/) const override {
shared_byte_buffer
compress(shared_byte_buffer const& data, std::string const* /*metadata*/,
memory_manager* /*memmgr*/) const override {
return data;
}

View File

@ -63,8 +63,9 @@ class ricepp_block_compressor final : public block_compressor::impl {
return std::make_unique<ricepp_block_compressor>(*this);
}
shared_byte_buffer compress(shared_byte_buffer const& data,
std::string const* metadata) const override {
shared_byte_buffer
compress(shared_byte_buffer const& data, std::string const* metadata,
memory_manager* /*memmgr*/) const override {
if (!metadata) {
DWARFS_THROW(runtime_error,
"internal error: ricepp compression requires metadata");

View File

@ -114,8 +114,9 @@ class zstd_block_compressor final : public block_compressor::impl {
return std::make_unique<zstd_block_compressor>(*this);
}
shared_byte_buffer compress(shared_byte_buffer const& data,
std::string const* metadata) const override;
shared_byte_buffer
compress(shared_byte_buffer const& data, std::string const* metadata,
memory_manager* /*memmgr*/) const override;
compression_type type() const override { return compression_type::ZSTD; }
@ -152,7 +153,8 @@ class zstd_block_compressor final : public block_compressor::impl {
shared_byte_buffer
zstd_block_compressor::compress(shared_byte_buffer const& data,
std::string const* /*metadata*/) const {
std::string const* /*metadata*/,
memory_manager* /*memmgr*/) const {
auto compressed = malloc_byte_buffer::create(); // TODO: make configurable
compressed.resize(ZSTD_compressBound(data.size()));
auto size = ZSTD_compress(compressed.data(), compressed.size(), data.data(),

View File

@ -32,6 +32,7 @@
#include <dwarfs/logger.h>
#include <dwarfs/memory_manager.h>
#include <dwarfs/terminal.h>
#include <dwarfs/termutil.h>
#include <dwarfs/util.h>
#include <dwarfs/writer/console_writer.h>
#include <dwarfs/writer/entry_interface.h>
@ -307,7 +308,32 @@ void console_writer::update(writer_progress& prog, bool last) {
oss << newline;
if (memmgr_) {
// TODO: clean up, move to separate function
oss << "memmgr: " << memmgr_->status() << newline;
auto usage = memmgr_->get_usage_info();
std::unordered_map<std::string_view, size_t> usage_map;
for (auto const& u : usage) {
usage_map.emplace(u.tag, u.size);
}
using namespace std::string_view_literals;
static constexpr std::array tags{
"segm"sv, "sblk"sv, "data"sv, "comp"sv, "cblk"sv, "free"sv,
};
std::vector<bar_chart_section> sections;
for (auto const& tag : tags) {
if (auto it = usage_map.find(tag); it != usage_map.end()) {
sections.emplace_back(bar_chart_section{
std::string{tag}, static_cast<double>(it->second)});
}
}
oss << render_bar_chart(term(), sections) << newline;
}
}
@ -375,7 +401,7 @@ void console_writer::update(writer_progress& prog, bool last) {
oss.clear();
oss.seekp(0);
rewind(oss, (mode_ == NORMAL ? 9 : 4) + (memmgr_ ? 1 : 0) + ctxs.size());
rewind(oss, (mode_ == NORMAL ? 9 : 4) + (memmgr_ ? 2 : 0) + ctxs.size());
oss << statebuf_;
write_nolock(oss.str());