diff --git a/include/dwarfs/block_compressor.h b/include/dwarfs/block_compressor.h index 6edcb8e8..bd9354ac 100644 --- a/include/dwarfs/block_compressor.h +++ b/include/dwarfs/block_compressor.h @@ -91,6 +91,8 @@ class block_compressor { return impl_->get_compression_constraints(metadata); } + explicit operator bool() const { return static_cast(impl_); } + class impl { public: virtual ~impl() = default; diff --git a/include/dwarfs/writer/filesystem_writer.h b/include/dwarfs/writer/filesystem_writer.h index 51c870fb..db5757a1 100644 --- a/include/dwarfs/writer/filesystem_writer.h +++ b/include/dwarfs/writer/filesystem_writer.h @@ -24,6 +24,7 @@ #include #include +#include #include namespace dwarfs { @@ -46,17 +47,10 @@ class filesystem_writer_detail; class filesystem_writer { public: filesystem_writer(std::ostream& os, logger& lgr, thread_pool& pool, - writer_progress& prog, block_compressor const& bc); + writer_progress& prog); filesystem_writer(std::ostream& os, logger& lgr, thread_pool& pool, - writer_progress& prog, block_compressor const& schema_bc, - block_compressor const& metadata_bc, - block_compressor const& history_bc); - - filesystem_writer(std::ostream& os, logger& lgr, thread_pool& pool, - writer_progress& prog, block_compressor const& schema_bc, - block_compressor const& metadata_bc, - block_compressor const& history_bc, + writer_progress& prog, filesystem_writer_options const& options, std::istream* header = nullptr); @@ -67,6 +61,7 @@ class filesystem_writer { void add_default_compressor(block_compressor bc); void add_category_compressor(fragment_category::value_type cat, block_compressor bc); + void add_section_compressor(section_type type, block_compressor bc); internal::filesystem_writer_detail& get_internal() { return *impl_; } diff --git a/include/dwarfs/writer/internal/filesystem_writer_detail.h b/include/dwarfs/writer/internal/filesystem_writer_detail.h index 158e112e..f8eff04d 100644 --- a/include/dwarfs/writer/internal/filesystem_writer_detail.h +++ b/include/dwarfs/writer/internal/filesystem_writer_detail.h @@ -56,6 +56,8 @@ class filesystem_writer_detail { virtual void add_default_compressor(block_compressor bc) = 0; virtual void add_category_compressor(fragment_category::value_type cat, block_compressor bc) = 0; + virtual void + add_section_compressor(section_type type, block_compressor bc) = 0; virtual compression_constraints get_compression_constraints(fragment_category::value_type cat, std::string const& metadata) const = 0; diff --git a/src/writer/filesystem_writer.cpp b/src/writer/filesystem_writer.cpp index ffda259a..ee4f838e 100644 --- a/src/writer/filesystem_writer.cpp +++ b/src/writer/filesystem_writer.cpp @@ -35,7 +35,6 @@ #include #include -#include #include #include #include @@ -73,6 +72,23 @@ size_t copy_stream(std::istream& is, std::ostream& os) { return count; } +std::string get_friendly_section_name(section_type type) { + switch (type) { + case section_type::METADATA_V2_SCHEMA: + return "schema"; + case section_type::METADATA_V2: + return "metadata"; + case section_type::HISTORY: + return "history"; + case section_type::BLOCK: + return "block"; + case section_type::SECTION_INDEX: + return "index"; + } + + return get_section_name(type); +} + class compression_progress : public progress::context { public: using status = progress::context::status; @@ -188,7 +204,9 @@ class raw_fsblock : public fsblock::impl { , data_{std::move(data)} , comp_type_{bc_.type()} , pctx_{std::move(pctx)} - , set_block_cb_{std::move(set_block_cb)} {} + , set_block_cb_{std::move(set_block_cb)} { + DWARFS_CHECK(bc_, "block_compressor must not be null"); + } void compress(worker_group& wg, std::optional meta) override { std::promise prom; @@ -350,7 +368,9 @@ class rewritten_fsblock : public fsblock::impl { , data_{data} , comp_type_{bc_.type()} , pctx_{std::move(pctx)} - , data_comp_type_{data_comp_type} {} + , data_comp_type_{data_comp_type} { + DWARFS_CHECK(bc_, "block_compressor must not be null"); + } void compress(worker_group& wg, std::optional meta) override { std::promise prom; @@ -528,16 +548,14 @@ class filesystem_writer_ final : public filesystem_writer_detail { filesystem_writer_detail::physical_block_cb_type; filesystem_writer_(logger& lgr, std::ostream& os, worker_group& wg, - progress& prog, block_compressor const& schema_bc, - block_compressor const& metadata_bc, - block_compressor const& history_bc, - filesystem_writer_options const& options, + progress& prog, filesystem_writer_options const& options, std::istream* header); ~filesystem_writer_() noexcept override; void add_default_compressor(block_compressor bc) override; void add_category_compressor(fragment_category::value_type cat, block_compressor bc) override; + void add_section_compressor(section_type type, block_compressor bc) override; compression_constraints get_compression_constraints(fragment_category::value_type cat, std::string const& metadata) const override; @@ -579,9 +597,8 @@ class filesystem_writer_ final : public filesystem_writer_detail { block_compressor const& bc, std::optional meta, physical_block_cb_type physical_block_cb); void on_block_merged(block_holder_type holder); - void write_section_impl(section_type type, std::shared_ptr&& data, - block_compressor const& bc, - std::optional meta = std::nullopt); + void + write_section_impl(section_type type, std::shared_ptr&& data); void write(fsblock const& fsb); void write(const char* data, size_t size); template @@ -598,10 +615,9 @@ class filesystem_writer_ final : public filesystem_writer_detail { worker_group& wg_; progress& prog_; std::optional default_bc_; - std::unordered_map bc_; - block_compressor const& schema_bc_; - block_compressor const& metadata_bc_; - block_compressor const& history_bc_; + std::unordered_map + category_bc_; + std::unordered_map section_bc_; filesystem_writer_options const options_; LOG_PROXY_DECL(LoggerPolicy); std::deque queue_; @@ -621,16 +637,11 @@ class filesystem_writer_ final : public filesystem_writer_detail { template filesystem_writer_::filesystem_writer_( logger& lgr, std::ostream& os, worker_group& wg, progress& prog, - block_compressor const& schema_bc, block_compressor const& metadata_bc, - block_compressor const& history_bc, filesystem_writer_options const& options, std::istream* header) : os_(os) , header_(header) , wg_(wg) , prog_(prog) - , schema_bc_(schema_bc) - , metadata_bc_(metadata_bc) - , history_bc_(history_bc) , options_(options) , LOG_PROXY_INIT(lgr) , flush_{true} { @@ -642,26 +653,6 @@ filesystem_writer_::filesystem_writer_( } } - auto check_compressor = [](std::string_view name, - block_compressor const& bc) { - if (auto reqstr = bc.metadata_requirements(); !reqstr.empty()) { - try { - auto req = compression_metadata_requirements{reqstr}; - req.check(std::nullopt); - } catch (std::exception const& e) { - auto msg = fmt::format( - "cannot use '{}' for {} compression because compression " - "metadata requirements are not met: {}", - bc.describe(), name, e.what()); - DWARFS_THROW(runtime_error, msg); - } - } - }; - - check_compressor("schema", schema_bc); - check_compressor("metadata", metadata_bc); - check_compressor("history", history_bc); - // TODO: the whole flush & thread thing needs to be revisited flush_ = false; writer_thread_ = std::thread(&filesystem_writer_::writer_thread, this); @@ -709,8 +700,8 @@ void filesystem_writer_::writer_thread() { // TODO: this may throw fsb->wait_until_compressed(); - LOG_DEBUG << get_section_name(fsb->type()) << " [" << fsb->block_no() - << "] compressed from " + LOG_DEBUG << get_friendly_section_name(fsb->type()) << " [" + << fsb->block_no() << "] compressed from " << size_with_unit(fsb->uncompressed_size()) << " to " << size_with_unit(fsb->size()) << " [" << fsb->description() << "]"; @@ -767,7 +758,7 @@ template block_compressor const& filesystem_writer_::compressor_for_category( fragment_category::value_type cat) const { - if (auto it = bc_.find(cat); it != bc_.end()) { + if (auto it = category_bc_.find(cat); it != category_bc_.end()) { LOG_DEBUG << "using compressor (" << it->second.describe() << ") for category " << cat; return it->second; @@ -840,8 +831,9 @@ void filesystem_writer_::finish_category(fragment_category cat) { template void filesystem_writer_::write_section_impl( - section_type type, std::shared_ptr&& data, - block_compressor const& bc, std::optional meta) { + section_type type, std::shared_ptr&& data) { + auto& bc = get_compressor(type, std::nullopt); + uint32_t number; { @@ -855,7 +847,7 @@ void filesystem_writer_::write_section_impl( number = section_number_; fsb->set_block_no(section_number_++); - fsb->compress(wg_, meta); + fsb->compress(wg_); queue_.emplace_back(std::move(fsb)); } @@ -949,20 +941,62 @@ void filesystem_writer_::write_compressed_section( template void filesystem_writer_::add_default_compressor( block_compressor bc) { + DWARFS_CHECK(bc, "block_compressor must not be null"); + + LOG_DEBUG << "adding default compressor (" << bc.describe() << ")"; + if (default_bc_) { DWARFS_THROW(runtime_error, "default compressor registered more than once"); } + default_bc_ = std::move(bc); } template void filesystem_writer_::add_category_compressor( fragment_category::value_type cat, block_compressor bc) { + DWARFS_CHECK(bc, "block_compressor must not be null"); + LOG_DEBUG << "adding compressor (" << bc.describe() << ") for category " << cat; - if (!bc_.emplace(cat, std::move(bc)).second) { - DWARFS_THROW(runtime_error, - "category compressor registered more than once"); + + if (!category_bc_.emplace(cat, std::move(bc)).second) { + DWARFS_THROW( + runtime_error, + fmt::format("compressor registered more than once for category {}", + cat)); + } +} + +template +void filesystem_writer_::add_section_compressor( + section_type type, block_compressor bc) { + DWARFS_CHECK(bc, "block_compressor must not be null"); + + LOG_DEBUG << "adding compressor (" << bc.describe() << ") for section type " + << get_friendly_section_name(type); + + DWARFS_CHECK(type != section_type::SECTION_INDEX, + "SECTION_INDEX is always uncompressed"); + + if (auto reqstr = bc.metadata_requirements(); !reqstr.empty()) { + try { + auto req = compression_metadata_requirements{reqstr}; + req.check(std::nullopt); + } catch (std::exception const& e) { + auto msg = + fmt::format("cannot use '{}' for {} compression because compression " + "metadata requirements are not met: {}", + bc.describe(), get_friendly_section_name(type), e.what()); + DWARFS_THROW(runtime_error, msg); + } + } + + if (!section_bc_.emplace(type, std::move(bc)).second) { + DWARFS_THROW( + runtime_error, + fmt::format("compressor registered more than once for section type {}", + get_friendly_section_name(type))); } } @@ -976,22 +1010,14 @@ auto filesystem_writer_::get_compression_constraints( template block_compressor const& filesystem_writer_::get_compressor( section_type type, std::optional cat) const { - switch (type) { - case section_type::METADATA_V2_SCHEMA: - return schema_bc_; - - case section_type::METADATA_V2: - return metadata_bc_; - - case section_type::HISTORY: - return history_bc_; - - default: - break; + if (cat) { + DWARFS_CHECK(type == section_type::BLOCK, + "category-specific compressors are only supported for blocks"); + return compressor_for_category(*cat); } - if (cat) { - return compressor_for_category(*cat); + if (auto it = section_bc_.find(type); it != section_bc_.end()) { + return it->second; } return default_bc_.value(); @@ -1043,20 +1069,19 @@ void filesystem_writer_::write_block( template void filesystem_writer_::write_metadata_v2_schema( std::shared_ptr&& data) { - write_section_impl(section_type::METADATA_V2_SCHEMA, std::move(data), - schema_bc_); + write_section_impl(section_type::METADATA_V2_SCHEMA, std::move(data)); } template void filesystem_writer_::write_metadata_v2( std::shared_ptr&& data) { - write_section_impl(section_type::METADATA_V2, std::move(data), metadata_bc_); + write_section_impl(section_type::METADATA_V2, std::move(data)); } template void filesystem_writer_::write_history( std::shared_ptr&& data) { - write_section_impl(section_type::HISTORY, std::move(data), history_bc_); + write_section_impl(section_type::HISTORY, std::move(data)); } template @@ -1104,30 +1129,18 @@ void filesystem_writer_::write_section_index() { } // namespace internal filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr, - thread_pool& pool, writer_progress& prog, - block_compressor const& bc) - : filesystem_writer(os, lgr, pool, prog, bc, bc, bc) {} + thread_pool& pool, writer_progress& prog) + : filesystem_writer(os, lgr, pool, prog, {}) {} filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr, thread_pool& pool, writer_progress& prog, - block_compressor const& schema_bc, - block_compressor const& metadata_bc, - block_compressor const& history_bc) - : filesystem_writer(os, lgr, pool, prog, schema_bc, metadata_bc, history_bc, - filesystem_writer_options{}) {} - -filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr, - thread_pool& pool, writer_progress& prog, - block_compressor const& schema_bc, - block_compressor const& metadata_bc, - block_compressor const& history_bc, filesystem_writer_options const& options, std::istream* header) : impl_{make_unique_logging_object( - lgr, os, pool.get_worker_group(), prog.get_internal(), schema_bc, - metadata_bc, history_bc, options, header)} {} + lgr, os, pool.get_worker_group(), prog.get_internal(), options, + header)} {} filesystem_writer::~filesystem_writer() = default; filesystem_writer::filesystem_writer(filesystem_writer&&) = default; @@ -1142,4 +1155,9 @@ void filesystem_writer::add_category_compressor( impl_->add_category_compressor(cat, std::move(bc)); } +void filesystem_writer::add_section_compressor(section_type type, + block_compressor bc) { + impl_->add_section_compressor(type, std::move(bc)); +} + } // namespace dwarfs::writer diff --git a/test/compat_test.cpp b/test/compat_test.cpp index 4e317eaf..980c8fb4 100644 --- a/test/compat_test.cpp +++ b/test/compat_test.cpp @@ -1137,7 +1137,7 @@ TEST_P(rewrite, filesystem_rewrite) { }; { - writer::filesystem_writer fsw(rewritten, lgr, pool, prog, bc, bc, bc); + writer::filesystem_writer fsw(rewritten, lgr, pool, prog); fsw.add_default_compressor(bc); auto mm = std::make_shared(filename); EXPECT_NO_THROW(reader::filesystem_v2::identify(lgr, os, mm, idss)); @@ -1159,8 +1159,8 @@ TEST_P(rewrite, filesystem_rewrite) { { std::istringstream hdr_iss(format_sh); writer::filesystem_writer_options fsw_opts; - writer::filesystem_writer fsw(rewritten, lgr, pool, prog, bc, bc, bc, - fsw_opts, &hdr_iss); + writer::filesystem_writer fsw(rewritten, lgr, pool, prog, fsw_opts, + &hdr_iss); fsw.add_default_compressor(bc); rewrite_fs(fsw, std::make_shared(filename)); } @@ -1186,8 +1186,8 @@ TEST_P(rewrite, filesystem_rewrite) { { std::istringstream hdr_iss("D"); writer::filesystem_writer_options fsw_opts; - writer::filesystem_writer fsw(rewritten2, lgr, pool, prog, bc, bc, bc, - fsw_opts, &hdr_iss); + writer::filesystem_writer fsw(rewritten2, lgr, pool, prog, fsw_opts, + &hdr_iss); fsw.add_default_compressor(bc); rewrite_fs(fsw, std::make_shared(rewritten.str())); } @@ -1204,7 +1204,7 @@ TEST_P(rewrite, filesystem_rewrite) { std::ostringstream rewritten3; { - writer::filesystem_writer fsw(rewritten3, lgr, pool, prog, bc, bc, bc); + writer::filesystem_writer fsw(rewritten3, lgr, pool, prog); fsw.add_default_compressor(bc); rewrite_fs(fsw, std::make_shared(rewritten2.str())); } @@ -1223,8 +1223,7 @@ TEST_P(rewrite, filesystem_rewrite) { { writer::filesystem_writer_options fsw_opts; fsw_opts.remove_header = true; - writer::filesystem_writer fsw(rewritten4, lgr, pool, prog, bc, bc, bc, - fsw_opts); + writer::filesystem_writer fsw(rewritten4, lgr, pool, prog, fsw_opts); fsw.add_default_compressor(bc); rewrite_fs(fsw, std::make_shared(rewritten3.str())); } @@ -1243,8 +1242,7 @@ TEST_P(rewrite, filesystem_rewrite) { { writer::filesystem_writer_options fsw_opts; fsw_opts.no_section_index = true; - writer::filesystem_writer fsw(rewritten5, lgr, pool, prog, bc, bc, bc, - fsw_opts); + writer::filesystem_writer fsw(rewritten5, lgr, pool, prog, fsw_opts); fsw.add_default_compressor(bc); rewrite_fs(fsw, std::make_shared(rewritten4.str())); } diff --git a/test/dwarfs_benchmark.cpp b/test/dwarfs_benchmark.cpp index a00fdd01..af861d92 100644 --- a/test/dwarfs_benchmark.cpp +++ b/test/dwarfs_benchmark.cpp @@ -133,7 +133,7 @@ std::string make_filesystem(::benchmark::State const& state) { std::ostringstream oss; block_compressor bc("null"); - writer::filesystem_writer fsw(oss, lgr, pool, prog, bc, bc, bc); + writer::filesystem_writer fsw(oss, lgr, pool, prog); fsw.add_default_compressor(bc); s.scan(fsw, "", prog); diff --git a/test/dwarfs_test.cpp b/test/dwarfs_test.cpp index 8ae02ee9..a9a9cde5 100644 --- a/test/dwarfs_test.cpp +++ b/test/dwarfs_test.cpp @@ -120,7 +120,7 @@ build_dwarfs(logger& lgr, std::shared_ptr input, std::ostringstream oss; block_compressor bc(compression); - writer::filesystem_writer fsw(oss, lgr, pool, *prog, bc, bc, bc); + writer::filesystem_writer fsw(oss, lgr, pool, *prog); fsw.add_default_compressor(bc); s.scan(fsw, std::filesystem::path("/"), *prog, input_list); @@ -992,7 +992,8 @@ class filter_test block_compressor bc("null"); std::ostringstream null; - writer::filesystem_writer fsw(null, lgr, pool, prog, bc, bc, bc); + writer::filesystem_writer fsw(null, lgr, pool, prog); + fsw.add_default_compressor(bc); s.scan(fsw, std::filesystem::path("/"), prog); return oss.str(); diff --git a/test/filesystem_writer_test.cpp b/test/filesystem_writer_test.cpp index 4842e00b..252ca6ec 100644 --- a/test/filesystem_writer_test.cpp +++ b/test/filesystem_writer_test.cpp @@ -48,15 +48,15 @@ TEST(filesystem_writer, compression_metadata_requirements) { block_compressor bcnull("null"); - EXPECT_NO_THROW( - filesystem_writer(devnull, lgr, pool, prog, bcnull, bcnull, bcnull)); + EXPECT_NO_THROW(filesystem_writer(devnull, lgr, pool, prog)); #ifdef DWARFS_HAVE_FLAC block_compressor bcflac("flac:level=1"); EXPECT_THAT( [&] { - filesystem_writer(devnull, lgr, pool, prog, bcflac, bcnull, bcnull); + auto fsw = filesystem_writer(devnull, lgr, pool, prog); + fsw.add_section_compressor(section_type::METADATA_V2_SCHEMA, bcflac); }, testing::ThrowsMessage(testing::HasSubstr( "cannot use 'flac [level=1]' for schema compression because " @@ -65,7 +65,8 @@ TEST(filesystem_writer, compression_metadata_requirements) { EXPECT_THAT( [&] { - filesystem_writer(devnull, lgr, pool, prog, bcnull, bcflac, bcnull); + auto fsw = filesystem_writer(devnull, lgr, pool, prog); + fsw.add_section_compressor(section_type::METADATA_V2, bcflac); }, testing::ThrowsMessage(testing::HasSubstr( "cannot use 'flac [level=1]' for metadata compression because " @@ -74,7 +75,8 @@ TEST(filesystem_writer, compression_metadata_requirements) { EXPECT_THAT( [&] { - filesystem_writer(devnull, lgr, pool, prog, bcnull, bcnull, bcflac); + auto fsw = filesystem_writer(devnull, lgr, pool, prog); + fsw.add_section_compressor(section_type::HISTORY, bcflac); }, testing::ThrowsMessage(testing::HasSubstr( "cannot use 'flac [level=1]' for history compression because " @@ -87,7 +89,8 @@ TEST(filesystem_writer, compression_metadata_requirements) { EXPECT_THAT( [&] { - filesystem_writer(devnull, lgr, pool, prog, bcrice, bcnull, bcnull); + auto fsw = filesystem_writer(devnull, lgr, pool, prog); + fsw.add_section_compressor(section_type::METADATA_V2_SCHEMA, bcrice); }, testing::ThrowsMessage(testing::HasSubstr( "cannot use 'ricepp [block_size=128]' for schema compression because " @@ -96,7 +99,8 @@ TEST(filesystem_writer, compression_metadata_requirements) { EXPECT_THAT( [&] { - filesystem_writer(devnull, lgr, pool, prog, bcnull, bcrice, bcnull); + auto fsw = filesystem_writer(devnull, lgr, pool, prog); + fsw.add_section_compressor(section_type::METADATA_V2, bcrice); }, testing::ThrowsMessage(testing::HasSubstr( "cannot use 'ricepp [block_size=128]' for metadata compression " @@ -106,7 +110,8 @@ TEST(filesystem_writer, compression_metadata_requirements) { EXPECT_THAT( [&] { - filesystem_writer(devnull, lgr, pool, prog, bcnull, bcnull, bcrice); + auto fsw = filesystem_writer(devnull, lgr, pool, prog); + fsw.add_section_compressor(section_type::HISTORY, bcrice); }, testing::ThrowsMessage(testing::HasSubstr( "cannot use 'ricepp [block_size=128]' for history compression " diff --git a/tools/src/mkdwarfs_main.cpp b/tools/src/mkdwarfs_main.cpp index 72ca634d..e6eef550 100644 --- a/tools/src/mkdwarfs_main.cpp +++ b/tools/src/mkdwarfs_main.cpp @@ -1286,8 +1286,12 @@ int mkdwarfs_main(int argc, sys_char** argv, iolayer const& iol) { }, [&](std::ostringstream& oss) -> std::ostream& { return oss; }}; - fsw.emplace(fsw_os, lgr, compress_pool, prog, schema_bc, metadata_bc, - history_bc, fswopts, header_ifs ? &header_ifs->is() : nullptr); + fsw.emplace(fsw_os, lgr, compress_pool, prog, fswopts, + header_ifs ? &header_ifs->is() : nullptr); + + fsw->add_section_compressor(section_type::METADATA_V2_SCHEMA, schema_bc); + fsw->add_section_compressor(section_type::METADATA_V2, metadata_bc); + fsw->add_section_compressor(section_type::HISTORY, history_bc); writer::categorized_option compression_opt; writer::contextual_option_parser cop("--compression", compression_opt, cp,