From 76aa5d40e58b1103a903eabff57b1f2967800922 Mon Sep 17 00:00:00 2001 From: Marcus Holland-Moritz Date: Tue, 27 May 2025 22:38:51 +0200 Subject: [PATCH] wip: block_compressor / memory_manager --- include/dwarfs/block_compressor.h | 17 ++++--- include/dwarfs/memory_manager.h | 77 ++++++++++++++++++++----------- src/compression/brotli.cpp | 5 +- src/compression/flac.cpp | 5 +- src/compression/lz4.cpp | 5 +- src/compression/lzma.cpp | 8 ++-- src/compression/null.cpp | 5 +- src/compression/ricepp.cpp | 5 +- src/compression/zstd.cpp | 8 ++-- src/writer/console_writer.cpp | 28 ++++++++++- 10 files changed, 114 insertions(+), 49 deletions(-) diff --git a/include/dwarfs/block_compressor.h b/include/dwarfs/block_compressor.h index b0831d0f..8ee29943 100644 --- a/include/dwarfs/block_compressor.h +++ b/include/dwarfs/block_compressor.h @@ -38,6 +38,8 @@ namespace dwarfs { +class memory_manager; + class bad_compression_ratio_error : public std::runtime_error { public: bad_compression_ratio_error() @@ -56,13 +58,15 @@ class block_compressor { block_compressor(block_compressor&& bc) = default; block_compressor& operator=(block_compressor&& rhs) = default; - shared_byte_buffer compress(shared_byte_buffer const& data) const { - return impl_->compress(data, nullptr); + shared_byte_buffer compress(shared_byte_buffer const& data, + memory_manager* memmgr = nullptr) const { + return impl_->compress(data, nullptr, memmgr); } shared_byte_buffer - compress(shared_byte_buffer const& data, std::string const& metadata) const { - return impl_->compress(data, &metadata); + compress(shared_byte_buffer const& data, std::string const& metadata, + memory_manager* memmgr = nullptr) const { + return impl_->compress(data, &metadata, memmgr); } compression_type type() const { return impl_->type(); } @@ -90,8 +94,9 @@ class block_compressor { virtual std::unique_ptr clone() const = 0; - virtual shared_byte_buffer compress(shared_byte_buffer const& data, - std::string const* metadata) const = 0; + virtual shared_byte_buffer + compress(shared_byte_buffer const& data, std::string const* metadata, + memory_manager* memmgr) const = 0; virtual compression_type type() const = 0; virtual std::string describe() const = 0; diff --git a/include/dwarfs/memory_manager.h b/include/dwarfs/memory_manager.h index de5aa601..17f2e5bf 100644 --- a/include/dwarfs/memory_manager.h +++ b/include/dwarfs/memory_manager.h @@ -143,30 +143,7 @@ class memory_manager : public std::enable_shared_from_this { } std::string status() const { - struct request_info { - size_t active_size{0}; - size_t active_count{0}; - size_t pending_size{0}; - size_t pending_count{0}; - }; - - std::unordered_map requests; - - { - std::lock_guard lock(mutex_); - - for (auto const& [_, info] : active_) { - auto& entry = requests[info.tag]; - entry.active_size += info.size; - ++entry.active_count; - } - - for (auto const& req : pending_) { - auto& entry = requests[req->tag]; - entry.pending_size += req->size; - ++entry.pending_count; - } - } + auto const requests = get_request_info(); std::vector tags(requests.size()); std::ranges::transform(requests, tags.begin(), @@ -177,7 +154,7 @@ class memory_manager : public std::enable_shared_from_this { fmt::format("{}/{}", size_with_unit(used_), size_with_unit(limit_)); for (auto const& tag : tags) { - auto const& info = requests[tag]; + auto const& info = requests.at(tag); result += fmt::format("; {}: {} ({}) A, {} ({}) P", tag, size_with_unit(info.active_size), info.active_count, @@ -187,7 +164,54 @@ class memory_manager : public std::enable_shared_from_this { return result; } + struct usage_info { + std::string_view tag; + size_t size; + }; + + std::vector get_usage_info() const { + auto const requests = get_request_info(); + std::vector usage; + size_t total_used{0}; + for (auto const& [tag, info] : requests) { + total_used += info.active_size; + usage.emplace_back(usage_info{tag, info.active_size}); + } + DWARFS_CHECK(total_used <= limit_, + fmt::format("Total used memory exceeds limit: {} > {}", + total_used, limit_)); + usage.emplace_back(usage_info{"free", limit_ - total_used}); + return usage; + } + private: + struct request_info { + size_t active_size{0}; + size_t active_count{0}; + size_t pending_size{0}; + size_t pending_count{0}; + }; + + std::unordered_map get_request_info() const { + std::unordered_map requests; + + std::lock_guard lock(mutex_); + + for (auto const& [_, info] : active_) { + auto& entry = requests[info.tag]; + entry.active_size += info.size; + ++entry.active_count; + } + + for (auto const& req : pending_) { + auto& entry = requests[req->tag]; + entry.pending_size += req->size; + ++entry.pending_count; + } + + return requests; + } + void fulfill_and_unlock(std::unique_lock& lock) { std::vector granted; @@ -233,7 +257,8 @@ class memory_manager : public std::enable_shared_from_this { void release_partial(size_t released_size, size_t sequence) { std::unique_lock lock(mutex_); - if (active_.contains(sequence)) { + if (auto it = active_.find(sequence); it != active_.end()) { + it->second.size -= released_size; used_ -= released_size; fulfill_and_unlock(lock); } diff --git a/src/compression/brotli.cpp b/src/compression/brotli.cpp index 618f75a0..15538bfc 100644 --- a/src/compression/brotli.cpp +++ b/src/compression/brotli.cpp @@ -549,8 +549,9 @@ class brotli_block_compressor final : public block_compressor::impl { return std::make_unique(*this); } - shared_byte_buffer compress(shared_byte_buffer const& data, - std::string const* /*metadata*/) const override { + shared_byte_buffer + compress(shared_byte_buffer const& data, std::string const* /*metadata*/, + memory_manager* /*memmgr*/) const override { auto compressed = malloc_byte_buffer::create(); // TODO: make configurable compressed.resize(varint::max_size + ::BrotliEncoderMaxCompressedSize(data.size())); diff --git a/src/compression/flac.cpp b/src/compression/flac.cpp index 1bbd62d2..577e8d8b 100644 --- a/src/compression/flac.cpp +++ b/src/compression/flac.cpp @@ -224,8 +224,9 @@ class flac_block_compressor final : public block_compressor::impl { return std::make_unique(*this); } - shared_byte_buffer compress(shared_byte_buffer const& data, - std::string const* metadata) const override { + shared_byte_buffer + compress(shared_byte_buffer const& data, std::string const* metadata, + memory_manager* /*memmgr*/) const override { if (!metadata) { DWARFS_THROW(runtime_error, "internal error: flac compression requires metadata"); diff --git a/src/compression/lz4.cpp b/src/compression/lz4.cpp index c23d53c3..e8cfd4b7 100644 --- a/src/compression/lz4.cpp +++ b/src/compression/lz4.cpp @@ -84,8 +84,9 @@ class lz4_block_compressor final : public block_compressor::impl { return std::make_unique(*this); } - shared_byte_buffer compress(shared_byte_buffer const& data, - std::string const* /*metadata*/) const override { + shared_byte_buffer + compress(shared_byte_buffer const& data, std::string const* /*metadata*/, + memory_manager* /*memmgr*/) const override { auto compressed = malloc_byte_buffer::create(); // TODO: make configurable compressed.resize(sizeof(uint32_t) + LZ4_compressBound(to(data.size()))); diff --git a/src/compression/lzma.cpp b/src/compression/lzma.cpp index 61149639..e7a0a82a 100644 --- a/src/compression/lzma.cpp +++ b/src/compression/lzma.cpp @@ -122,8 +122,9 @@ class lzma_block_compressor final : public block_compressor::impl { return std::make_unique(*this); } - shared_byte_buffer compress(shared_byte_buffer const& data, - std::string const* metadata) const override; + shared_byte_buffer + compress(shared_byte_buffer const& data, std::string const* metadata, + memory_manager* /*memmgr*/) const override; compression_type type() const override { return compression_type::LZMA; } @@ -258,7 +259,8 @@ lzma_block_compressor::compress(shared_byte_buffer const& data, shared_byte_buffer lzma_block_compressor::compress(shared_byte_buffer const& data, - std::string const* /*metadata*/) const { + std::string const* /*metadata*/, + memory_manager* /*memmgr*/) const { auto lzma_opts = opt_lzma_; std::array filters{{{binary_vli_, nullptr}, {LZMA_FILTER_LZMA2, &lzma_opts}, diff --git a/src/compression/null.cpp b/src/compression/null.cpp index 137e7367..090010a7 100644 --- a/src/compression/null.cpp +++ b/src/compression/null.cpp @@ -51,8 +51,9 @@ class null_block_compressor final : public block_compressor::impl { return std::make_unique(*this); } - shared_byte_buffer compress(shared_byte_buffer const& data, - std::string const* /*metadata*/) const override { + shared_byte_buffer + compress(shared_byte_buffer const& data, std::string const* /*metadata*/, + memory_manager* /*memmgr*/) const override { return data; } diff --git a/src/compression/ricepp.cpp b/src/compression/ricepp.cpp index 7260d8b1..b3efec2c 100644 --- a/src/compression/ricepp.cpp +++ b/src/compression/ricepp.cpp @@ -63,8 +63,9 @@ class ricepp_block_compressor final : public block_compressor::impl { return std::make_unique(*this); } - shared_byte_buffer compress(shared_byte_buffer const& data, - std::string const* metadata) const override { + shared_byte_buffer + compress(shared_byte_buffer const& data, std::string const* metadata, + memory_manager* /*memmgr*/) const override { if (!metadata) { DWARFS_THROW(runtime_error, "internal error: ricepp compression requires metadata"); diff --git a/src/compression/zstd.cpp b/src/compression/zstd.cpp index 93820709..ad63abed 100644 --- a/src/compression/zstd.cpp +++ b/src/compression/zstd.cpp @@ -114,8 +114,9 @@ class zstd_block_compressor final : public block_compressor::impl { return std::make_unique(*this); } - shared_byte_buffer compress(shared_byte_buffer const& data, - std::string const* metadata) const override; + shared_byte_buffer + compress(shared_byte_buffer const& data, std::string const* metadata, + memory_manager* /*memmgr*/) const override; compression_type type() const override { return compression_type::ZSTD; } @@ -152,7 +153,8 @@ class zstd_block_compressor final : public block_compressor::impl { shared_byte_buffer zstd_block_compressor::compress(shared_byte_buffer const& data, - std::string const* /*metadata*/) const { + std::string const* /*metadata*/, + memory_manager* /*memmgr*/) const { auto compressed = malloc_byte_buffer::create(); // TODO: make configurable compressed.resize(ZSTD_compressBound(data.size())); auto size = ZSTD_compress(compressed.data(), compressed.size(), data.data(), diff --git a/src/writer/console_writer.cpp b/src/writer/console_writer.cpp index 2cb77feb..d10b5d48 100644 --- a/src/writer/console_writer.cpp +++ b/src/writer/console_writer.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -307,7 +308,32 @@ void console_writer::update(writer_progress& prog, bool last) { oss << newline; if (memmgr_) { + // TODO: clean up, move to separate function + oss << "memmgr: " << memmgr_->status() << newline; + + auto usage = memmgr_->get_usage_info(); + std::unordered_map usage_map; + for (auto const& u : usage) { + usage_map.emplace(u.tag, u.size); + } + + using namespace std::string_view_literals; + + static constexpr std::array tags{ + "segm"sv, "sblk"sv, "data"sv, "comp"sv, "cblk"sv, "free"sv, + }; + + std::vector sections; + + for (auto const& tag : tags) { + if (auto it = usage_map.find(tag); it != usage_map.end()) { + sections.emplace_back(bar_chart_section{ + std::string{tag}, static_cast(it->second)}); + } + } + + oss << render_bar_chart(term(), sections) << newline; } } @@ -375,7 +401,7 @@ void console_writer::update(writer_progress& prog, bool last) { oss.clear(); oss.seekp(0); - rewind(oss, (mode_ == NORMAL ? 9 : 4) + (memmgr_ ? 1 : 0) + ctxs.size()); + rewind(oss, (mode_ == NORMAL ? 9 : 4) + (memmgr_ ? 2 : 0) + ctxs.size()); oss << statebuf_; write_nolock(oss.str());