From 938a5dd1ce431b02e07049dbaca89c4c1995e67b Mon Sep 17 00:00:00 2001 From: Marcus Holland-Moritz Date: Sun, 28 Mar 2021 23:07:19 +0200 Subject: [PATCH] Add streaming interface for filesystem extractor --- include/dwarfs/filesystem_extractor.h | 10 ++++- src/dwarfs/filesystem_extractor.cpp | 62 ++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/include/dwarfs/filesystem_extractor.h b/include/dwarfs/filesystem_extractor.h index eeeaf819..198e25e2 100644 --- a/include/dwarfs/filesystem_extractor.h +++ b/include/dwarfs/filesystem_extractor.h @@ -22,6 +22,7 @@ #pragma once #include +#include #include namespace dwarfs { @@ -37,11 +38,15 @@ class filesystem_extractor { return impl_->open_archive(output, format); } + void open_stream(std::ostream& os, std::string const& format) { + return impl_->open_stream(os, format); + } + void open_disk(std::string const& output) { return impl_->open_disk(output); } void close() { return impl_->close(); } - void extract(filesystem_v2& fs, size_t max_queued_bytes) { + void extract(filesystem_v2 const& fs, size_t max_queued_bytes) { return impl_->extract(fs, max_queued_bytes); } @@ -51,9 +56,10 @@ class filesystem_extractor { virtual void open_archive(std::string const& output, std::string const& format) = 0; + virtual void open_stream(std::ostream& os, std::string const& format) = 0; virtual void open_disk(std::string const& output) = 0; virtual void close() = 0; - virtual void extract(filesystem_v2& fs, size_t max_queued_bytes) = 0; + virtual void extract(filesystem_v2 const& fs, size_t max_queued_bytes) = 0; }; private: diff --git a/src/dwarfs/filesystem_extractor.cpp b/src/dwarfs/filesystem_extractor.cpp index 0a6640c3..a9a895e2 100644 --- a/src/dwarfs/filesystem_extractor.cpp +++ b/src/dwarfs/filesystem_extractor.cpp @@ -19,8 +19,11 @@ * along with dwarfs. If not, see . 
*/ +#include #include +#include #include +#include #include @@ -29,6 +32,7 @@ #include #include +#include #include "dwarfs/filesystem_extractor.h" #include "dwarfs/filesystem_v2.h" @@ -95,6 +99,20 @@ class filesystem_extractor_ final : public filesystem_extractor::impl { a_, output.empty() ? nullptr : output.c_str())); } + void open_stream(std::ostream& os, std::string const& format) override { /* create a pipe, hand its read end and os to pump() via iot_, and open a new archive writer of the requested format on its write end */ + if (::pipe(pipefd_) != 0) { + DWARFS_THROW(system_error, "pipe()"); + } + + iot_ = std::make_unique( + [this, &os, fd = pipefd_[0]] { pump(os, fd); }); + + a_ = ::archive_write_new(); + + check_result(::archive_write_set_format_by_name(a_, format.c_str())); + check_result(::archive_write_open_fd(a_, pipefd_[1])); + } + void open_disk(std::string const& output) override { if (!output.empty()) { if (::chdir(output.c_str()) != 0) { @@ -117,11 +135,49 @@ class filesystem_extractor_ final : public filesystem_extractor::impl { check_result(::archive_write_free(a_)); a_ = nullptr; } + + closefd(pipefd_[1]); /* closing the write end first makes read() in pump() return 0, so the join below can finish */ + + if (iot_) { + iot_->join(); + iot_.reset(); + } + + closefd(pipefd_[0]); } - void extract(filesystem_v2& fs, size_t max_queued_bytes) override; + void extract(filesystem_v2 const& fs, size_t max_queued_bytes) override; private: + void closefd(int& fd) { /* close fd if it is open (>= 0) and reset it to -1; a failing close() is reported through DWARFS_THROW */ + if (fd >= 0) { + if (::close(fd) != 0) { + DWARFS_THROW(system_error, "close()"); + } + fd = -1; + } + } + + void pump(std::ostream& os, int fd) { /* copy everything read from fd into os until end-of-file or an unrecoverable read error */ + folly::setThreadName("pump"); + + std::array buf; + + for (;;) { + auto rv = ::read(fd, buf.data(), buf.size()); + + if (rv < 0) { + if (errno == EINTR) { continue; } /* interrupted by a signal before any data was transferred: retry the read */ + LOG_ERROR << "read(): " << ::strerror(errno); + } + if (rv <= 0) { /* 0 is end-of-file; a negative value is the error logged above and must never be passed to write() as a byte count */ + break; + } + + os.write(buf.data(), rv); + } + } + void check_result(int res) { switch (res) { case ARCHIVE_OK: @@ -137,10 +193,12 @@ class filesystem_extractor_ final : public filesystem_extractor::impl { log_proxy log_; struct ::archive* a_{nullptr}; + int pipefd_[2]{-1, -1}; + std::unique_ptr iot_; }; template -void
filesystem_extractor_::extract(filesystem_v2 const& fs, size_t max_queued_bytes) { DWARFS_CHECK(a_, "filesystem not opened");