From 694075d7d2ab4df9d73bb124eac43bf7d3167e57 Mon Sep 17 00:00:00 2001
From: Marcus Holland-Moritz
Date: Sat, 20 Mar 2021 07:07:09 +0100
Subject: [PATCH] Multithreaded dwarfsck

---
Review notes (commentary only; by git convention, text between the "---"
line and the first "diff --git" line is not part of the change):

* Extraction damage: this copy had its line breaks collapsed, and
  angle-bracketed spans appear to have been stripped, e.g.
  "std::shared_ptr mm", "std::vector tmp", "std::vector> sections",
  "std::packaged_task task", "mm->as(...)", "po::value(&detail)" and the
  author address. Line breaks and indentation below were re-derived from
  the hunk line counts. The missing template arguments are NOT restored
  here and must be taken from the upstream commit before this patch can
  apply or compile.

* identify(): each packaged_task streams its "SECTION ..." line to `os`
  from inside the job handed to `wg` when detail_level > 2. No lock is
  visible around that write. NOTE(review): if worker_group runs jobs on
  num_readers threads concurrently (presumably; its implementation is not
  part of this patch), those lines can interleave -- confirm and
  serialize the output if so.

* identify(): the lambda captures `mm`, `os` and `detail_level` by
  reference ([&, s = *sp]). The collection loop calls get() on every
  future, but only `runtime_error` is caught there; any other exception
  type thrown inside a task leaves identify() through sf.get().
  NOTE(review): confirm that worker_group's destructor waits for the
  outstanding jobs in that case.

* identify(): a duplicate non-BLOCK section is now counted as an error
  and logged instead of aborting the scan; dump() is skipped whenever
  errors != 0.

* dwarfsck(): the visible call site discards the new int result of
  identify(). Whether the error count reaches the process exit status
  cannot be determined from these hunks -- TODO confirm.

* No standard headers are added although the new code uses
  std::packaged_task, std::unordered_set, std::thread and std::max;
  presumably existing includes provide them -- verify.

 include/dwarfs/filesystem_v2.h |  4 +-
 src/dwarfs/filesystem_v2.cpp   | 79 ++++++++++++++++++++++------------
 src/dwarfsck.cpp               |  8 +++-
 3 files changed, 60 insertions(+), 31 deletions(-)

