add segmenter_factory

This commit is contained in:
Marcus Holland-Moritz 2023-08-13 22:29:13 +02:00
parent 8fa157bf92
commit aacb9a0d94
12 changed files with 286 additions and 108 deletions

View File

@ -399,6 +399,7 @@ list(
src/dwarfs/safe_main.cpp
src/dwarfs/scanner.cpp
src/dwarfs/segmenter.cpp
src/dwarfs/segmenter_factory.cpp
src/dwarfs/similarity.cpp
src/dwarfs/similarity_ordering.cpp
src/dwarfs/string_table.cpp

View File

@ -0,0 +1,48 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include "dwarfs/contextual_option.h"
#include "dwarfs/fragment_category.h"
namespace dwarfs {
namespace detail {
template <typename T>
struct categorized_option_policy {
using ContextArgumentType = fragment_category;
using ContextType = fragment_category::value_type;
using ValueType = T;
static ContextType context_from_arg(ContextArgumentType const& arg) {
return arg.value();
}
};
} // namespace detail
template <typename ValueType>
using categorized_option =
contextual_option<detail::categorized_option_policy<ValueType>>;
} // namespace dwarfs

View File

@ -28,9 +28,8 @@
#include <memory>
#include <optional>
#include "dwarfs/contextual_option.h"
#include "dwarfs/categorized_option.h"
#include "dwarfs/file_stat.h"
#include "dwarfs/fragment_category.h"
#include "dwarfs/types.h"
namespace dwarfs {
@ -38,25 +37,6 @@ namespace dwarfs {
class categorizer_manager;
class entry;
namespace detail {
template <typename T>
struct categorized_option_policy {
using ContextArgumentType = fragment_category;
using ContextType = fragment_category::value_type;
using ValueType = T;
static ContextType context_from_arg(ContextArgumentType const& arg) {
return arg.value();
}
};
} // namespace detail
template <typename ValueType>
using categorized_option =
contextual_option<detail::categorized_option_policy<ValueType>>;
enum class mlock_mode { NONE, TRY, MUST };
enum class cache_tidy_strategy { NONE, EXPIRY_TIME, BLOCK_SWAPPED_OUT };

View File

@ -27,8 +27,6 @@
#include <span>
#include <string>
#include "dwarfs/segmenter.h"
namespace dwarfs {
struct scanner_options;
@ -39,11 +37,12 @@ class logger;
class os_access;
class progress;
class script;
class segmenter_factory;
class worker_group;
class scanner {
public:
scanner(logger& lgr, worker_group& wg, const segmenter::config& cfg,
scanner(logger& lgr, worker_group& wg, std::shared_ptr<segmenter_factory> sf,
std::shared_ptr<entry_factory> ef, std::shared_ptr<os_access> os,
std::shared_ptr<script> scr, const scanner_options& options);

View File

@ -38,12 +38,11 @@ class progress;
class segmenter {
public:
struct config {
unsigned blockhash_window_size;
unsigned blockhash_window_size{12};
unsigned window_increment_shift{1};
size_t max_active_blocks{1};
size_t memory_limit{256 << 20};
unsigned block_size_bits{22};
unsigned bloom_filter_size{4};
unsigned block_size_bits{22};
};
using block_ready_cb = folly::Function<size_t(std::shared_ptr<block_data>)>;

View File

@ -0,0 +1,66 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include "dwarfs/categorized_option.h"
#include "dwarfs/segmenter.h"
namespace dwarfs {
class logger;
class progress;
class segmenter_factory {
public:
struct config {
categorized_option<unsigned> blockhash_window_size;
categorized_option<unsigned> window_increment_shift;
categorized_option<size_t> max_active_blocks;
categorized_option<unsigned> bloom_filter_size;
unsigned block_size_bits{22};
};
segmenter_factory(logger& lgr, progress& prog, config const& cfg);
segmenter create(fragment_category cat, std::shared_ptr<block_manager> blkmgr,
segmenter::block_ready_cb block_ready) const {
return impl_->create(cat, std::move(blkmgr), std::move(block_ready));
}
size_t get_block_size() const { return impl_->get_block_size(); }
class impl {
public:
virtual ~impl() = default;
virtual segmenter
create(fragment_category cat, std::shared_ptr<block_manager> blkmgr,
segmenter::block_ready_cb block_ready) const = 0;
virtual size_t get_block_size() const = 0;
};
private:
std::unique_ptr<impl> impl_;
};
} // namespace dwarfs

View File

@ -57,6 +57,7 @@
#include "dwarfs/progress.h"
#include "dwarfs/scanner.h"
#include "dwarfs/script.h"
#include "dwarfs/segmenter_factory.h"
#include "dwarfs/string_table.h"
#include "dwarfs/util.h"
#include "dwarfs/version.h"
@ -273,7 +274,7 @@ std::string status_string(progress const& p, size_t width) {
template <typename LoggerPolicy>
class scanner_ final : public scanner::impl {
public:
scanner_(logger& lgr, worker_group& wg, const segmenter::config& config,
scanner_(logger& lgr, worker_group& wg, std::shared_ptr<segmenter_factory> sf,
std::shared_ptr<entry_factory> ef, std::shared_ptr<os_access> os,
std::shared_ptr<script> scr, const scanner_options& options);
@ -297,25 +298,25 @@ class scanner_ final : public scanner::impl {
LOG_PROXY_DECL(LoggerPolicy);
worker_group& wg_;
const segmenter::config& cfg_;
const scanner_options& options_;
std::shared_ptr<entry_factory> entry_;
scanner_options const& options_;
std::shared_ptr<segmenter_factory> segmenter_factory_;
std::shared_ptr<entry_factory> entry_factory_;
std::shared_ptr<os_access> os_;
std::shared_ptr<script> script_;
};
template <typename LoggerPolicy>
scanner_<LoggerPolicy>::scanner_(logger& lgr, worker_group& wg,
const segmenter::config& cfg,
std::shared_ptr<segmenter_factory> sf,
std::shared_ptr<entry_factory> ef,
std::shared_ptr<os_access> os,
std::shared_ptr<script> scr,
const scanner_options& options)
: LOG_PROXY_INIT(lgr)
, wg_(wg)
, cfg_(cfg)
, options_(options)
, entry_(std::move(ef))
, wg_{wg}
, options_{options}
, segmenter_factory_{std::move(sf)}
, entry_factory_{std::move(ef)}
, os_(std::move(os))
, script_(std::move(scr)) {}
@ -325,7 +326,7 @@ scanner_<LoggerPolicy>::add_entry(std::filesystem::path const& name,
std::shared_ptr<dir> parent, progress& prog,
detail::file_scanner& fs, bool debug_filter) {
try {
auto pe = entry_->create(*os_, name, parent);
auto pe = entry_factory_->create(*os_, name, parent);
bool exclude = false;
if (script_) {
@ -429,7 +430,7 @@ template <typename LoggerPolicy>
std::shared_ptr<entry>
scanner_<LoggerPolicy>::scan_tree(std::filesystem::path const& path,
progress& prog, detail::file_scanner& fs) {
auto root = entry_->create(*os_, path);
auto root = entry_factory_->create(*os_, path);
bool const debug_filter = options_.debug_filter_function.has_value();
if (root->type() != entry::E_DIR) {
@ -489,7 +490,7 @@ scanner_<LoggerPolicy>::scan_list(std::filesystem::path const& path,
auto ti = LOG_TIMED_INFO;
auto root = entry_->create(*os_, path);
auto root = entry_factory_->create(*os_, path);
if (root->type() != entry::E_DIR) {
DWARFS_THROW(runtime_error,
@ -673,8 +674,6 @@ void scanner_<LoggerPolicy>::scan(
// which gets run on a worker groups; each batch keeps track of
// its CPU time and affects thread naming
// segmenter seg(LOG_GET_LOGGER, prog, cfg_, fsw);
auto blockmgr = std::make_shared<block_manager>();
{
@ -695,12 +694,11 @@ void scanner_<LoggerPolicy>::scan(
wg_blockify.add_job(
[this, catmgr, blockmgr, category, meta, &prog, &fsw,
span = im.ordered_span(category, wg_ordering)]() mutable {
// TODO: segmenter config per-category
segmenter seg(LOG_GET_LOGGER, prog, blockmgr, cfg_,
[category, meta, &fsw](auto block) {
return fsw.write_block(category.value(),
std::move(block), meta);
});
auto seg = segmenter_factory_->create(
category, blockmgr, [category, meta, &fsw](auto block) {
return fsw.write_block(category.value(), std::move(block),
meta);
});
for (auto ino : span) {
prog.current.store(ino.get());
@ -844,7 +842,7 @@ void scanner_<LoggerPolicy>::scan(
mv2.gids() = ge_data.get_gids();
mv2.modes() = ge_data.get_modes();
mv2.timestamp_base() = ge_data.get_timestamp_base();
mv2.block_size() = UINT32_C(1) << cfg_.block_size_bits;
mv2.block_size() = segmenter_factory_->get_block_size();
mv2.total_fs_size() = prog.original_size;
mv2.total_hardlink_size() = prog.hardlink_size;
mv2.options() = fsopts;
@ -870,12 +868,13 @@ void scanner_<LoggerPolicy>::scan(
<< ")";
}
scanner::scanner(logger& lgr, worker_group& wg, const segmenter::config& cfg,
scanner::scanner(logger& lgr, worker_group& wg,
std::shared_ptr<segmenter_factory> sf,
std::shared_ptr<entry_factory> ef,
std::shared_ptr<os_access> os, std::shared_ptr<script> scr,
const scanner_options& options)
: impl_(make_unique_logging_object<impl, scanner_, logger_policies>(
lgr, wg, cfg, std::move(ef), std::move(os), std::move(scr),
lgr, wg, std::move(sf), std::move(ef), std::move(os), std::move(scr),
options)) {}
} // namespace dwarfs

View File

@ -349,7 +349,7 @@ class segmenter_ final : public segmenter::impl {
LOG_PROXY_DECL(LoggerPolicy);
progress& prog_;
std::shared_ptr<block_manager> blkmgr_;
const segmenter::config& cfg_;
segmenter::config const cfg_;
segmenter::block_ready_cb block_ready_;
size_t const window_size_;

View File

@ -0,0 +1,62 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include "dwarfs/segmenter_factory.h"
namespace dwarfs {
class segmenter_factory_ final : public segmenter_factory::impl {
public:
segmenter_factory_(logger& lgr, progress& prog,
const segmenter_factory::config& cfg)
: lgr_{lgr}
, prog_{prog}
, cfg_{cfg} {}
segmenter create(fragment_category cat, std::shared_ptr<block_manager> blkmgr,
segmenter::block_ready_cb block_ready) const override {
segmenter::config cfg;
cfg.blockhash_window_size = cfg_.blockhash_window_size.get(cat);
cfg.window_increment_shift = cfg_.window_increment_shift.get(cat);
cfg.max_active_blocks = cfg_.max_active_blocks.get(cat);
cfg.bloom_filter_size = cfg_.bloom_filter_size.get(cat);
cfg.block_size_bits = cfg_.block_size_bits;
return segmenter(lgr_, prog_, std::move(blkmgr), cfg,
std::move(block_ready));
}
size_t get_block_size() const override {
return static_cast<size_t>(1) << cfg_.block_size_bits;
}
private:
logger& lgr_;
progress& prog_;
segmenter_factory::config cfg_;
};
segmenter_factory::segmenter_factory(logger& lgr, progress& prog,
config const& cfg)
: impl_(std::make_unique<segmenter_factory_>(lgr, prog, cfg)) {}
} // namespace dwarfs

View File

@ -72,7 +72,7 @@
#include "dwarfs/progress.h"
#include "dwarfs/scanner.h"
#include "dwarfs/script.h"
#include "dwarfs/segmenter.h"
#include "dwarfs/segmenter_factory.h"
#include "dwarfs/terminal.h"
#include "dwarfs/tool.h"
#include "dwarfs/util.h"
@ -274,7 +274,7 @@ int mkdwarfs_main(int argc, sys_char** argv) {
const size_t num_cpu = std::max(folly::hardware_concurrency(), 1u);
segmenter::config cfg;
segmenter_factory::config sf_config;
sys_string path_str, output_str;
std::string memory_limit, script_arg, header, schema_compression,
metadata_compression, log_level_str, timestamp, time_resolution,
@ -283,7 +283,7 @@ int mkdwarfs_main(int argc, sys_char** argv) {
categorizer_list_str;
std::vector<sys_string> filter;
std::vector<std::string> order, max_lookback_blocks, window_size, window_step,
compression;
bloom_filter_size, compression;
size_t num_workers, num_scanner_workers;
bool no_progress = false, remove_header = false, no_section_index = false,
force_overwrite = false;
@ -294,6 +294,7 @@ int mkdwarfs_main(int argc, sys_char** argv) {
integral_value_parser<size_t> max_lookback_parser;
integral_value_parser<unsigned> window_size_parser(6, 24);
integral_value_parser<unsigned> window_step_parser(0, 8);
integral_value_parser<unsigned> bloom_filter_size_parser(0, 8);
fragment_order_parser order_parser;
block_compressor_parser compressor_parser;
@ -353,7 +354,7 @@ int mkdwarfs_main(int argc, sys_char** argv) {
po::options_description advanced_opts("Advanced options");
advanced_opts.add_options()
("block-size-bits,S",
po::value<unsigned>(&cfg.block_size_bits),
po::value<unsigned>(&sf_config.block_size_bits),
"block size bits (size = 2^arg bits)")
("num-workers,N",
po::value<size_t>(&num_workers)->default_value(num_cpu),
@ -426,7 +427,8 @@ int mkdwarfs_main(int argc, sys_char** argv) {
po::value<std::vector<std::string>>(&window_step)->multitoken(),
"window step (as right shift of size)")
("bloom-filter-size",
po::value<unsigned>(&cfg.bloom_filter_size)->default_value(4),
// po::value<unsigned>(&cfg.bloom_filter_size)->default_value(4), // TODO
po::value<std::vector<std::string>>(&bloom_filter_size)->multitoken(),
"bloom filter size (2^N*values bits)")
;
@ -601,7 +603,7 @@ int mkdwarfs_main(int argc, sys_char** argv) {
auto const& defaults = levels[level];
if (!vm.count("block-size-bits")) {
cfg.block_size_bits = defaults.block_size_bits;
sf_config.block_size_bits = defaults.block_size_bits;
}
if (!vm.count("compression")) {
@ -617,22 +619,23 @@ int mkdwarfs_main(int argc, sys_char** argv) {
}
if (!vm.count("max-lookback-blocks")) {
cfg.max_active_blocks = 1; // TODO
max_lookback_blocks.push_back(folly::to<std::string>(1));
}
if (!vm.count("window-size")) {
cfg.blockhash_window_size = defaults.window_size; // TODO
window_size.push_back(folly::to<std::string>(defaults.window_size));
}
if (!vm.count("window-step")) {
cfg.window_increment_shift = defaults.window_step; // TODO
window_step.push_back(folly::to<std::string>(defaults.window_step));
}
if (cfg.block_size_bits < min_block_size_bits ||
cfg.block_size_bits > max_block_size_bits) {
if (!vm.count("bloom-filter-size")) {
bloom_filter_size.push_back(folly::to<std::string>(4));
}
if (sf_config.block_size_bits < min_block_size_bits ||
sf_config.block_size_bits > max_block_size_bits) {
std::cerr << "error: block size must be between " << min_block_size_bits
<< " and " << max_block_size_bits << "\n";
return 1;
@ -923,13 +926,14 @@ int mkdwarfs_main(int argc, sys_char** argv) {
progress prog(std::move(updater), interval_ms);
auto min_memory_req = num_workers * (UINT64_C(1) << cfg.block_size_bits);
auto min_memory_req =
num_workers * (UINT64_C(1) << sf_config.block_size_bits);
// TODO:
if (mem_limit < min_memory_req /* && compression != "null" */) {
LOG_WARN << "low memory limit (" << size_with_unit(mem_limit) << "), need "
<< size_with_unit(min_memory_req) << " to efficiently compress "
<< size_with_unit(UINT64_C(1) << cfg.block_size_bits)
<< size_with_unit(UINT64_C(1) << sf_config.block_size_bits)
<< " blocks with " << num_workers << " threads";
}
@ -976,44 +980,45 @@ int mkdwarfs_main(int argc, sys_char** argv) {
category_parser cp(options.inode.categorizer_mgr);
try {
contextual_option_parser cop("--order", options.inode.fragment_order, cp,
order_parser);
cop.parse(defaults.order);
cop.parse(order);
LOG_DEBUG << cop.as_string();
} catch (std::exception const& e) {
LOG_ERROR << e.what();
return 1;
}
{
contextual_option_parser cop("--order", options.inode.fragment_order, cp,
order_parser);
cop.parse(defaults.order);
cop.parse(order);
LOG_DEBUG << cop.as_string();
}
try {
categorized_option<size_t> max_lookback_opt;
contextual_option_parser cop("--max-lookback-blocks", max_lookback_opt, cp,
max_lookback_parser);
cop.parse(max_lookback_blocks);
LOG_DEBUG << cop.as_string();
} catch (std::exception const& e) {
LOG_ERROR << e.what();
return 1;
}
{
contextual_option_parser cop("--max-lookback-blocks",
sf_config.max_active_blocks, cp,
max_lookback_parser);
cop.parse(max_lookback_blocks);
LOG_DEBUG << cop.as_string();
}
try {
categorized_option<unsigned> window_size_opt;
contextual_option_parser cop("--window-size", window_size_opt, cp,
window_size_parser);
cop.parse(window_size);
LOG_DEBUG << cop.as_string();
} catch (std::exception const& e) {
LOG_ERROR << e.what();
return 1;
}
{
contextual_option_parser cop("--window-size",
sf_config.blockhash_window_size, cp,
window_size_parser);
cop.parse(window_size);
LOG_DEBUG << cop.as_string();
}
try {
categorized_option<unsigned> window_step_opt;
contextual_option_parser cop("--window-step", window_step_opt, cp,
window_step_parser);
cop.parse(window_step);
LOG_DEBUG << cop.as_string();
{
contextual_option_parser cop("--window-step",
sf_config.window_increment_shift, cp,
window_step_parser);
cop.parse(window_step);
LOG_DEBUG << cop.as_string();
}
{
contextual_option_parser cop("--bloom-filter-size",
sf_config.bloom_filter_size, cp,
bloom_filter_size_parser);
cop.parse(bloom_filter_size);
LOG_DEBUG << cop.as_string();
}
} catch (std::exception const& e) {
LOG_ERROR << e.what();
return 1;
@ -1060,7 +1065,9 @@ int mkdwarfs_main(int argc, sys_char** argv) {
fsw, rw_opts);
wg_compress.wait();
} else {
scanner s(lgr, wg_scanner, cfg, entry_factory::create(),
auto sf = std::make_shared<segmenter_factory>(lgr, prog, sf_config);
scanner s(lgr, wg_scanner, sf, entry_factory::create(),
std::make_shared<os_access_generic>(), std::move(script),
options);

View File

@ -44,6 +44,7 @@
#include "dwarfs/options.h"
#include "dwarfs/progress.h"
#include "dwarfs/scanner.h"
#include "dwarfs/segmenter_factory.h"
#include "dwarfs/vfs_stat.h"
#include "filter_test_data.h"
@ -71,15 +72,26 @@ build_dwarfs(logger& lgr, std::shared_ptr<test::os_access_mock> input,
// force multithreading
worker_group wg("worker", 4);
scanner s(lgr, wg, cfg, entry_factory::create(), input, scr, options);
std::ostringstream oss;
std::unique_ptr<progress> local_prog;
if (!prog) {
local_prog = std::make_unique<progress>([](const progress&, bool) {}, 1000);
prog = local_prog.get();
}
// TODO: ugly hack :-)
segmenter_factory::config sf_cfg;
sf_cfg.block_size_bits = cfg.block_size_bits;
sf_cfg.blockhash_window_size.set_default(cfg.blockhash_window_size);
sf_cfg.window_increment_shift.set_default(cfg.window_increment_shift);
sf_cfg.max_active_blocks.set_default(cfg.max_active_blocks);
sf_cfg.bloom_filter_size.set_default(cfg.bloom_filter_size);
auto sf = std::make_shared<segmenter_factory>(lgr, *prog, sf_cfg);
scanner s(lgr, wg, sf, entry_factory::create(), input, scr, options);
std::ostringstream oss;
block_compressor bc(compression);
filesystem_writer fsw(oss, lgr, wg, *prog, bc, bc);
fsw.add_default_compressor(bc);

View File

@ -35,7 +35,7 @@
#include "dwarfs/options.h"
#include "dwarfs/progress.h"
#include "dwarfs/scanner.h"
#include "dwarfs/segmenter.h"
#include "dwarfs/segmenter_factory.h"
#include "dwarfs/string_table.h"
#include "dwarfs/vfs_stat.h"
#include "dwarfs/worker_group.h"
@ -91,10 +91,13 @@ void PackParamsDirs(::benchmark::internal::Benchmark* b) {
}
std::string make_filesystem(::benchmark::State const& state) {
segmenter::config cfg;
segmenter_factory::config cfg;
scanner_options options;
cfg.blockhash_window_size = 8;
cfg.blockhash_window_size.set_default(12);
cfg.window_increment_shift.set_default(1);
cfg.max_active_blocks.set_default(1);
cfg.bloom_filter_size.set_default(4);
cfg.block_size_bits = 12;
options.with_devices = true;
@ -112,17 +115,19 @@ std::string make_filesystem(::benchmark::State const& state) {
options.plain_symlinks_table = state.range(1);
worker_group wg("writer", 4);
progress prog([](const progress&, bool) {}, 1000);
std::ostringstream logss;
stream_logger lgr(logss); // TODO: mock
lgr.set_policy<prod_logger_policy>();
scanner s(lgr, wg, cfg, entry_factory::create(),
auto sf = std::make_shared<segmenter_factory>(lgr, prog, cfg);
scanner s(lgr, wg, sf, entry_factory::create(),
test::os_access_mock::create_test_instance(),
std::make_shared<test::script_mock>(), options);
std::ostringstream oss;
progress prog([](const progress&, bool) {}, 1000);
block_compressor bc("null");
filesystem_writer fsw(oss, lgr, wg, prog, bc, bc);