Fix #104: read large files in chunks rather than fully

This changes the way data is sent to libarchive. For files larger
than `max_queued_bytes`, instead of reading the whole compressed
file into memory and then sending it to libarchive at once, the
code now reads chunks of at most `max_queued_bytes` each and sends
those chunks to libarchive independently. Small files are handled
as before.

When extracting large files, this approach is actually a lot faster,
as it puts less strain on the memory allocator.
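
For illustration, the chunking loop boils down to the following minimal,
standalone sketch (read_chunk and send_to_archive are hypothetical
stand-ins for the fs.readv() and libarchive calls in the diff below):

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Hypothetical stand-in for fs.readv(): pretend we decompressed
    // `count` bytes starting at `offset`.
    std::vector<uint8_t> read_chunk(size_t offset, size_t count) {
      static_cast<void>(offset);
      return std::vector<uint8_t>(count);
    }

    // Hypothetical stand-in for the libarchive write path.
    void send_to_archive(std::vector<uint8_t> const& chunk) {
      std::cout << "writing " << chunk.size() << " bytes\n";
    }

    // Stream a file of `size` bytes in chunks of at most
    // `max_queued_bytes` instead of materializing the whole file first.
    void extract_chunked(size_t size, size_t max_queued_bytes) {
      size_t pos = 0;
      size_t remain = size;
      while (remain > 0) {
        size_t bs = std::min(remain, max_queued_bytes);
        send_to_archive(read_chunk(pos, bs));
        pos += bs;
        remain -= bs;
      }
    }

    int main() {
      extract_chunked(1'000'000, 64 * 1024); // ~1 MB file, 64 KiB chunks
    }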
Author: Marcus Holland-Moritz
Date:   2022-10-21 11:12:28 +02:00
Parent: dc8490f583
Commit: 186eb763a3


@@ -217,6 +217,8 @@ void filesystem_extractor_<LoggerPolicy>::extract(filesystem_v2 const& fs,
   worker_group archiver("archiver", 1);
   cache_semaphore sem;
 
+  LOG_DEBUG << "extractor semaphore size: " << max_queued_bytes << " bytes";
+
   sem.post(max_queued_bytes);
 
   std::atomic<bool> abort{false};
@@ -227,29 +229,48 @@ void filesystem_extractor_<LoggerPolicy>::extract(filesystem_v2 const& fs,
         S_ISREG(entry.mode()) && size > 0) {
       auto fd = fs.open(entry);
-      sem.wait(size);
+      size_t pos = 0;
+      size_t remain = size;
 
-      if (auto ranges = fs.readv(fd, size, 0)) {
-        archiver.add_job([this, &sem, &abort, ranges = std::move(*ranges), ae,
-                          size]() mutable {
-          SCOPE_EXIT { ::archive_entry_free(ae); };
-          try {
-            LOG_TRACE << "archiving " << ::archive_entry_pathname(ae);
-            check_result(::archive_write_header(a_, ae));
-            for (auto& r : ranges) {
-              auto br = r.get();
-              LOG_TRACE << "writing " << br.size() << " bytes";
-              check_result(::archive_write_data(a_, br.data(), br.size()));
-            }
-            sem.post(size);
-          } catch (...) {
-            LOG_ERROR << folly::exceptionStr(std::current_exception());
-            abort = true;
-          }
-        });
-      } else {
-        LOG_ERROR << "error reading inode [" << fd
-                  << "]: " << ::strerror(-ranges.error());
+      while (remain > 0 && !abort) {
+        size_t bs = remain < max_queued_bytes ? remain : max_queued_bytes;
+
+        sem.wait(bs);
+
+        if (auto ranges = fs.readv(fd, bs, pos)) {
+          archiver.add_job([this, &sem, &abort, ranges = std::move(*ranges), ae,
+                            pos, remain, bs, size]() mutable {
+            try {
+              if (pos == 0) {
+                LOG_DEBUG << "extracting " << ::archive_entry_pathname(ae)
+                          << " (" << size << " bytes)";
+                check_result(::archive_write_header(a_, ae));
+              }
+              for (auto& r : ranges) {
+                auto br = r.get();
+                LOG_TRACE << "[" << pos << "] writing " << br.size()
+                          << " bytes for " << ::archive_entry_pathname(ae);
+                check_result(::archive_write_data(a_, br.data(), br.size()));
+              }
+              if (bs == remain) {
+                archive_entry_free(ae);
+              }
+              sem.post(bs);
+            } catch (...) {
+              LOG_ERROR << folly::exceptionStr(std::current_exception());
+              abort = true;
+              archive_entry_free(ae);
+            }
+          });
+        } else {
+          LOG_ERROR << "error reading " << bs << " bytes at offset " << pos
+                    << " from inode [" << fd
+                    << "]: " << ::strerror(-ranges.error());
+          break;
+        }
+
+        pos += bs;
+        remain -= bs;
       }
     } else {
       archiver.add_job([this, ae, &abort] {
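
The cache_semaphore is what bounds memory use here: the producer calls
sem.wait(bs) before queuing a chunk, and each archiver job calls
sem.post(bs) once its chunk is written, so at most max_queued_bytes
worth of chunk data is in flight at any time. A minimal sketch of a
semaphore with these byte-counting semantics (assumed for illustration;
not the actual dwarfs cache_semaphore):

    #include <condition_variable>
    #include <cstddef>
    #include <mutex>

    // Byte-counting semaphore sketch: wait(n) blocks until n units are
    // available and consumes them; post(n) returns n units.
    class byte_semaphore {
     public:
      void post(size_t n) {
        std::lock_guard<std::mutex> lock(mx_);
        avail_ += n;
        cv_.notify_all();
      }

      void wait(size_t n) {
        std::unique_lock<std::mutex> lock(mx_);
        cv_.wait(lock, [&] { return avail_ >= n; });
        avail_ -= n;
      }

     private:
      std::mutex mx_;
      std::condition_variable cv_;
      size_t avail_{0};
    };

Because the archiver worker_group runs a single thread, jobs queued in
file order are also written in order; that ordering is what allows the
header write to move into the first chunk's job (pos == 0) and
archive_entry_free into the last chunk's job (bs == remain).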