chore: propagate exceptions in filesystem_writer

This commit is contained in:
Marcus Holland-Moritz 2024-01-04 23:54:20 +01:00
parent 4abbdf09ab
commit b2f368bce0

View File

@ -346,41 +346,46 @@ class rewritten_fsblock : public fsblock::impl {
std::promise<void> prom; std::promise<void> prom;
future_ = prom.get_future(); future_ = prom.get_future();
wg.add_job([this, prom = std::move(prom), wg.add_job(
meta = std::move(meta)]() mutable { [this, prom = std::move(prom), meta = std::move(meta)]() mutable {
// TODO: we don't have to do this for uncompressed blocks try {
std::vector<uint8_t> block; // TODO: we don't have to do this for uncompressed blocks
block_decompressor bd(data_comp_type_, data_.data(), data_.size(), block); std::vector<uint8_t> block;
bd.decompress_frame(bd.uncompressed_size()); block_decompressor bd(data_comp_type_, data_.data(), data_.size(),
block);
bd.decompress_frame(bd.uncompressed_size());
if (!meta) { if (!meta) {
meta = bd.metadata(); meta = bd.metadata();
} }
pctx_->bytes_in += block.size(); // TODO: data_.size()? pctx_->bytes_in += block.size(); // TODO: data_.size()?
try { try {
if (meta) { if (meta) {
block = bc_.compress(block, *meta); block = bc_.compress(block, *meta);
} else { } else {
block = bc_.compress(block); block = bc_.compress(block);
} }
} catch (bad_compression_ratio_error const&) { } catch (bad_compression_ratio_error const&) {
comp_type_ = compression_type::NONE; comp_type_ = compression_type::NONE;
} }
pctx_->bytes_out += block.size(); pctx_->bytes_out += block.size();
{ {
std::lock_guard lock(mx_); std::lock_guard lock(mx_);
block_data_.swap(block); block_data_.swap(block);
} }
prom.set_value(); prom.set_value();
}); } catch (...) {
prom.set_exception(std::current_exception());
}
});
} }
void wait_until_compressed() override { future_.wait(); } void wait_until_compressed() override { future_.get(); }
section_type type() const override { return type_; } section_type type() const override { return type_; }
@ -690,6 +695,7 @@ void filesystem_writer_<LoggerPolicy>::writer_thread() {
auto const& fsb = holder.value(); auto const& fsb = holder.value();
// TODO: this may throw
fsb->wait_until_compressed(); fsb->wait_until_compressed();
LOG_DEBUG << get_section_name(fsb->type()) << " [" << fsb->block_no() LOG_DEBUG << get_section_name(fsb->type()) << " [" << fsb->block_no()