Add streaming interface for filesystem extractor

This commit is contained in:
Marcus Holland-Moritz 2021-03-28 23:07:19 +02:00
parent 9ddd2a5d71
commit 938a5dd1ce
2 changed files with 68 additions and 4 deletions

View File

@ -22,6 +22,7 @@
#pragma once #pragma once
#include <memory> #include <memory>
#include <ostream>
#include <string> #include <string>
namespace dwarfs { namespace dwarfs {
@ -37,11 +38,15 @@ class filesystem_extractor {
return impl_->open_archive(output, format); return impl_->open_archive(output, format);
} }
void open_stream(std::ostream& os, std::string const& format) {
return impl_->open_stream(os, format);
}
void open_disk(std::string const& output) { return impl_->open_disk(output); } void open_disk(std::string const& output) { return impl_->open_disk(output); }
void close() { return impl_->close(); } void close() { return impl_->close(); }
void extract(filesystem_v2& fs, size_t max_queued_bytes) { void extract(filesystem_v2 const& fs, size_t max_queued_bytes) {
return impl_->extract(fs, max_queued_bytes); return impl_->extract(fs, max_queued_bytes);
} }
@ -51,9 +56,10 @@ class filesystem_extractor {
virtual void virtual void
open_archive(std::string const& output, std::string const& format) = 0; open_archive(std::string const& output, std::string const& format) = 0;
virtual void open_stream(std::ostream& os, std::string const& format) = 0;
virtual void open_disk(std::string const& output) = 0; virtual void open_disk(std::string const& output) = 0;
virtual void close() = 0; virtual void close() = 0;
virtual void extract(filesystem_v2& fs, size_t max_queued_bytes) = 0; virtual void extract(filesystem_v2 const& fs, size_t max_queued_bytes) = 0;
}; };
private: private:

View File

@ -19,8 +19,11 @@
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>. * along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/ */
#include <array>
#include <condition_variable> #include <condition_variable>
#include <memory>
#include <mutex> #include <mutex>
#include <thread>
#include <unistd.h> #include <unistd.h>
@ -29,6 +32,7 @@
#include <folly/ExceptionString.h> #include <folly/ExceptionString.h>
#include <folly/ScopeGuard.h> #include <folly/ScopeGuard.h>
#include <folly/system/ThreadName.h>
#include "dwarfs/filesystem_extractor.h" #include "dwarfs/filesystem_extractor.h"
#include "dwarfs/filesystem_v2.h" #include "dwarfs/filesystem_v2.h"
@ -95,6 +99,20 @@ class filesystem_extractor_ final : public filesystem_extractor::impl {
a_, output.empty() ? nullptr : output.c_str())); a_, output.empty() ? nullptr : output.c_str()));
} }
void open_stream(std::ostream& os, std::string const& format) override {
if (::pipe(pipefd_) != 0) {
DWARFS_THROW(system_error, "pipe()");
}
iot_ = std::make_unique<std::thread>(
[this, &os, fd = pipefd_[0]] { pump(os, fd); });
a_ = ::archive_write_new();
check_result(::archive_write_set_format_by_name(a_, format.c_str()));
check_result(::archive_write_open_fd(a_, pipefd_[1]));
}
void open_disk(std::string const& output) override { void open_disk(std::string const& output) override {
if (!output.empty()) { if (!output.empty()) {
if (::chdir(output.c_str()) != 0) { if (::chdir(output.c_str()) != 0) {
@ -117,11 +135,49 @@ class filesystem_extractor_ final : public filesystem_extractor::impl {
check_result(::archive_write_free(a_)); check_result(::archive_write_free(a_));
a_ = nullptr; a_ = nullptr;
} }
closefd(pipefd_[1]);
if (iot_) {
iot_->join();
iot_.reset();
}
closefd(pipefd_[0]);
} }
void extract(filesystem_v2& fs, size_t max_queued_bytes) override; void extract(filesystem_v2 const& fs, size_t max_queued_bytes) override;
private: private:
void closefd(int& fd) {
if (fd >= 0) {
if (::close(fd) != 0) {
DWARFS_THROW(system_error, "close()");
}
fd = -1;
}
}
void pump(std::ostream& os, int fd) {
folly::setThreadName("pump");
std::array<char, 1024> buf;
for (;;) {
auto rv = ::read(fd, buf.data(), buf.size());
if (rv == 0) {
break;
}
if (rv < 0) {
LOG_ERROR << "read(): " << ::strerror(errno);
}
os.write(buf.data(), rv);
}
}
void check_result(int res) { void check_result(int res) {
switch (res) { switch (res) {
case ARCHIVE_OK: case ARCHIVE_OK:
@ -137,10 +193,12 @@ class filesystem_extractor_ final : public filesystem_extractor::impl {
log_proxy<debug_logger_policy> log_; log_proxy<debug_logger_policy> log_;
struct ::archive* a_{nullptr}; struct ::archive* a_{nullptr};
int pipefd_[2]{-1, -1};
std::unique_ptr<std::thread> iot_;
}; };
template <typename LoggerPolicy> template <typename LoggerPolicy>
void filesystem_extractor_<LoggerPolicy>::extract(filesystem_v2& fs, void filesystem_extractor_<LoggerPolicy>::extract(filesystem_v2 const& fs,
size_t max_queued_bytes) { size_t max_queued_bytes) {
DWARFS_CHECK(a_, "filesystem not opened"); DWARFS_CHECK(a_, "filesystem not opened");