From e3f739d6e288c8d4bea5f0ca4fbf539f61a52ee4 Mon Sep 17 00:00:00 2001 From: Marcus Holland-Moritz Date: Sun, 20 Aug 2023 15:46:08 +0200 Subject: [PATCH] Compression progress and progress context priority --- include/dwarfs/progress.h | 1 + src/dwarfs/console_writer.cpp | 4 +-- src/dwarfs/filesystem_writer.cpp | 51 ++++++++++++++++++++++++++++---- src/dwarfs/progress.cpp | 4 +++ 4 files changed, 52 insertions(+), 8 deletions(-) diff --git a/include/dwarfs/progress.h b/include/dwarfs/progress.h index 653215de..eca22237 100644 --- a/include/dwarfs/progress.h +++ b/include/dwarfs/progress.h @@ -57,6 +57,7 @@ class progress { virtual ~context() = default; virtual status get_status() const = 0; + virtual int get_priority() const { return 0; } speedometer speed{std::chrono::seconds(5)}; }; diff --git a/src/dwarfs/console_writer.cpp b/src/dwarfs/console_writer.cpp index 7fe8e8d7..72f66f70 100644 --- a/src/dwarfs/console_writer.cpp +++ b/src/dwarfs/console_writer.cpp @@ -133,12 +133,12 @@ void output_context_line(std::ostream& os, progress::context& ctx, size_t width, } } - os << terminal_colored(st.context, st.color, colored); + os << terminal_colored(st.context, st.color, colored, termstyle::BOLD); os << terminal_colored(fmt::format("{:<{}} {}{}", st.status_string, status_w - st.context.size(), progress, speed), - st.color, colored, termstyle::DIM); + st.color, colored); } } // namespace diff --git a/src/dwarfs/filesystem_writer.cpp b/src/dwarfs/filesystem_writer.cpp index c5dcc679..731fddd6 100644 --- a/src/dwarfs/filesystem_writer.cpp +++ b/src/dwarfs/filesystem_writer.cpp @@ -46,10 +46,38 @@ namespace dwarfs { namespace { +class compression_progress : public progress::context { + public: + using status = progress::context::status; + + compression_progress() = default; + + status get_status() const override { + status st; + st.color = termcolor::RED; + st.context = "[compressing] "; + auto bin = bytes_in.load(); + auto bout = bytes_out.load(); + if (bin > 0 && bout > 0) { + st.status_string = fmt::format("compressed {} to {} (ratio {:.2f}%)", + size_with_unit(bin), size_with_unit(bout), + 100.0 * bout / bin); + } + st.bytes_processed.emplace(bytes_in.load()); + return st; + } + + int get_priority() const override { return -1000; } + + std::atomic bytes_in{0}; + std::atomic bytes_out{0}; +}; + class fsblock { public: fsblock(section_type type, block_compressor const& bc, - std::shared_ptr&& data, uint32_t number); + std::shared_ptr&& data, uint32_t number, + std::shared_ptr pctx); fsblock(section_type type, compression_type compression, std::span data, uint32_t number); @@ -95,13 +123,15 @@ class fsblock { class raw_fsblock : public fsblock::impl { public: raw_fsblock(section_type type, const block_compressor& bc, - std::shared_ptr&& data, uint32_t number) + std::shared_ptr&& data, uint32_t number, + std::shared_ptr pctx) : type_{type} , bc_{bc} , uncompressed_size_{data->size()} , data_{std::move(data)} , number_{number} - , comp_type_{bc_.type()} {} + , comp_type_{bc_.type()} + , pctx_{std::move(pctx)} {} void compress(worker_group& wg, std::optional meta) override { std::promise prom; @@ -118,6 +148,9 @@ class raw_fsblock : public fsblock::impl { tmp = std::make_shared(bc_.compress(data_->vec())); } + pctx_->bytes_in += data_->vec().size(); + pctx_->bytes_out += tmp->vec().size(); + { std::lock_guard lock(mx_); data_.swap(tmp); @@ -163,6 +196,7 @@ class raw_fsblock : public fsblock::impl { uint32_t const number_; section_header_v2 header_; compression_type comp_type_; + std::shared_ptr pctx_; }; class compressed_fsblock : public fsblock::impl { @@ -212,8 +246,10 @@ class compressed_fsblock : public fsblock::impl { }; fsblock::fsblock(section_type type, block_compressor const& bc, - std::shared_ptr&& data, uint32_t number) - : impl_(std::make_unique(type, bc, std::move(data), number)) {} + std::shared_ptr&& data, uint32_t number, + std::shared_ptr pctx) + : impl_(std::make_unique(type, bc, std::move(data), number, + std::move(pctx))) {} fsblock::fsblock(section_type type, compression_type compression, std::span data, uint32_t number) @@ -299,6 +335,7 @@ class filesystem_writer_ final : public filesystem_writer::impl { const filesystem_writer_options options_; LOG_PROXY_DECL(LoggerPolicy); std::deque> queue_; + std::shared_ptr pctx_; mutable std::mutex mx_; std::condition_variable cond_; volatile bool flush_; @@ -321,6 +358,7 @@ filesystem_writer_::filesystem_writer_( , metadata_bc_(metadata_bc) , options_(options) , LOG_PROXY_INIT(lgr) + , pctx_{prog.create_context()} , flush_(false) , writer_thread_(&filesystem_writer_::writer_thread, this) { if (header_) { @@ -453,7 +491,8 @@ uint32_t filesystem_writer_::write_section( } block_no = section_number_++; - auto fsb = std::make_unique(type, bc, std::move(data), block_no); + auto fsb = + std::make_unique(type, bc, std::move(data), block_no, pctx_); fsb->compress(wg_, meta); diff --git a/src/dwarfs/progress.cpp b/src/dwarfs/progress.cpp index 7b08f9e3..d425c68b 100644 --- a/src/dwarfs/progress.cpp +++ b/src/dwarfs/progress.cpp @@ -82,6 +82,10 @@ auto progress::get_active_contexts() const contexts_.end()); } + std::stable_sort(rv.begin(), rv.end(), [](const auto& a, const auto& b) { + return a->get_priority() > b->get_priority(); + }); + return rv; }