diff --git a/src/dwarfsextract.cpp b/src/dwarfsextract.cpp index 899b8700..56ebd971 100644 --- a/src/dwarfsextract.cpp +++ b/src/dwarfsextract.cpp @@ -20,8 +20,12 @@ */ #include +#include #include #include +#include +#include +#include #include #include @@ -42,6 +46,7 @@ #include "dwarfs/options.h" #include "dwarfs/util.h" #include "dwarfs/version.h" +#include "dwarfs/worker_group.h" namespace po = boost::program_options; @@ -49,6 +54,30 @@ using namespace dwarfs; namespace { +class semaphore { + public: + void post(uint64_t n = 1) { + { + std::lock_guard lock(mx_); + count_ += n; + } + condition_.notify_one(); + } + + void wait(uint64_t n = 1) { + std::unique_lock lock(mx_); + while (count_ < n) { + condition_.wait(lock); + } + count_ -= n; + } + + private: + std::mutex mx_; + std::condition_variable condition_; + uint64_t count_{0}; +}; + int dwarfsextract(int argc, char** argv) { std::string filesystem, output, format, cache_size_str, log_level; size_t num_workers; @@ -66,7 +95,7 @@ int dwarfsextract(int argc, char** argv) { po::value(&format), "output format") ("num-workers,n", - po::value(&num_workers)->default_value(1), + po::value(&num_workers)->default_value(4), "number of worker threads") ("cache-size,s", po::value(&cache_size_str)->default_value("256m"), @@ -150,15 +179,36 @@ int dwarfsextract(int argc, char** argv) { ::archive_entry* spare = nullptr; + worker_group archiver("archiver", 1); + semaphore sem; + sem.post(fsopts.block_cache.max_bytes); + auto do_archive = [&](::archive_entry* ae, entry_view entry) { - check_result(::archive_write_header(a, ae)); - if (auto size = ::archive_entry_size(ae); size > 0) { - int fh = fs.open(entry); - iovec_read_buf irb; - fs.readv(fh, irb, size, 0); - for (auto const& iov : irb.buf) { - check_result(::archive_write_data(a, iov.iov_base, iov.iov_len)); + if (auto size = ::archive_entry_size(ae); + S_ISREG(entry.mode()) && size > 0) { + auto fd = fs.open(entry); + sem.wait(size); + auto ranges = fs.readv(fd, size, 0); + if (!ranges) { + LOG_ERROR << "error reading inode [" << fd + << "]: " << ::strerror(-ranges.error()); + return; } + archiver.add_job([&sem, &check_result, ranges = std::move(*ranges), a, + ae, size]() mutable { + check_result(::archive_write_header(a, ae)); + for (auto& r : ranges) { + auto br = r.get(); + check_result(::archive_write_data(a, br.data(), br.size())); + } + sem.post(size); + ::archive_entry_free(ae); + }); + } else { + archiver.add_job([&check_result, a, ae] { + check_result(::archive_write_header(a, ae)); + ::archive_entry_free(ae); + }); } }; @@ -197,7 +247,6 @@ int dwarfsextract(int argc, char** argv) { if (ae) { do_archive(ae, entry); - ::archive_entry_free(ae); } if (spare) { @@ -207,10 +256,11 @@ int dwarfsextract(int argc, char** argv) { } LOG_DEBUG << "archiving spare " << ::archive_entry_pathname(spare); do_archive(spare, *ev); - ::archive_entry_free(spare); } }); + archiver.wait(); + // As we're visiting *all* hardlinks, we should never see any deferred // entries. ::archive_entry* ae = nullptr;