fix: use correct worker pool, otherwise we can stall

This commit is contained in:
Marcus Holland-Moritz 2023-11-20 15:35:08 +01:00
parent 5579b35a52
commit 18cd20ea03

View File

@ -691,48 +691,44 @@ void scanner_<LoggerPolicy>::scan(
auto cc = fsw.get_compression_constraints(category.value(), meta); auto cc = fsw.get_compression_constraints(category.value(), meta);
wg_ordering.add_job([this, catmgr, blockmgr, category, cat_size, meta, cc, wg_blockify.add_job([this, catmgr, blockmgr, category, cat_size, meta, cc,
&prog, &fsw, &im, &wg_ordering, &wg_blockify] { &prog, &fsw, &im, &wg_ordering] {
wg_blockify.add_job( auto span = im.ordered_span(category, wg_ordering);
[this, catmgr, blockmgr, category, cat_size, meta, cc, &prog, &fsw, auto tv = LOG_CPU_TIMED_VERBOSE;
span = im.ordered_span(category, wg_ordering)]() mutable {
auto tv = LOG_CPU_TIMED_VERBOSE;
auto seg = segmenter_factory_->create( auto seg = segmenter_factory_->create(
category, cat_size, cc, blockmgr, category, cat_size, cc, blockmgr,
[category, meta, &fsw](auto block) { [category, meta, &fsw](auto block) {
return fsw.write_block(category.value(), std::move(block), return fsw.write_block(category.value(), std::move(block), meta);
meta); });
});
for (auto ino : span) { for (auto ino : span) {
prog.current.store(ino.get()); prog.current.store(ino.get());
// TODO: factor this code out // TODO: factor this code out
auto f = ino->any(); auto f = ino->any();
if (auto size = f->size(); size > 0) { if (auto size = f->size(); size > 0) {
auto mm = os_->map_file(f->fs_path(), size); auto mm = os_->map_file(f->fs_path(), size);
file_off_t offset{0}; file_off_t offset{0};
for (auto& frag : ino->fragments()) { for (auto& frag : ino->fragments()) {
if (frag.category() == category) { if (frag.category() == category) {
fragment_chunkable fc(*ino, frag, offset, *mm, catmgr); fragment_chunkable fc(*ino, frag, offset, *mm, catmgr);
seg.add_chunkable(fc); seg.add_chunkable(fc);
prog.fragments_written++; prog.fragments_written++;
}
offset += frag.size();
}
}
prog.inodes_written++; // TODO: remove?
} }
seg.finish(); offset += frag.size();
}
}
tv << category_prefix(catmgr, category) << "segmenting finished"; prog.inodes_written++; // TODO: remove?
}); }
seg.finish();
tv << category_prefix(catmgr, category) << "segmenting finished";
}); });
} }