Use asynchronous filesystem access to speed up extraction

This commit is contained in:
Marcus Holland-Moritz 2021-03-05 22:44:03 +01:00
parent 12b949525d
commit 1c080920d0

View File

@ -20,8 +20,12 @@
*/
#include <algorithm>
#include <condition_variable>
#include <cstring>
#include <exception>
#include <future>
#include <memory>
#include <mutex>
#include <vector>
#include <sys/statvfs.h>
@ -42,6 +46,7 @@
#include "dwarfs/options.h"
#include "dwarfs/util.h"
#include "dwarfs/version.h"
#include "dwarfs/worker_group.h"
namespace po = boost::program_options;
@ -49,6 +54,30 @@ using namespace dwarfs;
namespace {
/**
 * Minimal counting semaphore built on a mutex and a condition variable.
 *
 * Unlike a classic binary/unit semaphore, both `post()` and `wait()` take an
 * amount, so the counter can be used as a budget (e.g. a number of bytes)
 * from which callers reserve and later return arbitrary quantities.
 *
 * All member functions may be called concurrently from multiple threads.
 */
class semaphore {
 public:
  /**
   * Add `n` units to the counter and wake up blocked waiters.
   *
   * Every waiter is notified rather than just one: a single post can make
   * several pending requests satisfiable at once, and with differently sized
   * requests the one thread picked by `notify_one()` might be a waiter whose
   * request still cannot be met, leaving a satisfiable waiter asleep. Each
   * woken waiter re-checks its own condition under the lock.
   */
  void post(uint64_t n = 1) {
    {
      std::lock_guard lock(mx_);
      count_ += n;
    }
    // Notify after releasing the lock so woken threads can acquire it
    // immediately.
    condition_.notify_all();
  }

  /**
   * Block until at least `n` units are available, then consume them.
   *
   * Note that a request larger than the total that will ever be posted
   * blocks forever; callers must bound `n` accordingly.
   */
  void wait(uint64_t n = 1) {
    std::unique_lock lock(mx_);
    // The predicate form handles spurious wake-ups and wake-ups where
    // another waiter consumed the units first.
    condition_.wait(lock, [&] { return count_ >= n; });
    count_ -= n;
  }

 private:
  std::mutex mx_;                      // guards count_
  std::condition_variable condition_;  // signalled whenever count_ grows
  uint64_t count_{0};                  // currently available units
};
int dwarfsextract(int argc, char** argv) {
std::string filesystem, output, format, cache_size_str, log_level;
size_t num_workers;
@ -66,7 +95,7 @@ int dwarfsextract(int argc, char** argv) {
po::value<std::string>(&format),
"output format")
("num-workers,n",
po::value<size_t>(&num_workers)->default_value(1),
po::value<size_t>(&num_workers)->default_value(4),
"number of worker threads")
("cache-size,s",
po::value<std::string>(&cache_size_str)->default_value("256m"),
@ -150,15 +179,36 @@ int dwarfsextract(int argc, char** argv) {
::archive_entry* spare = nullptr;
worker_group archiver("archiver", 1);
semaphore sem;
sem.post(fsopts.block_cache.max_bytes);
auto do_archive = [&](::archive_entry* ae, entry_view entry) {
check_result(::archive_write_header(a, ae));
if (auto size = ::archive_entry_size(ae); size > 0) {
int fh = fs.open(entry);
iovec_read_buf irb;
fs.readv(fh, irb, size, 0);
for (auto const& iov : irb.buf) {
check_result(::archive_write_data(a, iov.iov_base, iov.iov_len));
if (auto size = ::archive_entry_size(ae);
S_ISREG(entry.mode()) && size > 0) {
auto fd = fs.open(entry);
sem.wait(size);
auto ranges = fs.readv(fd, size, 0);
if (!ranges) {
LOG_ERROR << "error reading inode [" << fd
<< "]: " << ::strerror(-ranges.error());
return;
}
archiver.add_job([&sem, &check_result, ranges = std::move(*ranges), a,
ae, size]() mutable {
check_result(::archive_write_header(a, ae));
for (auto& r : ranges) {
auto br = r.get();
check_result(::archive_write_data(a, br.data(), br.size()));
}
sem.post(size);
::archive_entry_free(ae);
});
} else {
archiver.add_job([&check_result, a, ae] {
check_result(::archive_write_header(a, ae));
::archive_entry_free(ae);
});
}
};
@ -197,7 +247,6 @@ int dwarfsextract(int argc, char** argv) {
if (ae) {
do_archive(ae, entry);
::archive_entry_free(ae);
}
if (spare) {
@ -207,10 +256,11 @@ int dwarfsextract(int argc, char** argv) {
}
LOG_DEBUG << "archiving spare " << ::archive_entry_pathname(spare);
do_archive(spare, *ev);
::archive_entry_free(spare);
}
});
archiver.wait();
// As we're visiting *all* hardlinks, we should never see any deferred
// entries.
::archive_entry* ae = nullptr;