diff --git a/main b/main index 6b3a534..f1114b0 100755 Binary files a/main and b/main differ diff --git a/subprocess.hpp b/subprocess.hpp index a665288..cfd5d3f 100755 --- a/subprocess.hpp +++ b/subprocess.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -20,13 +21,14 @@ extern "C" { namespace subprocess { -using Buffer = std::vector; -using OutBuffer = std::vector; -using ErrBuffer = std::vector; +using Buffer = std::vector; +using OutBuffer = std::vector; +using ErrBuffer = std::vector; // Max buffer size allocated on stack for read error // from pipe static const size_t SP_MAX_ERR_BUF_SIZ = 1024; +static const size_t DEFAULT_BUF_CAP_BYTES = 8192; // Exception Classes class CalledProcessError: public std::runtime_error @@ -320,12 +322,24 @@ public: {} void operator=(const Communication&) = delete; public: - int send(const uint8_t* msg, size_t length); + int send(const char* msg, size_t length); int send(const Buffer& msg); - int send(Buffer&& msg); + + std::pair communicate(const char* msg, size_t length); + std::pair communicate(const Buffer& msg) + { return communicate(msg.data(), msg.size()); } + + void set_out_buf_cap(size_t cap) { out_buf_cap_ = cap; } + void set_err_buf_cap(size_t cap) { err_buf_cap_ = cap; } + +private: + std::pair communicate_threaded( + const char* msg, size_t length); private: Streams* stream_; + size_t out_buf_cap_ = DEFAULT_BUF_CAP_BYTES; + size_t err_buf_cap_ = DEFAULT_BUF_CAP_BYTES; }; class Streams @@ -372,15 +386,22 @@ public: void output(FILE* fp) { output_.reset(fp, fclose); } void error(FILE* fp) { error_.reset(fp, fclose); } + void set_out_buf_cap(size_t cap) { comm_.set_out_buf_cap(cap); } + void set_err_buf_cap(size_t cap) { comm_.set_err_buf_cap(cap); } + public: /* Communication forwarding API's */ - int send(const uint8_t* msg, size_t length) + int send(const char* msg, size_t length) { return comm_.send(msg, length); } int send(const Buffer& msg) { return comm_.send(msg); } - int send(Buffer&& msg) - { return comm_.send(std::move(msg)); } + std::pair communicate(const char* msg, size_t length) + { return comm_.communicate(msg, length); } + + std::pair communicate(const Buffer& msg) + { return comm_.communicate(msg); } + public:// Yes they are public @@ -441,22 +462,21 @@ public: int pid() const noexcept { return child_pid_; } - void set_out_buf_cap(); + void set_out_buf_cap(size_t cap) { stream_.set_out_buf_cap(cap); } - void set_err_buf_cap(); + void set_err_buf_cap(size_t cap) { stream_.set_err_buf_cap(cap); } - int send(const uint8_t* msg, size_t length) + int send(const char* msg, size_t length) { return stream_.send(msg, length); } int send(const Buffer& msg) { return stream_.send(msg); } - int send(Buffer&& msg) - { return stream_.send(std::move(msg)); } + std::pair communicate(const char* msg, size_t length) + { return stream_.communicate(msg, length); } - void communicate(const char* msg, size_t length, bool finish = false); - void communicate(const Buffer& msg, bool finish = false); - void communicate(Buffer&& msg, bool finish = false); + std::pair communicate(const Buffer& msg) + { return stream_.communicate(msg); } private: template @@ -729,10 +749,10 @@ namespace detail { } } - int Communication::send(const uint8_t* msg, size_t length) + int Communication::send(const char* msg, size_t length) { if (stream_->input() == nullptr) return -1; - return std::fwrite(msg, sizeof(uint8_t), length, stream_->input()); + return std::fwrite(msg, sizeof(char), length, stream_->input()); } int Communication::send(const Buffer& msg) @@ -740,9 +760,96 @@ namespace detail { return send(msg.data(), msg.size()); } - int Communication::send(Buffer&& msg) + std::pair + Communication::communicate(const char* msg, size_t length) { - return send(msg.data(), msg.size()); + // Optimization from subprocess.py + // If we are using one pipe, or no pipe + // at all, using select() or threads is unnecessary. + auto hndls = {stream_->input(), stream_->output(), stream_->error()}; + int count = std::count(std::begin(hndls), std::end(hndls), nullptr); + + if (count >= 2) { + OutBuffer obuf; + ErrBuffer ebuf; + if (stream_->input()) { + if (msg) { + int wbytes = std::fwrite(msg, sizeof(char), length, stream_->input()); + if (wbytes < length) { + if (errno != EPIPE && errno != EINVAL) { + throw OSError("fwrite error", errno); + } + } + } + // Close the input stream + stream_->input_.reset(); + } else if (stream_->output()) { + // Read till EOF + // ATTN: This could be blocking, if the process + // at the other end screws up, we get screwed up as well + obuf.resize(out_buf_cap_); + int rbytes = util::read_atmost_n(fileno(stream_->output()), + (char*)obuf.data(), obuf.size()); + if (rbytes == -1) { + throw OSError("read to obuf failed", errno); + } + // Close the output stream + stream_->output_.reset(); + } else if (stream_->error()) { + // Same screwness applies here as well + ebuf.resize(err_buf_cap_); + int rbytes = util::read_atmost_n(fileno(stream_->error()), + (char*)ebuf.data(), ebuf.size()); + if (rbytes == -1) { + throw OSError("read to ebuf failed", errno); + } + // Close the error stream + stream_->error_.reset(); + } + return std::make_pair(obuf, ebuf); + } + + return communicate_threaded(msg, length); + } + + + std::pair + Communication::communicate_threaded(const char* msg, size_t length) + { + OutBuffer obuf; + ErrBuffer ebuf; + + std::vector bg_threads; + + if (stream_->output()) { + obuf.resize(out_buf_cap_); + bg_threads.push_back(std::thread( + util::read_atmost_n, fileno(stream_->output()), + (char*)obuf.data(), obuf.size())); + } + if (stream_->error()) { + ebuf.resize(err_buf_cap_); + bg_threads.push_back(std::thread( + util::read_atmost_n, fileno(stream_->error()), + (char*)ebuf.data(), ebuf.size())); + } + if (stream_->input()) { + if (msg) { + int wbytes = std::fwrite(msg, sizeof(char), length, stream_->input()); + if (wbytes < length) { + if (errno != EPIPE && errno != EINVAL) { + for (auto& t: bg_threads) t.join(); + throw OSError("fwrite error", errno); + } + } + } + stream_->input_.reset(); + } + + // Wait for the threads to finish + for(auto& thr : bg_threads) thr.join(); + + return std::make_pair(obuf, ebuf); } }; // end namespace detail diff --git a/test/test_subprocess.cc b/test/test_subprocess.cc index dcffb27..0183e1c 100755 --- a/test/test_subprocess.cc +++ b/test/test_subprocess.cc @@ -6,12 +6,10 @@ using namespace subprocess; void test_input() { auto p = Popen({"grep", "f"}, output{PIPE}, input{PIPE}); - const char* msg = "one\ntwo\nfour\n"; - std::fwrite(msg, 1, strlen(msg), p.input()); - fclose (p.input()); - - std::vector rbuf(128); - std::fread(rbuf.data(), 1, 128, p.output()); + const char* msg = "one\two\three\four\five\n"; + p.send(msg, strlen(msg)); + auto res = p.communicate(nullptr, 0); + std::cout << res.first.data() << std::endl; } int main() {