Refactored Buffer

This commit is contained in:
arunmu 2016-03-16 14:14:11 +05:30
parent a96466ca18
commit 6a81526254
3 changed files with 61 additions and 27 deletions

BIN
main

Binary file not shown.

View File

@ -21,10 +21,6 @@ extern "C" {
namespace subprocess { namespace subprocess {
using Buffer = std::vector<char>;
using OutBuffer = std::vector<char>;
using ErrBuffer = std::vector<char>;
// Max buffer size allocated on stack for read error // Max buffer size allocated on stack for read error
// from pipe // from pipe
static const size_t SP_MAX_ERR_BUF_SIZ = 1024; static const size_t SP_MAX_ERR_BUF_SIZ = 1024;
@ -271,6 +267,22 @@ struct error
}; };
// ~~~~ End Popen Args ~~~~ // ~~~~ End Popen Args ~~~~
//
class Buffer
{
public:
Buffer() {}
Buffer(size_t cap) { buf.resize(cap); }
void add_cap(size_t cap) { buf.resize(cap); }
public:
std::vector<char> buf;
size_t length = 0;
};
using OutBuffer = Buffer;
using ErrBuffer = Buffer;
// Fwd Decl. // Fwd Decl.
class Popen; class Popen;
@ -323,10 +335,10 @@ public:
void operator=(const Communication&) = delete; void operator=(const Communication&) = delete;
public: public:
int send(const char* msg, size_t length); int send(const char* msg, size_t length);
int send(const Buffer& msg); int send(const std::vector<char>& msg);
std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length); std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length);
std::pair<OutBuffer, ErrBuffer> communicate(const Buffer& msg) std::pair<OutBuffer, ErrBuffer> communicate(const std::vector<char>& msg)
{ return communicate(msg.data(), msg.size()); } { return communicate(msg.data(), msg.size()); }
void set_out_buf_cap(size_t cap) { out_buf_cap_ = cap; } void set_out_buf_cap(size_t cap) { out_buf_cap_ = cap; }
@ -393,13 +405,13 @@ public: /* Communication forwarding API's */
int send(const char* msg, size_t length) int send(const char* msg, size_t length)
{ return comm_.send(msg, length); } { return comm_.send(msg, length); }
int send(const Buffer& msg) int send(const std::vector<char>& msg)
{ return comm_.send(msg); } { return comm_.send(msg); }
std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length) std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length)
{ return comm_.communicate(msg, length); } { return comm_.communicate(msg, length); }
std::pair<OutBuffer, ErrBuffer> communicate(const Buffer& msg) std::pair<OutBuffer, ErrBuffer> communicate(const std::vector<char>& msg)
{ return comm_.communicate(msg); } { return comm_.communicate(msg); }
@ -469,13 +481,13 @@ public:
int send(const char* msg, size_t length) int send(const char* msg, size_t length)
{ return stream_.send(msg, length); } { return stream_.send(msg, length); }
int send(const Buffer& msg) int send(const std::vector<char>& msg)
{ return stream_.send(msg); } { return stream_.send(msg); }
std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length) std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length)
{ return stream_.communicate(msg, length); } { return stream_.communicate(msg, length); }
std::pair<OutBuffer, ErrBuffer> communicate(const Buffer& msg) std::pair<OutBuffer, ErrBuffer> communicate(const std::vector<char>& msg)
{ return stream_.communicate(msg); } { return stream_.communicate(msg); }
private: private:
@ -574,7 +586,11 @@ void Popen::execute_process() throw (CalledProcessError, OSError)
try { try {
char err_buf[SP_MAX_ERR_BUF_SIZ] = {0,}; char err_buf[SP_MAX_ERR_BUF_SIZ] = {0,};
int read_bytes = util::read_atmost_n(err_rd_pipe, err_buf, SP_MAX_ERR_BUF_SIZ);
int read_bytes = util::read_atmost_n(
err_rd_pipe,
err_buf,
SP_MAX_ERR_BUF_SIZ);
close(err_rd_pipe); close(err_rd_pipe);
if (read_bytes || strlen(err_buf)) { if (read_bytes || strlen(err_buf)) {
@ -755,7 +771,7 @@ namespace detail {
return std::fwrite(msg, sizeof(char), length, stream_->input()); return std::fwrite(msg, sizeof(char), length, stream_->input());
} }
int Communication::send(const Buffer& msg) int Communication::send(const std::vector<char>& msg)
{ {
return send(msg.data(), msg.size()); return send(msg.data(), msg.size());
} }
@ -787,9 +803,13 @@ namespace detail {
// Read till EOF // Read till EOF
// ATTN: This could be blocking, if the process // ATTN: This could be blocking, if the process
// at the other end screws up, we get screwed up as well // at the other end screws up, we get screwed up as well
obuf.resize(out_buf_cap_); obuf.add_cap(out_buf_cap_);
int rbytes = util::read_atmost_n(fileno(stream_->output()),
(char*)obuf.data(), obuf.size()); int rbytes = util::read_atmost_n(
fileno(stream_->output()),
obuf.buf.data(),
obuf.buf.size());
if (rbytes == -1) { if (rbytes == -1) {
throw OSError("read to obuf failed", errno); throw OSError("read to obuf failed", errno);
} }
@ -797,9 +817,13 @@ namespace detail {
stream_->output_.reset(); stream_->output_.reset();
} else if (stream_->error()) { } else if (stream_->error()) {
// Same screwness applies here as well // Same screwness applies here as well
ebuf.resize(err_buf_cap_); ebuf.add_cap(err_buf_cap_);
int rbytes = util::read_atmost_n(fileno(stream_->error()),
(char*)ebuf.data(), ebuf.size()); int rbytes = util::read_atmost_n(
fileno(stream_->error()),
ebuf.buf.data(),
ebuf.buf.size());
if (rbytes == -1) { if (rbytes == -1) {
throw OSError("read to ebuf failed", errno); throw OSError("read to ebuf failed", errno);
} }
@ -822,16 +846,26 @@ namespace detail {
std::vector<std::thread> bg_threads; std::vector<std::thread> bg_threads;
if (stream_->output()) { if (stream_->output()) {
obuf.resize(out_buf_cap_); obuf.add_cap(out_buf_cap_);
bg_threads.push_back(std::thread(
util::read_atmost_n, fileno(stream_->output()), bg_threads.push_back(
(char*)obuf.data(), obuf.size())); std::thread(
util::read_atmost_n,
fileno(stream_->output()),
obuf.buf.data(),
obuf.buf.size())
);
} }
if (stream_->error()) { if (stream_->error()) {
ebuf.resize(err_buf_cap_); ebuf.add_cap(err_buf_cap_);
bg_threads.push_back(std::thread(
util::read_atmost_n, fileno(stream_->error()), bg_threads.push_back(
(char*)ebuf.data(), ebuf.size())); std::thread(
util::read_atmost_n,
fileno(stream_->error()),
ebuf.buf.data(),
ebuf.buf.size())
);
} }
if (stream_->input()) { if (stream_->input()) {
if (msg) { if (msg) {

View File

@ -9,7 +9,7 @@ void test_input()
const char* msg = "one\ntwo\nthree\nfour\nfive\n"; const char* msg = "one\ntwo\nthree\nfour\nfive\n";
p.send(msg, strlen(msg)); p.send(msg, strlen(msg));
auto res = p.communicate(nullptr, 0); auto res = p.communicate(nullptr, 0);
std::cout << res.first.data() << std::endl; std::cout << res.first.buf.data() << std::endl;
} }
int main() { int main() {