From 186eb763a3085e545f33b39fa77a96564ce4b781 Mon Sep 17 00:00:00 2001 From: Marcus Holland-Moritz Date: Fri, 21 Oct 2022 11:12:28 +0200 Subject: [PATCH] Fix #104: read large files in chunks rather than fully This changes the way data is sent to libarchive. For files larger than `max_queued_bytes`, instead of fully reading the compressed file and then sending the whole file to libarchive at once, the code now reads chunks of at most `max_queued_bytes` and sends the chunks to libarchive independently. Small files are treated as before. When extracting large files, this method is actually a lot faster as it puts less strain on the memory allocator. --- src/dwarfs/filesystem_extractor.cpp | 63 +++++++++++++++++++---------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/src/dwarfs/filesystem_extractor.cpp b/src/dwarfs/filesystem_extractor.cpp index 342f7a98..0c2172d6 100644 --- a/src/dwarfs/filesystem_extractor.cpp +++ b/src/dwarfs/filesystem_extractor.cpp @@ -217,6 +217,8 @@ void filesystem_extractor_::extract(filesystem_v2 const& fs, worker_group archiver("archiver", 1); cache_semaphore sem; + LOG_DEBUG << "extractor semaphore size: " << max_queued_bytes << " bytes"; + sem.post(max_queued_bytes); std::atomic abort{false}; @@ -227,29 +229,48 @@ void filesystem_extractor_::extract(filesystem_v2 const& fs, S_ISREG(entry.mode()) && size > 0) { auto fd = fs.open(entry); - sem.wait(size); + size_t pos = 0; + size_t remain = size; - if (auto ranges = fs.readv(fd, size, 0)) { - archiver.add_job([this, &sem, &abort, ranges = std::move(*ranges), ae, - size]() mutable { - SCOPE_EXIT { ::archive_entry_free(ae); }; - try { - LOG_TRACE << "archiving " << ::archive_entry_pathname(ae); - check_result(::archive_write_header(a_, ae)); - for (auto& r : ranges) { - auto br = r.get(); - LOG_TRACE << "writing " << br.size() << " bytes"; - check_result(::archive_write_data(a_, br.data(), br.size())); + while (remain > 0 && !abort) { + size_t bs = remain < max_queued_bytes ? remain : max_queued_bytes; + + sem.wait(bs); + + if (auto ranges = fs.readv(fd, bs, pos)) { + archiver.add_job([this, &sem, &abort, ranges = std::move(*ranges), ae, + pos, remain, bs, size]() mutable { + try { + if (pos == 0) { + LOG_DEBUG << "extracting " << ::archive_entry_pathname(ae) + << " (" << size << " bytes)"; + check_result(::archive_write_header(a_, ae)); + } + for (auto& r : ranges) { + auto br = r.get(); + LOG_TRACE << "[" << pos << "] writing " << br.size() + << " bytes for " << ::archive_entry_pathname(ae); + check_result(::archive_write_data(a_, br.data(), br.size())); + } + if (bs == remain) { + archive_entry_free(ae); + } + sem.post(bs); + } catch (...) { + LOG_ERROR << folly::exceptionStr(std::current_exception()); + abort = true; + archive_entry_free(ae); } - sem.post(size); - } catch (...) { - LOG_ERROR << folly::exceptionStr(std::current_exception()); - abort = true; - } - }); - } else { - LOG_ERROR << "error reading inode [" << fd - << "]: " << ::strerror(-ranges.error()); + }); + } else { + LOG_ERROR << "error reading " << bs << " bytes at offset " << pos + << " from inode [" << fd + << "]: " << ::strerror(-ranges.error()); + break; + } + + pos += bs; + remain -= bs; } } else { archiver.add_job([this, ae, &abort] {