Multithreaded dwarfsck

This commit is contained in:
Marcus Holland-Moritz 2021-03-20 07:07:09 +01:00
parent db5576ac29
commit 694075d7d2
3 changed files with 60 additions and 31 deletions

View File

@ -64,8 +64,8 @@ class filesystem_v2 {
static void rewrite(logger& lgr, progress& prog, std::shared_ptr<mmif> mm,
filesystem_writer& writer, rewrite_options const& opts);
static void identify(logger& lgr, std::shared_ptr<mmif> mm, std::ostream& os,
int detail_level = 0);
static int identify(logger& lgr, std::shared_ptr<mmif> mm, std::ostream& os,
int detail_level = 0, size_t num_readers = 1);
void dump(std::ostream& os, int detail_level) const {
impl_->dump(os, detail_level);

View File

@ -47,6 +47,7 @@
#include "dwarfs/mmif.h"
#include "dwarfs/options.h"
#include "dwarfs/progress.h"
#include "dwarfs/worker_group.h"
namespace dwarfs {
@ -473,8 +474,9 @@ void filesystem_v2::rewrite(logger& lgr, progress& prog,
writer.flush();
}
void filesystem_v2::identify(logger& lgr, std::shared_ptr<mmif> mm,
std::ostream& os, int detail_level) {
int filesystem_v2::identify(logger& lgr, std::shared_ptr<mmif> mm,
std::ostream& os, int detail_level,
size_t num_readers) {
// TODO:
LOG_PROXY(debug_logger_policy, lgr);
filesystem_parser parser(mm);
@ -483,44 +485,65 @@ void filesystem_v2::identify(logger& lgr, std::shared_ptr<mmif> mm,
os << "FILESYSTEM version " << parser.version() << std::endl;
}
section_map sections;
worker_group wg("reader", num_readers);
std::vector<std::future<fs_section>> sections;
while (auto s = parser.next_section()) {
std::vector<uint8_t> tmp;
block_decompressor bd(s->compression(), mm->as<uint8_t>(s->start()),
s->length(), tmp);
float compression_ratio = float(s->length()) / bd.uncompressed_size();
while (auto sp = parser.next_section()) {
std::packaged_task<fs_section()> task{[&, s = *sp] {
std::vector<uint8_t> tmp;
block_decompressor bd(s.compression(), mm->as<uint8_t>(s.start()),
s.length(), tmp);
float compression_ratio = float(s.length()) / bd.uncompressed_size();
if (detail_level > 2) {
os << "SECTION " << s->description()
<< ", blocksize=" << bd.uncompressed_size()
<< ", ratio=" << fmt::format("{:.2f}%", 100.0 * compression_ratio)
<< std::endl;
}
// TODO: don't throw if we're just checking the file system
if (!s->check_fast(*mm)) {
DWARFS_THROW(runtime_error, "checksum error in section: " + s->name());
}
if (!s->verify(*mm)) {
DWARFS_THROW(runtime_error,
"integrity check error in section: " + s->name());
}
if (s->type() != section_type::BLOCK) {
if (!sections.emplace(s->type(), *s).second) {
DWARFS_THROW(runtime_error, "duplicate section: " + s->name());
if (detail_level > 2) {
os << "SECTION " << s.description()
<< ", blocksize=" << bd.uncompressed_size()
<< ", ratio=" << fmt::format("{:.2f}%", 100.0 * compression_ratio)
<< std::endl;
}
if (!s.check_fast(*mm)) {
DWARFS_THROW(runtime_error, "checksum error in section: " + s.name());
}
if (!s.verify(*mm)) {
DWARFS_THROW(runtime_error,
"integrity check error in section: " + s.name());
}
return s;
}};
sections.emplace_back(task.get_future());
wg.add_job(std::move(task));
}
std::unordered_set<section_type> seen;
int errors = 0;
for (auto& sf : sections) {
try {
auto s = sf.get();
if (s.type() != section_type::BLOCK) {
if (!seen.emplace(s.type()).second) {
DWARFS_THROW(runtime_error, "duplicate section: " + s.name());
}
}
} catch (runtime_error const& e) {
LOG_ERROR << e.what();
++errors;
}
}
if (detail_level > 0) {
if (errors == 0 and detail_level > 0) {
filesystem_options fsopts;
if (detail_level > 1) {
fsopts.metadata.enable_nlink = true;
}
filesystem_v2(lgr, mm, fsopts).dump(os, detail_level);
}
return errors;
}
} // namespace dwarfs

View File

@ -41,7 +41,10 @@ namespace dwarfs {
namespace po = boost::program_options;
int dwarfsck(int argc, char** argv) {
const size_t num_cpu = std::max(std::thread::hardware_concurrency(), 1u);
std::string log_level, input, export_metadata;
size_t num_workers;
int detail;
bool json = false;
@ -54,6 +57,9 @@ int dwarfsck(int argc, char** argv) {
("detail,d",
po::value<int>(&detail)->default_value(1),
"detail level")
("num-workers,n",
po::value<size_t>(&num_workers)->default_value(num_cpu),
"number of reader worker threads")
("json",
po::value<bool>(&json)->zero_tokens(),
"print metadata in JSON format")
@ -105,7 +111,7 @@ int dwarfsck(int argc, char** argv) {
filesystem_v2 fs(lgr, mm);
std::cout << folly::toPrettyJson(fs.metadata_as_dynamic()) << std::endl;
} else {
filesystem_v2::identify(lgr, mm, std::cout, detail);
filesystem_v2::identify(lgr, mm, std::cout, detail, num_workers);
}
} catch (system_error const& e) {
LOG_ERROR << folly::exceptionStr(e);