diff --git a/include/dwarfs/filesystem_v2.h b/include/dwarfs/filesystem_v2.h
index 4b621d37..3046aa56 100644
--- a/include/dwarfs/filesystem_v2.h
+++ b/include/dwarfs/filesystem_v2.h
@@ -64,8 +64,8 @@ class filesystem_v2 {
   static void rewrite(logger& lgr, progress& prog, std::shared_ptr mm,
                       filesystem_writer& writer, rewrite_options const& opts);
 
-  static void identify(logger& lgr, std::shared_ptr mm, std::ostream& os,
-                       int detail_level = 0);
+  static int identify(logger& lgr, std::shared_ptr mm, std::ostream& os,
+                      int detail_level = 0, size_t num_readers = 1);
 
   void dump(std::ostream& os, int detail_level) const {
     impl_->dump(os, detail_level);
diff --git a/src/dwarfs/filesystem_v2.cpp b/src/dwarfs/filesystem_v2.cpp
index 9a723c82..aa9f6e2e 100644
--- a/src/dwarfs/filesystem_v2.cpp
+++ b/src/dwarfs/filesystem_v2.cpp
@@ -47,6 +47,7 @@
 #include "dwarfs/mmif.h"
 #include "dwarfs/options.h"
 #include "dwarfs/progress.h"
+#include "dwarfs/worker_group.h"
 
 namespace dwarfs {
 
@@ -473,8 +474,9 @@ void filesystem_v2::rewrite(logger& lgr, progress& prog,
   writer.flush();
 }
 
-void filesystem_v2::identify(logger& lgr, std::shared_ptr mm,
-                             std::ostream& os, int detail_level) {
+int filesystem_v2::identify(logger& lgr, std::shared_ptr mm,
+                            std::ostream& os, int detail_level,
+                            size_t num_readers) {
   // TODO:
   LOG_PROXY(debug_logger_policy, lgr);
   filesystem_parser parser(mm);
@@ -483,44 +485,65 @@ void filesystem_v2::identify(logger& lgr, std::shared_ptr mm,
     os << "FILESYSTEM version " << parser.version() << std::endl;
   }
 
-  section_map sections;
+  worker_group wg("reader", num_readers);
+  std::vector> sections;
 
-  while (auto s = parser.next_section()) {
-    std::vector tmp;
-    block_decompressor bd(s->compression(), mm->as(s->start()),
-                          s->length(), tmp);
-    float compression_ratio = float(s->length()) / bd.uncompressed_size();
+  while (auto sp = parser.next_section()) {
+    std::packaged_task task{[&, s = *sp] {
+      std::vector tmp;
+      block_decompressor bd(s.compression(), mm->as(s.start()),
+                            s.length(), tmp);
+      float compression_ratio = float(s.length()) / bd.uncompressed_size();
 
-    if (detail_level > 2) {
-      os << "SECTION " << s->description()
-         << ", blocksize=" << bd.uncompressed_size()
-         << ", ratio=" << fmt::format("{:.2f}%", 100.0 * compression_ratio)
-         << std::endl;
-    }
-
-    // TODO: don't throw if we're just checking the file system
-
-    if (!s->check_fast(*mm)) {
-      DWARFS_THROW(runtime_error, "checksum error in section: " + s->name());
-    }
-    if (!s->verify(*mm)) {
-      DWARFS_THROW(runtime_error,
-                   "integrity check error in section: " + s->name());
-    }
-    if (s->type() != section_type::BLOCK) {
-      if (!sections.emplace(s->type(), *s).second) {
-        DWARFS_THROW(runtime_error, "duplicate section: " + s->name());
+      if (detail_level > 2) {
+        os << "SECTION " << s.description()
+           << ", blocksize=" << bd.uncompressed_size()
+           << ", ratio=" << fmt::format("{:.2f}%", 100.0 * compression_ratio)
+           << std::endl;
       }
+
+      if (!s.check_fast(*mm)) {
+        DWARFS_THROW(runtime_error, "checksum error in section: " + s.name());
+      }
+      if (!s.verify(*mm)) {
+        DWARFS_THROW(runtime_error,
+                     "integrity check error in section: " + s.name());
+      }
+
+      return s;
+    }};
+
+    sections.emplace_back(task.get_future());
+    wg.add_job(std::move(task));
+  }
+
+  std::unordered_set seen;
+  int errors = 0;
+
+  for (auto& sf : sections) {
+    try {
+      auto s = sf.get();
+
+      if (s.type() != section_type::BLOCK) {
+        if (!seen.emplace(s.type()).second) {
+          DWARFS_THROW(runtime_error, "duplicate section: " + s.name());
+        }
+      }
+    } catch (runtime_error const& e) {
+      LOG_ERROR << e.what();
+      ++errors;
     }
   }
 
-  if (detail_level > 0) {
+  if (errors == 0 and detail_level > 0) {
     filesystem_options fsopts;
     if (detail_level > 1) {
       fsopts.metadata.enable_nlink = true;
     }
     filesystem_v2(lgr, mm, fsopts).dump(os, detail_level);
   }
+
+  return errors;
 }
 
 } // namespace dwarfs
diff --git a/src/dwarfsck.cpp b/src/dwarfsck.cpp
index 07d61538..b1542f4f 100644
--- a/src/dwarfsck.cpp
+++ b/src/dwarfsck.cpp
@@ -41,7 +41,10 @@ namespace dwarfs {
 namespace po = boost::program_options;
 
 int dwarfsck(int argc, char** argv) {
+  const size_t num_cpu = std::max(std::thread::hardware_concurrency(), 1u);
+
   std::string log_level, input, export_metadata;
+  size_t num_workers;
   int detail;
   bool json = false;
 
@@ -54,6 +57,9 @@ int dwarfsck(int argc, char** argv) {
     ("detail,d",
         po::value(&detail)->default_value(1),
         "detail level")
+    ("num-workers,n",
+        po::value(&num_workers)->default_value(num_cpu),
+        "number of reader worker threads")
     ("json",
         po::value(&json)->zero_tokens(),
         "print metadata in JSON format")
@@ -105,7 +111,7 @@ int dwarfsck(int argc, char** argv) {
       filesystem_v2 fs(lgr, mm);
       std::cout << folly::toPrettyJson(fs.metadata_as_dynamic()) << std::endl;
     } else {
-      filesystem_v2::identify(lgr, mm, std::cout, detail);
+      filesystem_v2::identify(lgr, mm, std::cout, detail, num_workers);
     }
   } catch (system_error const& e) {
     LOG_ERROR << folly::exceptionStr(e);