refactor(filesystem_writer): remove compressor arguments in ctor

This commit is contained in:
Marcus Holland-Moritz 2024-08-14 14:41:18 +02:00
parent c37410d602
commit 52aef6af7d
9 changed files with 139 additions and 114 deletions

View File

@ -91,6 +91,8 @@ class block_compressor {
return impl_->get_compression_constraints(metadata);
}
explicit operator bool() const { return static_cast<bool>(impl_); }
class impl {
public:
virtual ~impl() = default;

View File

@ -24,6 +24,7 @@
#include <iosfwd>
#include <dwarfs/block_compressor.h>
#include <dwarfs/fstypes.h>
#include <dwarfs/writer/fragment_category.h>
namespace dwarfs {
@ -46,17 +47,10 @@ class filesystem_writer_detail;
class filesystem_writer {
public:
filesystem_writer(std::ostream& os, logger& lgr, thread_pool& pool,
writer_progress& prog, block_compressor const& bc);
writer_progress& prog);
filesystem_writer(std::ostream& os, logger& lgr, thread_pool& pool,
writer_progress& prog, block_compressor const& schema_bc,
block_compressor const& metadata_bc,
block_compressor const& history_bc);
filesystem_writer(std::ostream& os, logger& lgr, thread_pool& pool,
writer_progress& prog, block_compressor const& schema_bc,
block_compressor const& metadata_bc,
block_compressor const& history_bc,
writer_progress& prog,
filesystem_writer_options const& options,
std::istream* header = nullptr);
@ -67,6 +61,7 @@ class filesystem_writer {
void add_default_compressor(block_compressor bc);
void add_category_compressor(fragment_category::value_type cat,
block_compressor bc);
void add_section_compressor(section_type type, block_compressor bc);
internal::filesystem_writer_detail& get_internal() { return *impl_; }

View File

@ -56,6 +56,8 @@ class filesystem_writer_detail {
virtual void add_default_compressor(block_compressor bc) = 0;
virtual void add_category_compressor(fragment_category::value_type cat,
block_compressor bc) = 0;
virtual void
add_section_compressor(section_type type, block_compressor bc) = 0;
virtual compression_constraints
get_compression_constraints(fragment_category::value_type cat,
std::string const& metadata) const = 0;

View File

@ -35,7 +35,6 @@
#include <dwarfs/block_compressor.h>
#include <dwarfs/checksum.h>
#include <dwarfs/fstypes.h>
#include <dwarfs/logger.h>
#include <dwarfs/thread_pool.h>
#include <dwarfs/util.h>
@ -73,6 +72,23 @@ size_t copy_stream(std::istream& is, std::ostream& os) {
return count;
}
std::string get_friendly_section_name(section_type type) {
switch (type) {
case section_type::METADATA_V2_SCHEMA:
return "schema";
case section_type::METADATA_V2:
return "metadata";
case section_type::HISTORY:
return "history";
case section_type::BLOCK:
return "block";
case section_type::SECTION_INDEX:
return "index";
}
return get_section_name(type);
}
class compression_progress : public progress::context {
public:
using status = progress::context::status;
@ -188,7 +204,9 @@ class raw_fsblock : public fsblock::impl {
, data_{std::move(data)}
, comp_type_{bc_.type()}
, pctx_{std::move(pctx)}
, set_block_cb_{std::move(set_block_cb)} {}
, set_block_cb_{std::move(set_block_cb)} {
DWARFS_CHECK(bc_, "block_compressor must not be null");
}
void compress(worker_group& wg, std::optional<std::string> meta) override {
std::promise<void> prom;
@ -350,7 +368,9 @@ class rewritten_fsblock : public fsblock::impl {
, data_{data}
, comp_type_{bc_.type()}
, pctx_{std::move(pctx)}
, data_comp_type_{data_comp_type} {}
, data_comp_type_{data_comp_type} {
DWARFS_CHECK(bc_, "block_compressor must not be null");
}
void compress(worker_group& wg, std::optional<std::string> meta) override {
std::promise<void> prom;
@ -528,16 +548,14 @@ class filesystem_writer_ final : public filesystem_writer_detail {
filesystem_writer_detail::physical_block_cb_type;
filesystem_writer_(logger& lgr, std::ostream& os, worker_group& wg,
progress& prog, block_compressor const& schema_bc,
block_compressor const& metadata_bc,
block_compressor const& history_bc,
filesystem_writer_options const& options,
progress& prog, filesystem_writer_options const& options,
std::istream* header);
~filesystem_writer_() noexcept override;
void add_default_compressor(block_compressor bc) override;
void add_category_compressor(fragment_category::value_type cat,
block_compressor bc) override;
void add_section_compressor(section_type type, block_compressor bc) override;
compression_constraints
get_compression_constraints(fragment_category::value_type cat,
std::string const& metadata) const override;
@ -579,9 +597,8 @@ class filesystem_writer_ final : public filesystem_writer_detail {
block_compressor const& bc, std::optional<std::string> meta,
physical_block_cb_type physical_block_cb);
void on_block_merged(block_holder_type holder);
void write_section_impl(section_type type, std::shared_ptr<block_data>&& data,
block_compressor const& bc,
std::optional<std::string> meta = std::nullopt);
void
write_section_impl(section_type type, std::shared_ptr<block_data>&& data);
void write(fsblock const& fsb);
void write(const char* data, size_t size);
template <typename T>
@ -598,10 +615,9 @@ class filesystem_writer_ final : public filesystem_writer_detail {
worker_group& wg_;
progress& prog_;
std::optional<block_compressor> default_bc_;
std::unordered_map<fragment_category::value_type, block_compressor> bc_;
block_compressor const& schema_bc_;
block_compressor const& metadata_bc_;
block_compressor const& history_bc_;
std::unordered_map<fragment_category::value_type, block_compressor>
category_bc_;
std::unordered_map<section_type, block_compressor> section_bc_;
filesystem_writer_options const options_;
LOG_PROXY_DECL(LoggerPolicy);
std::deque<block_holder_type> queue_;
@ -621,16 +637,11 @@ class filesystem_writer_ final : public filesystem_writer_detail {
template <typename LoggerPolicy>
filesystem_writer_<LoggerPolicy>::filesystem_writer_(
logger& lgr, std::ostream& os, worker_group& wg, progress& prog,
block_compressor const& schema_bc, block_compressor const& metadata_bc,
block_compressor const& history_bc,
filesystem_writer_options const& options, std::istream* header)
: os_(os)
, header_(header)
, wg_(wg)
, prog_(prog)
, schema_bc_(schema_bc)
, metadata_bc_(metadata_bc)
, history_bc_(history_bc)
, options_(options)
, LOG_PROXY_INIT(lgr)
, flush_{true} {
@ -642,26 +653,6 @@ filesystem_writer_<LoggerPolicy>::filesystem_writer_(
}
}
auto check_compressor = [](std::string_view name,
block_compressor const& bc) {
if (auto reqstr = bc.metadata_requirements(); !reqstr.empty()) {
try {
auto req = compression_metadata_requirements<nlohmann::json>{reqstr};
req.check(std::nullopt);
} catch (std::exception const& e) {
auto msg = fmt::format(
"cannot use '{}' for {} compression because compression "
"metadata requirements are not met: {}",
bc.describe(), name, e.what());
DWARFS_THROW(runtime_error, msg);
}
}
};
check_compressor("schema", schema_bc);
check_compressor("metadata", metadata_bc);
check_compressor("history", history_bc);
// TODO: the whole flush & thread thing needs to be revisited
flush_ = false;
writer_thread_ = std::thread(&filesystem_writer_::writer_thread, this);
@ -709,8 +700,8 @@ void filesystem_writer_<LoggerPolicy>::writer_thread() {
// TODO: this may throw
fsb->wait_until_compressed();
LOG_DEBUG << get_section_name(fsb->type()) << " [" << fsb->block_no()
<< "] compressed from "
LOG_DEBUG << get_friendly_section_name(fsb->type()) << " ["
<< fsb->block_no() << "] compressed from "
<< size_with_unit(fsb->uncompressed_size()) << " to "
<< size_with_unit(fsb->size()) << " [" << fsb->description()
<< "]";
@ -767,7 +758,7 @@ template <typename LoggerPolicy>
block_compressor const&
filesystem_writer_<LoggerPolicy>::compressor_for_category(
fragment_category::value_type cat) const {
if (auto it = bc_.find(cat); it != bc_.end()) {
if (auto it = category_bc_.find(cat); it != category_bc_.end()) {
LOG_DEBUG << "using compressor (" << it->second.describe()
<< ") for category " << cat;
return it->second;
@ -840,8 +831,9 @@ void filesystem_writer_<LoggerPolicy>::finish_category(fragment_category cat) {
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_section_impl(
section_type type, std::shared_ptr<block_data>&& data,
block_compressor const& bc, std::optional<std::string> meta) {
section_type type, std::shared_ptr<block_data>&& data) {
auto& bc = get_compressor(type, std::nullopt);
uint32_t number;
{
@ -855,7 +847,7 @@ void filesystem_writer_<LoggerPolicy>::write_section_impl(
number = section_number_;
fsb->set_block_no(section_number_++);
fsb->compress(wg_, meta);
fsb->compress(wg_);
queue_.emplace_back(std::move(fsb));
}
@ -949,20 +941,62 @@ void filesystem_writer_<LoggerPolicy>::write_compressed_section(
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::add_default_compressor(
block_compressor bc) {
DWARFS_CHECK(bc, "block_compressor must not be null");
LOG_DEBUG << "adding default compressor (" << bc.describe() << ")";
if (default_bc_) {
DWARFS_THROW(runtime_error, "default compressor registered more than once");
}
default_bc_ = std::move(bc);
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::add_category_compressor(
fragment_category::value_type cat, block_compressor bc) {
DWARFS_CHECK(bc, "block_compressor must not be null");
LOG_DEBUG << "adding compressor (" << bc.describe() << ") for category "
<< cat;
if (!bc_.emplace(cat, std::move(bc)).second) {
DWARFS_THROW(runtime_error,
"category compressor registered more than once");
if (!category_bc_.emplace(cat, std::move(bc)).second) {
DWARFS_THROW(
runtime_error,
fmt::format("compressor registered more than once for category {}",
cat));
}
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::add_section_compressor(
section_type type, block_compressor bc) {
DWARFS_CHECK(bc, "block_compressor must not be null");
LOG_DEBUG << "adding compressor (" << bc.describe() << ") for section type "
<< get_friendly_section_name(type);
DWARFS_CHECK(type != section_type::SECTION_INDEX,
"SECTION_INDEX is always uncompressed");
if (auto reqstr = bc.metadata_requirements(); !reqstr.empty()) {
try {
auto req = compression_metadata_requirements<nlohmann::json>{reqstr};
req.check(std::nullopt);
} catch (std::exception const& e) {
auto msg =
fmt::format("cannot use '{}' for {} compression because compression "
"metadata requirements are not met: {}",
bc.describe(), get_friendly_section_name(type), e.what());
DWARFS_THROW(runtime_error, msg);
}
}
if (!section_bc_.emplace(type, std::move(bc)).second) {
DWARFS_THROW(
runtime_error,
fmt::format("compressor registered more than once for section type {}",
get_friendly_section_name(type)));
}
}
@ -976,22 +1010,14 @@ auto filesystem_writer_<LoggerPolicy>::get_compression_constraints(
template <typename LoggerPolicy>
block_compressor const& filesystem_writer_<LoggerPolicy>::get_compressor(
section_type type, std::optional<fragment_category::value_type> cat) const {
switch (type) {
case section_type::METADATA_V2_SCHEMA:
return schema_bc_;
case section_type::METADATA_V2:
return metadata_bc_;
case section_type::HISTORY:
return history_bc_;
default:
break;
if (cat) {
DWARFS_CHECK(type == section_type::BLOCK,
"category-specific compressors are only supported for blocks");
return compressor_for_category(*cat);
}
if (cat) {
return compressor_for_category(*cat);
if (auto it = section_bc_.find(type); it != section_bc_.end()) {
return it->second;
}
return default_bc_.value();
@ -1043,20 +1069,19 @@ void filesystem_writer_<LoggerPolicy>::write_block(
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_metadata_v2_schema(
std::shared_ptr<block_data>&& data) {
write_section_impl(section_type::METADATA_V2_SCHEMA, std::move(data),
schema_bc_);
write_section_impl(section_type::METADATA_V2_SCHEMA, std::move(data));
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_metadata_v2(
std::shared_ptr<block_data>&& data) {
write_section_impl(section_type::METADATA_V2, std::move(data), metadata_bc_);
write_section_impl(section_type::METADATA_V2, std::move(data));
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_history(
std::shared_ptr<block_data>&& data) {
write_section_impl(section_type::HISTORY, std::move(data), history_bc_);
write_section_impl(section_type::HISTORY, std::move(data));
}
template <typename LoggerPolicy>
@ -1104,30 +1129,18 @@ void filesystem_writer_<LoggerPolicy>::write_section_index() {
} // namespace internal
filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr,
thread_pool& pool, writer_progress& prog,
block_compressor const& bc)
: filesystem_writer(os, lgr, pool, prog, bc, bc, bc) {}
thread_pool& pool, writer_progress& prog)
: filesystem_writer(os, lgr, pool, prog, {}) {}
filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr,
thread_pool& pool, writer_progress& prog,
block_compressor const& schema_bc,
block_compressor const& metadata_bc,
block_compressor const& history_bc)
: filesystem_writer(os, lgr, pool, prog, schema_bc, metadata_bc, history_bc,
filesystem_writer_options{}) {}
filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr,
thread_pool& pool, writer_progress& prog,
block_compressor const& schema_bc,
block_compressor const& metadata_bc,
block_compressor const& history_bc,
filesystem_writer_options const& options,
std::istream* header)
: impl_{make_unique_logging_object<internal::filesystem_writer_detail,
internal::filesystem_writer_,
logger_policies>(
lgr, os, pool.get_worker_group(), prog.get_internal(), schema_bc,
metadata_bc, history_bc, options, header)} {}
lgr, os, pool.get_worker_group(), prog.get_internal(), options,
header)} {}
filesystem_writer::~filesystem_writer() = default;
filesystem_writer::filesystem_writer(filesystem_writer&&) = default;
@ -1142,4 +1155,9 @@ void filesystem_writer::add_category_compressor(
impl_->add_category_compressor(cat, std::move(bc));
}
void filesystem_writer::add_section_compressor(section_type type,
block_compressor bc) {
impl_->add_section_compressor(type, std::move(bc));
}
} // namespace dwarfs::writer

View File

@ -1137,7 +1137,7 @@ TEST_P(rewrite, filesystem_rewrite) {
};
{
writer::filesystem_writer fsw(rewritten, lgr, pool, prog, bc, bc, bc);
writer::filesystem_writer fsw(rewritten, lgr, pool, prog);
fsw.add_default_compressor(bc);
auto mm = std::make_shared<mmap>(filename);
EXPECT_NO_THROW(reader::filesystem_v2::identify(lgr, os, mm, idss));
@ -1159,8 +1159,8 @@ TEST_P(rewrite, filesystem_rewrite) {
{
std::istringstream hdr_iss(format_sh);
writer::filesystem_writer_options fsw_opts;
writer::filesystem_writer fsw(rewritten, lgr, pool, prog, bc, bc, bc,
fsw_opts, &hdr_iss);
writer::filesystem_writer fsw(rewritten, lgr, pool, prog, fsw_opts,
&hdr_iss);
fsw.add_default_compressor(bc);
rewrite_fs(fsw, std::make_shared<mmap>(filename));
}
@ -1186,8 +1186,8 @@ TEST_P(rewrite, filesystem_rewrite) {
{
std::istringstream hdr_iss("D");
writer::filesystem_writer_options fsw_opts;
writer::filesystem_writer fsw(rewritten2, lgr, pool, prog, bc, bc, bc,
fsw_opts, &hdr_iss);
writer::filesystem_writer fsw(rewritten2, lgr, pool, prog, fsw_opts,
&hdr_iss);
fsw.add_default_compressor(bc);
rewrite_fs(fsw, std::make_shared<test::mmap_mock>(rewritten.str()));
}
@ -1204,7 +1204,7 @@ TEST_P(rewrite, filesystem_rewrite) {
std::ostringstream rewritten3;
{
writer::filesystem_writer fsw(rewritten3, lgr, pool, prog, bc, bc, bc);
writer::filesystem_writer fsw(rewritten3, lgr, pool, prog);
fsw.add_default_compressor(bc);
rewrite_fs(fsw, std::make_shared<test::mmap_mock>(rewritten2.str()));
}
@ -1223,8 +1223,7 @@ TEST_P(rewrite, filesystem_rewrite) {
{
writer::filesystem_writer_options fsw_opts;
fsw_opts.remove_header = true;
writer::filesystem_writer fsw(rewritten4, lgr, pool, prog, bc, bc, bc,
fsw_opts);
writer::filesystem_writer fsw(rewritten4, lgr, pool, prog, fsw_opts);
fsw.add_default_compressor(bc);
rewrite_fs(fsw, std::make_shared<test::mmap_mock>(rewritten3.str()));
}
@ -1243,8 +1242,7 @@ TEST_P(rewrite, filesystem_rewrite) {
{
writer::filesystem_writer_options fsw_opts;
fsw_opts.no_section_index = true;
writer::filesystem_writer fsw(rewritten5, lgr, pool, prog, bc, bc, bc,
fsw_opts);
writer::filesystem_writer fsw(rewritten5, lgr, pool, prog, fsw_opts);
fsw.add_default_compressor(bc);
rewrite_fs(fsw, std::make_shared<test::mmap_mock>(rewritten4.str()));
}

View File

@ -133,7 +133,7 @@ std::string make_filesystem(::benchmark::State const& state) {
std::ostringstream oss;
block_compressor bc("null");
writer::filesystem_writer fsw(oss, lgr, pool, prog, bc, bc, bc);
writer::filesystem_writer fsw(oss, lgr, pool, prog);
fsw.add_default_compressor(bc);
s.scan(fsw, "", prog);

View File

@ -120,7 +120,7 @@ build_dwarfs(logger& lgr, std::shared_ptr<test::os_access_mock> input,
std::ostringstream oss;
block_compressor bc(compression);
writer::filesystem_writer fsw(oss, lgr, pool, *prog, bc, bc, bc);
writer::filesystem_writer fsw(oss, lgr, pool, *prog);
fsw.add_default_compressor(bc);
s.scan(fsw, std::filesystem::path("/"), *prog, input_list);
@ -992,7 +992,8 @@ class filter_test
block_compressor bc("null");
std::ostringstream null;
writer::filesystem_writer fsw(null, lgr, pool, prog, bc, bc, bc);
writer::filesystem_writer fsw(null, lgr, pool, prog);
fsw.add_default_compressor(bc);
s.scan(fsw, std::filesystem::path("/"), prog);
return oss.str();

View File

@ -48,15 +48,15 @@ TEST(filesystem_writer, compression_metadata_requirements) {
block_compressor bcnull("null");
EXPECT_NO_THROW(
filesystem_writer(devnull, lgr, pool, prog, bcnull, bcnull, bcnull));
EXPECT_NO_THROW(filesystem_writer(devnull, lgr, pool, prog));
#ifdef DWARFS_HAVE_FLAC
block_compressor bcflac("flac:level=1");
EXPECT_THAT(
[&] {
filesystem_writer(devnull, lgr, pool, prog, bcflac, bcnull, bcnull);
auto fsw = filesystem_writer(devnull, lgr, pool, prog);
fsw.add_section_compressor(section_type::METADATA_V2_SCHEMA, bcflac);
},
testing::ThrowsMessage<dwarfs::runtime_error>(testing::HasSubstr(
"cannot use 'flac [level=1]' for schema compression because "
@ -65,7 +65,8 @@ TEST(filesystem_writer, compression_metadata_requirements) {
EXPECT_THAT(
[&] {
filesystem_writer(devnull, lgr, pool, prog, bcnull, bcflac, bcnull);
auto fsw = filesystem_writer(devnull, lgr, pool, prog);
fsw.add_section_compressor(section_type::METADATA_V2, bcflac);
},
testing::ThrowsMessage<dwarfs::runtime_error>(testing::HasSubstr(
"cannot use 'flac [level=1]' for metadata compression because "
@ -74,7 +75,8 @@ TEST(filesystem_writer, compression_metadata_requirements) {
EXPECT_THAT(
[&] {
filesystem_writer(devnull, lgr, pool, prog, bcnull, bcnull, bcflac);
auto fsw = filesystem_writer(devnull, lgr, pool, prog);
fsw.add_section_compressor(section_type::HISTORY, bcflac);
},
testing::ThrowsMessage<dwarfs::runtime_error>(testing::HasSubstr(
"cannot use 'flac [level=1]' for history compression because "
@ -87,7 +89,8 @@ TEST(filesystem_writer, compression_metadata_requirements) {
EXPECT_THAT(
[&] {
filesystem_writer(devnull, lgr, pool, prog, bcrice, bcnull, bcnull);
auto fsw = filesystem_writer(devnull, lgr, pool, prog);
fsw.add_section_compressor(section_type::METADATA_V2_SCHEMA, bcrice);
},
testing::ThrowsMessage<dwarfs::runtime_error>(testing::HasSubstr(
"cannot use 'ricepp [block_size=128]' for schema compression because "
@ -96,7 +99,8 @@ TEST(filesystem_writer, compression_metadata_requirements) {
EXPECT_THAT(
[&] {
filesystem_writer(devnull, lgr, pool, prog, bcnull, bcrice, bcnull);
auto fsw = filesystem_writer(devnull, lgr, pool, prog);
fsw.add_section_compressor(section_type::METADATA_V2, bcrice);
},
testing::ThrowsMessage<dwarfs::runtime_error>(testing::HasSubstr(
"cannot use 'ricepp [block_size=128]' for metadata compression "
@ -106,7 +110,8 @@ TEST(filesystem_writer, compression_metadata_requirements) {
EXPECT_THAT(
[&] {
filesystem_writer(devnull, lgr, pool, prog, bcnull, bcnull, bcrice);
auto fsw = filesystem_writer(devnull, lgr, pool, prog);
fsw.add_section_compressor(section_type::HISTORY, bcrice);
},
testing::ThrowsMessage<dwarfs::runtime_error>(testing::HasSubstr(
"cannot use 'ricepp [block_size=128]' for history compression "

View File

@ -1286,8 +1286,12 @@ int mkdwarfs_main(int argc, sys_char** argv, iolayer const& iol) {
},
[&](std::ostringstream& oss) -> std::ostream& { return oss; }};
fsw.emplace(fsw_os, lgr, compress_pool, prog, schema_bc, metadata_bc,
history_bc, fswopts, header_ifs ? &header_ifs->is() : nullptr);
fsw.emplace(fsw_os, lgr, compress_pool, prog, fswopts,
header_ifs ? &header_ifs->is() : nullptr);
fsw->add_section_compressor(section_type::METADATA_V2_SCHEMA, schema_bc);
fsw->add_section_compressor(section_type::METADATA_V2, metadata_bc);
fsw->add_section_compressor(section_type::HISTORY, history_bc);
writer::categorized_option<block_compressor> compression_opt;
writer::contextual_option_parser cop("--compression", compression_opt, cp,