communicate implemented - test case not working though

This commit is contained in:
arunmu 2016-03-16 12:12:46 +05:30
parent 9d5ce3cc44
commit 18686a79ed
3 changed files with 131 additions and 26 deletions

BIN
main

Binary file not shown.

View File

@ -8,6 +8,7 @@
#include <vector> #include <vector>
#include <sstream> #include <sstream>
#include <memory> #include <memory>
#include <thread>
#include <initializer_list> #include <initializer_list>
#include <exception> #include <exception>
@ -20,13 +21,14 @@ extern "C" {
namespace subprocess { namespace subprocess {
using Buffer = std::vector<uint8_t>; using Buffer = std::vector<char>;
using OutBuffer = std::vector<uint8_t>; using OutBuffer = std::vector<char>;
using ErrBuffer = std::vector<uint8_t>; using ErrBuffer = std::vector<char>;
// Max buffer size allocated on stack for read error // Max buffer size allocated on stack for read error
// from pipe // from pipe
static const size_t SP_MAX_ERR_BUF_SIZ = 1024; static const size_t SP_MAX_ERR_BUF_SIZ = 1024;
static const size_t DEFAULT_BUF_CAP_BYTES = 8192;
// Exception Classes // Exception Classes
class CalledProcessError: public std::runtime_error class CalledProcessError: public std::runtime_error
@ -320,12 +322,24 @@ public:
{} {}
void operator=(const Communication&) = delete; void operator=(const Communication&) = delete;
public: public:
int send(const uint8_t* msg, size_t length); int send(const char* msg, size_t length);
int send(const Buffer& msg); int send(const Buffer& msg);
int send(Buffer&& msg);
std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length);
std::pair<OutBuffer, ErrBuffer> communicate(const Buffer& msg)
{ return communicate(msg.data(), msg.size()); }
void set_out_buf_cap(size_t cap) { out_buf_cap_ = cap; }
void set_err_buf_cap(size_t cap) { err_buf_cap_ = cap; }
private:
std::pair<OutBuffer, ErrBuffer> communicate_threaded(
const char* msg, size_t length);
private: private:
Streams* stream_; Streams* stream_;
size_t out_buf_cap_ = DEFAULT_BUF_CAP_BYTES;
size_t err_buf_cap_ = DEFAULT_BUF_CAP_BYTES;
}; };
class Streams class Streams
@ -372,15 +386,22 @@ public:
void output(FILE* fp) { output_.reset(fp, fclose); } void output(FILE* fp) { output_.reset(fp, fclose); }
void error(FILE* fp) { error_.reset(fp, fclose); } void error(FILE* fp) { error_.reset(fp, fclose); }
void set_out_buf_cap(size_t cap) { comm_.set_out_buf_cap(cap); }
void set_err_buf_cap(size_t cap) { comm_.set_err_buf_cap(cap); }
public: /* Communication forwarding API's */ public: /* Communication forwarding API's */
int send(const uint8_t* msg, size_t length) int send(const char* msg, size_t length)
{ return comm_.send(msg, length); } { return comm_.send(msg, length); }
int send(const Buffer& msg) int send(const Buffer& msg)
{ return comm_.send(msg); } { return comm_.send(msg); }
int send(Buffer&& msg) std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length)
{ return comm_.send(std::move(msg)); } { return comm_.communicate(msg, length); }
std::pair<OutBuffer, ErrBuffer> communicate(const Buffer& msg)
{ return comm_.communicate(msg); }
public:// Yes they are public public:// Yes they are public
@ -441,22 +462,21 @@ public:
int pid() const noexcept { return child_pid_; } int pid() const noexcept { return child_pid_; }
void set_out_buf_cap(); void set_out_buf_cap(size_t cap) { stream_.set_out_buf_cap(cap); }
void set_err_buf_cap(); void set_err_buf_cap(size_t cap) { stream_.set_err_buf_cap(cap); }
int send(const uint8_t* msg, size_t length) int send(const char* msg, size_t length)
{ return stream_.send(msg, length); } { return stream_.send(msg, length); }
int send(const Buffer& msg) int send(const Buffer& msg)
{ return stream_.send(msg); } { return stream_.send(msg); }
int send(Buffer&& msg) std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length)
{ return stream_.send(std::move(msg)); } { return stream_.communicate(msg, length); }
void communicate(const char* msg, size_t length, bool finish = false); std::pair<OutBuffer, ErrBuffer> communicate(const Buffer& msg)
void communicate(const Buffer& msg, bool finish = false); { return stream_.communicate(msg); }
void communicate(Buffer&& msg, bool finish = false);
private: private:
template <typename F, typename... Args> template <typename F, typename... Args>
@ -729,10 +749,10 @@ namespace detail {
} }
} }
int Communication::send(const uint8_t* msg, size_t length) int Communication::send(const char* msg, size_t length)
{ {
if (stream_->input() == nullptr) return -1; if (stream_->input() == nullptr) return -1;
return std::fwrite(msg, sizeof(uint8_t), length, stream_->input()); return std::fwrite(msg, sizeof(char), length, stream_->input());
} }
int Communication::send(const Buffer& msg) int Communication::send(const Buffer& msg)
@ -740,9 +760,96 @@ namespace detail {
return send(msg.data(), msg.size()); return send(msg.data(), msg.size());
} }
int Communication::send(Buffer&& msg) std::pair<OutBuffer, ErrBuffer>
Communication::communicate(const char* msg, size_t length)
{ {
return send(msg.data(), msg.size()); // Optimization from subprocess.py
// If we are using one pipe, or no pipe
// at all, using select() or threads is unnecessary.
auto hndls = {stream_->input(), stream_->output(), stream_->error()};
int count = std::count(std::begin(hndls), std::end(hndls), nullptr);
if (count >= 2) {
OutBuffer obuf;
ErrBuffer ebuf;
if (stream_->input()) {
if (msg) {
int wbytes = std::fwrite(msg, sizeof(char), length, stream_->input());
if (wbytes < length) {
if (errno != EPIPE && errno != EINVAL) {
throw OSError("fwrite error", errno);
}
}
}
// Close the input stream
stream_->input_.reset();
} else if (stream_->output()) {
// Read till EOF
// ATTN: This could be blocking, if the process
// at the other end screws up, we get screwed up as well
obuf.resize(out_buf_cap_);
int rbytes = util::read_atmost_n(fileno(stream_->output()),
(char*)obuf.data(), obuf.size());
if (rbytes == -1) {
throw OSError("read to obuf failed", errno);
}
// Close the output stream
stream_->output_.reset();
} else if (stream_->error()) {
// Same screwness applies here as well
ebuf.resize(err_buf_cap_);
int rbytes = util::read_atmost_n(fileno(stream_->error()),
(char*)ebuf.data(), ebuf.size());
if (rbytes == -1) {
throw OSError("read to ebuf failed", errno);
}
// Close the error stream
stream_->error_.reset();
}
return std::make_pair(obuf, ebuf);
}
return communicate_threaded(msg, length);
}
std::pair<OutBuffer, ErrBuffer>
Communication::communicate_threaded(const char* msg, size_t length)
{
OutBuffer obuf;
ErrBuffer ebuf;
std::vector<std::thread> bg_threads;
if (stream_->output()) {
obuf.resize(out_buf_cap_);
bg_threads.push_back(std::thread(
util::read_atmost_n, fileno(stream_->output()),
(char*)obuf.data(), obuf.size()));
}
if (stream_->error()) {
ebuf.resize(err_buf_cap_);
bg_threads.push_back(std::thread(
util::read_atmost_n, fileno(stream_->error()),
(char*)ebuf.data(), ebuf.size()));
}
if (stream_->input()) {
if (msg) {
int wbytes = std::fwrite(msg, sizeof(char), length, stream_->input());
if (wbytes < length) {
if (errno != EPIPE && errno != EINVAL) {
for (auto& t: bg_threads) t.join();
throw OSError("fwrite error", errno);
}
}
}
stream_->input_.reset();
}
// Wait for the threads to finish
for(auto& thr : bg_threads) thr.join();
return std::make_pair(obuf, ebuf);
} }
}; // end namespace detail }; // end namespace detail

View File

@ -6,12 +6,10 @@ using namespace subprocess;
void test_input() void test_input()
{ {
auto p = Popen({"grep", "f"}, output{PIPE}, input{PIPE}); auto p = Popen({"grep", "f"}, output{PIPE}, input{PIPE});
const char* msg = "one\ntwo\nfour\n"; const char* msg = "one\two\three\four\five\n";
std::fwrite(msg, 1, strlen(msg), p.input()); p.send(msg, strlen(msg));
fclose (p.input()); auto res = p.communicate(nullptr, 0);
std::cout << res.first.data() << std::endl;
std::vector<uint8_t> rbuf(128);
std::fread(rbuf.data(), 1, 128, p.output());
} }
int main() { int main() {