Compression progress and progress context priority

This commit is contained in:
Marcus Holland-Moritz 2023-08-20 15:46:08 +02:00
parent 6254dcd35b
commit e3f739d6e2
4 changed files with 52 additions and 8 deletions

View File

@ -57,6 +57,7 @@ class progress {
virtual ~context() = default; virtual ~context() = default;
virtual status get_status() const = 0; virtual status get_status() const = 0;
virtual int get_priority() const { return 0; }
speedometer<uint64_t> speed{std::chrono::seconds(5)}; speedometer<uint64_t> speed{std::chrono::seconds(5)};
}; };

View File

@ -133,12 +133,12 @@ void output_context_line(std::ostream& os, progress::context& ctx, size_t width,
} }
} }
os << terminal_colored(st.context, st.color, colored); os << terminal_colored(st.context, st.color, colored, termstyle::BOLD);
os << terminal_colored(fmt::format("{:<{}} {}{}", st.status_string, os << terminal_colored(fmt::format("{:<{}} {}{}", st.status_string,
status_w - st.context.size(), progress, status_w - st.context.size(), progress,
speed), speed),
st.color, colored, termstyle::DIM); st.color, colored);
} }
} // namespace } // namespace

View File

@ -46,10 +46,38 @@ namespace dwarfs {
namespace { namespace {
class compression_progress : public progress::context {
public:
using status = progress::context::status;
compression_progress() = default;
status get_status() const override {
status st;
st.color = termcolor::RED;
st.context = "[compressing] ";
auto bin = bytes_in.load();
auto bout = bytes_out.load();
if (bin > 0 && bout > 0) {
st.status_string = fmt::format("compressed {} to {} (ratio {:.2f}%)",
size_with_unit(bin), size_with_unit(bout),
100.0 * bout / bin);
}
st.bytes_processed.emplace(bytes_in.load());
return st;
}
int get_priority() const override { return -1000; }
std::atomic<size_t> bytes_in{0};
std::atomic<size_t> bytes_out{0};
};
class fsblock { class fsblock {
public: public:
fsblock(section_type type, block_compressor const& bc, fsblock(section_type type, block_compressor const& bc,
std::shared_ptr<block_data>&& data, uint32_t number); std::shared_ptr<block_data>&& data, uint32_t number,
std::shared_ptr<compression_progress> pctx);
fsblock(section_type type, compression_type compression, fsblock(section_type type, compression_type compression,
std::span<uint8_t const> data, uint32_t number); std::span<uint8_t const> data, uint32_t number);
@ -95,13 +123,15 @@ class fsblock {
class raw_fsblock : public fsblock::impl { class raw_fsblock : public fsblock::impl {
public: public:
raw_fsblock(section_type type, const block_compressor& bc, raw_fsblock(section_type type, const block_compressor& bc,
std::shared_ptr<block_data>&& data, uint32_t number) std::shared_ptr<block_data>&& data, uint32_t number,
std::shared_ptr<compression_progress> pctx)
: type_{type} : type_{type}
, bc_{bc} , bc_{bc}
, uncompressed_size_{data->size()} , uncompressed_size_{data->size()}
, data_{std::move(data)} , data_{std::move(data)}
, number_{number} , number_{number}
, comp_type_{bc_.type()} {} , comp_type_{bc_.type()}
, pctx_{std::move(pctx)} {}
void compress(worker_group& wg, std::optional<std::string> meta) override { void compress(worker_group& wg, std::optional<std::string> meta) override {
std::promise<void> prom; std::promise<void> prom;
@ -118,6 +148,9 @@ class raw_fsblock : public fsblock::impl {
tmp = std::make_shared<block_data>(bc_.compress(data_->vec())); tmp = std::make_shared<block_data>(bc_.compress(data_->vec()));
} }
pctx_->bytes_in += data_->vec().size();
pctx_->bytes_out += tmp->vec().size();
{ {
std::lock_guard lock(mx_); std::lock_guard lock(mx_);
data_.swap(tmp); data_.swap(tmp);
@ -163,6 +196,7 @@ class raw_fsblock : public fsblock::impl {
uint32_t const number_; uint32_t const number_;
section_header_v2 header_; section_header_v2 header_;
compression_type comp_type_; compression_type comp_type_;
std::shared_ptr<compression_progress> pctx_;
}; };
class compressed_fsblock : public fsblock::impl { class compressed_fsblock : public fsblock::impl {
@ -212,8 +246,10 @@ class compressed_fsblock : public fsblock::impl {
}; };
fsblock::fsblock(section_type type, block_compressor const& bc, fsblock::fsblock(section_type type, block_compressor const& bc,
std::shared_ptr<block_data>&& data, uint32_t number) std::shared_ptr<block_data>&& data, uint32_t number,
: impl_(std::make_unique<raw_fsblock>(type, bc, std::move(data), number)) {} std::shared_ptr<compression_progress> pctx)
: impl_(std::make_unique<raw_fsblock>(type, bc, std::move(data), number,
std::move(pctx))) {}
fsblock::fsblock(section_type type, compression_type compression, fsblock::fsblock(section_type type, compression_type compression,
std::span<uint8_t const> data, uint32_t number) std::span<uint8_t const> data, uint32_t number)
@ -299,6 +335,7 @@ class filesystem_writer_ final : public filesystem_writer::impl {
const filesystem_writer_options options_; const filesystem_writer_options options_;
LOG_PROXY_DECL(LoggerPolicy); LOG_PROXY_DECL(LoggerPolicy);
std::deque<std::unique_ptr<fsblock>> queue_; std::deque<std::unique_ptr<fsblock>> queue_;
std::shared_ptr<compression_progress> pctx_;
mutable std::mutex mx_; mutable std::mutex mx_;
std::condition_variable cond_; std::condition_variable cond_;
volatile bool flush_; volatile bool flush_;
@ -321,6 +358,7 @@ filesystem_writer_<LoggerPolicy>::filesystem_writer_(
, metadata_bc_(metadata_bc) , metadata_bc_(metadata_bc)
, options_(options) , options_(options)
, LOG_PROXY_INIT(lgr) , LOG_PROXY_INIT(lgr)
, pctx_{prog.create_context<compression_progress>()}
, flush_(false) , flush_(false)
, writer_thread_(&filesystem_writer_::writer_thread, this) { , writer_thread_(&filesystem_writer_::writer_thread, this) {
if (header_) { if (header_) {
@ -453,7 +491,8 @@ uint32_t filesystem_writer_<LoggerPolicy>::write_section(
} }
block_no = section_number_++; block_no = section_number_++;
auto fsb = std::make_unique<fsblock>(type, bc, std::move(data), block_no); auto fsb =
std::make_unique<fsblock>(type, bc, std::move(data), block_no, pctx_);
fsb->compress(wg_, meta); fsb->compress(wg_, meta);

View File

@ -82,6 +82,10 @@ auto progress::get_active_contexts() const
contexts_.end()); contexts_.end());
} }
std::stable_sort(rv.begin(), rv.end(), [](const auto& a, const auto& b) {
return a->get_priority() > b->get_priority();
});
return rv; return rv;
} }