Add multihreaded nilsimsa ordering using similarity_ordering

This commit is contained in:
Marcus Holland-Moritz 2023-08-07 15:39:44 +02:00
parent 94a66087a9
commit a0d00bac2b
6 changed files with 178 additions and 44 deletions

View File

@ -54,8 +54,9 @@ class inode_manager {
size_t count() const { return impl_->count(); }
void order_inodes(std::shared_ptr<script> scr, order_cb const& fn) {
impl_->order_inodes(std::move(scr), fn);
void order_inodes(worker_group& wg, std::shared_ptr<script> scr,
order_cb const& fn) {
impl_->order_inodes(wg, std::move(scr), fn);
}
void for_each_inode_in_order(inode_cb const& fn) const {
@ -84,8 +85,8 @@ class inode_manager {
virtual std::shared_ptr<inode> create_inode() = 0;
virtual size_t count() const = 0;
virtual void
order_inodes(std::shared_ptr<script> scr, order_cb const& fn) = 0;
virtual void order_inodes(worker_group& wg, std::shared_ptr<script> scr,
order_cb const& fn) = 0;
virtual void for_each_inode_in_order(
std::function<void(std::shared_ptr<inode> const&)> const& fn) const = 0;
virtual std::vector<std::pair<fragment_category::value_type, size_t>>

View File

@ -98,7 +98,14 @@ struct filesystem_writer_options {
};
// TODO: rename
enum class file_order_mode { NONE, PATH, SCRIPT, SIMILARITY, NILSIMSA };
enum class file_order_mode {
NONE,
PATH,
SCRIPT,
SIMILARITY,
NILSIMSA,
NILSIMSA2
};
// TODO: rename
struct file_order_options {
@ -106,6 +113,8 @@ struct file_order_options {
int nilsimsa_depth{20000};
int nilsimsa_min_depth{1000};
int nilsimsa_limit{255};
int nilsimsa2_max_children{8192};
int nilsimsa2_max_cluster_size{8192};
};
struct inode_options {

View File

@ -41,6 +41,7 @@ const std::map<std::string_view, file_order_mode> order_choices{
#endif
{"similarity", file_order_mode::SIMILARITY},
{"nilsimsa", file_order_mode::NILSIMSA},
{"nilsimsa2", file_order_mode::NILSIMSA2},
};
void parse_order_option(std::string_view ordname, std::string_view opt,
@ -96,27 +97,48 @@ file_order_options fragment_order_parser::parse(std::string_view arg) const {
rv.mode = it->second;
if (order_opts.size() > 1) {
if (rv.mode != file_order_mode::NILSIMSA) {
throw std::runtime_error(
fmt::format("inode order mode '{}' does not support options",
order_opts.front()));
}
if (order_opts.size() > 4) {
throw std::runtime_error(fmt::format(
"too many options for inode order mode '{}'", order_opts.front()));
}
auto ordname = order_opts[0];
parse_order_option(ordname, order_opts[1], rv.nilsimsa_limit, "limit", 0,
255);
switch (rv.mode) {
case file_order_mode::NILSIMSA:
if (order_opts.size() > 4) {
throw std::runtime_error(fmt::format(
"too many options for inode order mode '{}'", ordname));
}
parse_order_option(ordname, order_opts[2], rv.nilsimsa_depth, "depth", 0);
parse_order_option(ordname, order_opts[1], rv.nilsimsa_limit, "limit",
0, 255);
if (order_opts.size() > 3) {
parse_order_option(ordname, order_opts[3], rv.nilsimsa_min_depth,
"min depth", 0);
if (order_opts.size() > 2) {
parse_order_option(ordname, order_opts[2], rv.nilsimsa_depth, "depth",
0);
if (order_opts.size() > 3) {
parse_order_option(ordname, order_opts[3], rv.nilsimsa_min_depth,
"min depth", 0);
}
}
break;
case file_order_mode::NILSIMSA2:
if (order_opts.size() > 4) {
throw std::runtime_error(fmt::format(
"too many options for inode order mode '{}'", ordname));
}
parse_order_option(ordname, order_opts[1], rv.nilsimsa2_max_children,
"max_children", 0);
if (order_opts.size() > 2) {
parse_order_option(ordname, order_opts[2],
rv.nilsimsa2_max_cluster_size, "max_cluster_size",
0);
}
break;
default:
throw std::runtime_error(fmt::format(
"inode order mode '{}' does not support options", ordname));
}
}
} else {
@ -145,6 +167,11 @@ fragment_order_parser::to_string(file_order_options const& opts) const {
return fmt::format("nilsimsa (limit={}, depth={}, min_depth={})",
opts.nilsimsa_limit, opts.nilsimsa_depth,
opts.nilsimsa_min_depth);
case file_order_mode::NILSIMSA2:
return fmt::format("nilsimsa2 (max_children={}, max_cluster_size={})",
opts.nilsimsa2_max_children,
opts.nilsimsa2_max_cluster_size);
}
return "<unknown>";
}

View File

@ -54,6 +54,7 @@
#include "dwarfs/progress.h"
#include "dwarfs/script.h"
#include "dwarfs/similarity.h"
#include "dwarfs/similarity_ordering.h"
#include "dwarfs/worker_group.h"
#include "dwarfs/gen-cpp2/metadata_types.h"
@ -336,6 +337,7 @@ class inode_ : public inode {
sc.try_emplace(f.category());
break;
case file_order_mode::NILSIMSA:
case file_order_mode::NILSIMSA2:
nc.try_emplace(f.category());
break;
}
@ -397,7 +399,8 @@ class inode_ : public inode {
similarity_.emplace<uint32_t>(sc.finalize());
} break;
case file_order_mode::NILSIMSA: {
case file_order_mode::NILSIMSA:
case file_order_mode::NILSIMSA2: {
nilsimsa nc;
scan_range(mm, 0, mm->size(), nc);
// TODO: can we finalize in-place?
@ -440,6 +443,63 @@ class inode_ : public inode {
nilsimsa_similarity_hash_; // TODO: remove (move to similarity_)
};
class inode_element_view
: public basic_array_similarity_element_view<256, uint64_t> {
public:
inode_element_view(std::vector<std::shared_ptr<inode>> const& inodes)
: inodes_{inodes} {}
bool exists(size_t /*i*/) const override {
// TODO: not true once we use fragments
return true;
}
size_t size() const override { return inodes_.size(); }
size_t weight(size_t i) const override { return inodes_[i]->any()->size(); }
bool bitvec_less(size_t a, size_t b) const override {
auto const& ia = *inodes_[a];
auto const& ib = *inodes_[b];
if (ia.nilsimsa_similarity_hash() < ib.nilsimsa_similarity_hash()) {
return true;
}
if (ia.nilsimsa_similarity_hash() > ib.nilsimsa_similarity_hash()) {
return false;
}
auto const& fa = *ia.any();
auto const& fb = *ib.any();
return fa.less_revpath(fb);
}
bool order_less(size_t a, size_t b) const override {
auto const& ia = *inodes_[a];
auto const& ib = *inodes_[b];
auto const& fa = *ia.any();
auto const& fb = *ib.any();
auto sa = fa.size();
auto sb = fb.size();
return sa > sb || (sa == sb && fa.less_revpath(fb));
}
bool bits_equal(size_t a, size_t b) const override {
return inodes_[a]->nilsimsa_similarity_hash() ==
inodes_[b]->nilsimsa_similarity_hash();
}
std::string description(size_t i) const override {
auto f = inodes_[i]->any();
return fmt::format("{} [{}]", f->path_as_string(), f->size());
}
nilsimsa::hash_type const& get_bits(size_t i) const override {
return inodes_[i]->nilsimsa_similarity_hash();
}
private:
std::vector<std::shared_ptr<inode>> const& inodes_;
};
} // namespace
template <typename LoggerPolicy>
@ -459,7 +519,7 @@ class inode_manager_ final : public inode_manager::impl {
size_t count() const override { return inodes_.size(); }
void order_inodes(std::shared_ptr<script> scr,
void order_inodes(worker_group& wg, std::shared_ptr<script> scr,
inode_manager::order_cb const& fn) override;
void for_each_inode_in_order(
@ -532,7 +592,8 @@ class inode_manager_ final : public inode_manager::impl {
return opts.fragment_order.any_is([](auto const& order) {
return order.mode == file_order_mode::SIMILARITY ||
order.mode == file_order_mode::NILSIMSA;
order.mode == file_order_mode::NILSIMSA ||
order.mode == file_order_mode::NILSIMSA2;
});
}
@ -578,6 +639,7 @@ class inode_manager_ final : public inode_manager::impl {
std::vector<uint32_t>& index);
void order_inodes_by_nilsimsa(inode_manager::order_cb const& fn);
void order_inodes_by_nilsimsa2(worker_group& wg);
LOG_PROXY_DECL(LoggerPolicy);
std::vector<std::shared_ptr<inode>> inodes_;
@ -617,7 +679,10 @@ void inode_manager_<LoggerPolicy>::scan_background(worker_group& wg,
template <typename LoggerPolicy>
void inode_manager_<LoggerPolicy>::order_inodes(
std::shared_ptr<script> scr, inode_manager::order_cb const& fn) {
worker_group& wg, std::shared_ptr<script> scr,
inode_manager::order_cb const& fn) {
// TODO: only use an index, never actually reorder inodes
// TODO:
switch (opts_.fragment_order.get().mode) {
case file_order_mode::NONE:
@ -659,6 +724,15 @@ void inode_manager_<LoggerPolicy>::order_inodes(
ti << count() << " inodes ordered";
return;
}
case file_order_mode::NILSIMSA2: {
LOG_INFO << "ordering " << count()
<< " inodes using new nilsimsa similarity...";
auto ti = LOG_CPU_TIMED_INFO;
order_inodes_by_nilsimsa2(wg);
ti << count() << " inodes ordered";
break;
}
}
LOG_INFO << "assigning file inodes...";
@ -824,6 +898,23 @@ void inode_manager_<LoggerPolicy>::order_inodes_by_nilsimsa(
}
}
template <typename LoggerPolicy>
void inode_manager_<LoggerPolicy>::order_inodes_by_nilsimsa2(worker_group& wg) {
auto const& file_order = opts_.fragment_order.get(); // TODO
similarity_ordering_options opts;
opts.max_children = file_order.nilsimsa2_max_children;
opts.max_cluster_size = file_order.nilsimsa2_max_cluster_size;
auto sim_order = similarity_ordering(LOG_GET_LOGGER, prog_, wg, opts);
inode_element_view ev(inodes_);
auto ordered = sim_order.order_nilsimsa(ev).get();
std::vector<std::shared_ptr<inode>> inodes;
inodes.reserve(inodes_.size());
for (auto i : ordered) {
inodes.push_back(std::move(inodes_[i]));
}
inodes.swap(inodes_);
}
template <typename LoggerPolicy>
void inode_manager_<LoggerPolicy>::dump(std::ostream& os) const {
for_each_inode_in_order(

View File

@ -33,6 +33,7 @@
#include <vector>
#include <folly/ExceptionString.h>
#include <folly/system/HardwareConcurrency.h>
#include <fmt/format.h>
@ -663,25 +664,28 @@ void scanner_<LoggerPolicy>::scan(
worker_group blockify("blockify", 1, 1 << 20);
{
worker_group ordering("ordering", 1);
// TODO
size_t const num_threads = std::max(folly::hardware_concurrency(), 1u);
worker_group wg_order("ordering", num_threads);
ordering.add_job([&] {
im.order_inodes(script_, [&](std::shared_ptr<inode> const& ino) {
blockify.add_job([&] {
prog.current.store(ino.get());
bm.add_inode(ino);
prog.inodes_written++;
});
auto queued_files = blockify.queue_size();
auto queued_blocks = fsw.queue_fill();
prog.blockify_queue = queued_files;
prog.compress_queue = queued_blocks;
return INT64_C(500) * queued_blocks +
static_cast<int64_t>(queued_files);
});
});
// ordering.add_job([&] {
im.order_inodes(wg_order, script_,
[&](std::shared_ptr<inode> const& ino) {
blockify.add_job([&] {
prog.current.store(ino.get());
bm.add_inode(ino);
prog.inodes_written++;
});
auto queued_files = blockify.queue_size();
auto queued_blocks = fsw.queue_fill();
prog.blockify_queue = queued_files;
prog.compress_queue = queued_blocks;
return INT64_C(500) * queued_blocks +
static_cast<int64_t>(queued_files);
});
// });
ordering.wait();
// wg_order.wait();
}
LOG_INFO << "waiting for segmenting/blockifying to finish...";

View File

@ -157,7 +157,8 @@ void basic_end_to_end_test(std::string const& compressor,
auto mm = std::make_shared<test::mmap_mock>(std::move(fsimage));
bool similarity = file_order == file_order_mode::SIMILARITY ||
file_order == file_order_mode::NILSIMSA;
file_order == file_order_mode::NILSIMSA ||
file_order == file_order_mode::NILSIMSA2;
size_t const num_fail_empty = access_fail ? 1 : 0;
@ -599,6 +600,7 @@ INSTANTIATE_TEST_SUITE_P(
::testing::ValuesIn(compressions), ::testing::Values(12, 15, 20, 28),
::testing::Values(file_order_mode::NONE, file_order_mode::PATH,
file_order_mode::SCRIPT, file_order_mode::NILSIMSA,
file_order_mode::NILSIMSA2,
file_order_mode::SIMILARITY),
::testing::Values(std::nullopt, "xxh3-128")));