Code refatored - enter streams

This commit is contained in:
arunmu 2016-03-15 21:51:24 +05:30
parent 7260f1a358
commit 7e4289edcb
2 changed files with 137 additions and 99 deletions

BIN
main

Binary file not shown.

View File

@ -20,6 +20,9 @@ extern "C" {
namespace subprocess {
using OutBuffer = std::vector<uint8_t>;
using ErrBuffer = std::vector<uint8_t>;
// Max buffer size allocated on stack for read error
// from pipe
static const size_t SP_MAX_ERR_BUF_SIZ = 1024;
@ -307,6 +310,65 @@ private:
int err_wr_pipe_ = -1;
};
class Streams
{
public:
Streams() {}
void operator=(const Streams&) = delete;
public:
void setup_comm_channels();
void cleanup_fds()
{
if (write_to_child_ != -1 && read_from_parent_ != -1) {
close(write_to_child_);
}
if (write_to_parent_ != -1 && read_from_child_ != -1) {
close(read_from_child_);
}
if (err_write_ != -1 && err_read_ != -1) {
close(err_read_);
}
}
void close_parent_fds()
{
if (write_to_child_ != -1) close(write_to_child_);
if (read_from_child_ != -1) close(read_from_child_);
if (err_read_ != -1) close(err_read_);
}
void close_child_fds()
{
if (write_to_parent_ != -1) close(write_to_parent_);
if (read_from_parent_ != -1) close(read_from_parent_);
if (err_write_ != -1) close(err_write_);
}
public:// Yes they are public
std::shared_ptr<FILE> input_ = nullptr;
std::shared_ptr<FILE> output_ = nullptr;
std::shared_ptr<FILE> error_ = nullptr;
// Buffer size for the IO streams
int bufsiz_ = 0;
// Pipes for communicating with child
// Emulates stdin
int write_to_child_ = -1; // Parent owned descriptor
int read_from_parent_ = -1; // Child owned descriptor
// Emulates stdout
int write_to_parent_ = -1; // Child owned descriptor
int read_from_child_ = -1; // Parent owned descriptor
// Emulates stderr
int err_write_ = -1; // Write error to parent (Child owned)
int err_read_ = -1; // Read error from child (Parent owned)
};
}; // end namespace detail
@ -335,23 +397,19 @@ public:
if (!defer_process_start_) execute_process();
}
void start_process() throw (CalledProcessError, OSError)
{
// The process was started/tried to be started
// in the constructor itself.
// For explicitly calling this API to start the
// process, 'defer_spawn' argument must be set to
// true in the constructor.
if (!defer_process_start_) {
assert (0);
return;
}
execute_process();
}
void start_process() throw (CalledProcessError, OSError);
FILE* input() { return input_.get(); }
FILE* output() { return output_.get(); }
FILE* error() { return error_.get(); }
int pid() const noexcept { return child_pid_; }
void communicate(bool finish = false);
void set_out_buf_cap();
void set_err_buf_cap();
OutBuffer& out_buf();
ErrBuffer& err_buf();
private:
template <typename F, typename... Args>
@ -359,35 +417,16 @@ private:
void init_args();
void populate_c_argv();
void execute_process() throw (CalledProcessError, OSError);
void cleanup_fds();
void setup_comm_channels();
private:
detail::Streams stream_;
bool defer_process_start_ = false;
bool close_fds_ = false;
int bufsiz_ = 0;
std::string exe_name_;
std::string cwd_;
std::map<std::string, std::string> env_;
std::shared_ptr<FILE> input_ = nullptr;
std::shared_ptr<FILE> output_ = nullptr;
std::shared_ptr<FILE> error_ = nullptr;
// Pipes for communicating with child
// Emulates stdin
int write_to_child_ = -1; // Parent owned descriptor
int read_from_parent_ = -1; // Child owned descriptor
// Emulates stdout
int write_to_parent_ = -1; // Child owned descriptor
int read_from_child_ = -1; // Parent owned descriptor
// Emulates stderr
int err_write_ = -1; // Write error to parent (Child owned)
int err_read_ = -1; // Read error from child (Parent owned)
// Command in string format
std::string args_;
// Comamnd provided as sequence
@ -416,43 +455,20 @@ void Popen::populate_c_argv()
for (auto& arg : vargs_) cargv_.push_back(&arg[0]);
}
void Popen::cleanup_fds()
void Popen::start_process() throw (CalledProcessError, OSError)
{
if (write_to_child_ != -1 && read_from_parent_ != -1) {
close(write_to_child_);
}
if (write_to_parent_ != -1 && read_from_child_ != -1) {
close(read_from_child_);
}
if (err_write_ != -1 && err_read_ != -1) {
close(err_read_);
// The process was started/tried to be started
// in the constructor itself.
// For explicitly calling this API to start the
// process, 'defer_spawn' argument must be set to
// true in the constructor.
if (!defer_process_start_) {
assert (0);
return;
}
execute_process();
}
void Popen::setup_comm_channels()
{
if (write_to_child_ != -1) input_.reset(fdopen(write_to_child_, "wb"), fclose);
if (read_from_child_ != -1) output_.reset(fdopen(read_from_child_, "rb"), fclose);
if (err_read_ != -1) error_.reset(fdopen(err_read_, "rb"), fclose);
auto handles = {input_.get(), output_.get(), error_.get()};
for (auto& h : handles) {
if (h == nullptr) continue;
switch (bufsiz_) {
case 0:
setvbuf(h, nullptr, _IONBF, BUFSIZ);
break;
case 1:
setvbuf(h, nullptr, _IONBF, BUFSIZ);
break;
default:
setvbuf(h, nullptr, _IOFBF, bufsiz_);
};
}
}
void Popen::execute_process() throw (CalledProcessError, OSError)
{
@ -474,9 +490,8 @@ void Popen::execute_process() throw (CalledProcessError, OSError)
if (child_pid_ == 0)
{
// Close descriptors belonging to parent
if (write_to_child_ != -1) close(write_to_child_);
if (read_from_child_ != -1) close(read_from_child_);
if (err_read_ != -1) close(err_read_);
stream_.close_parent_fds();
//Close the read end of the error pipe
close(err_rd_pipe);
@ -488,10 +503,7 @@ void Popen::execute_process() throw (CalledProcessError, OSError)
int sys_ret = -1;
close (err_wr_pipe);// close child side of pipe, else get stuck in read below
// Close child pipes
if (write_to_parent_ != -1) close(write_to_parent_);
if (read_from_parent_ != -1) close(read_from_parent_);
if (err_write_ != -1) close(err_write_);
stream_.close_child_fds();
try {
char err_buf[SP_MAX_ERR_BUF_SIZ] = {0,};
@ -509,12 +521,12 @@ void Popen::execute_process() throw (CalledProcessError, OSError)
throw CalledProcessError(err_buf);
}
} catch (std::exception& exp) {
cleanup_fds();
stream_.cleanup_fds();
throw exp;
}
// Setup the communication channels of the Popen class
setup_comm_channels();
stream_.setup_comm_channels();
}
}
@ -529,7 +541,7 @@ namespace detail {
}
void ArgumentDeducer::set_option(bufsize&& bsiz) {
popen_->bufsiz_ = bsiz.bufsiz;
popen_->stream_.bufsiz_ = bsiz.bufsiz;
}
void ArgumentDeducer::set_option(environment&& env) {
@ -541,18 +553,18 @@ namespace detail {
}
void ArgumentDeducer::set_option(input&& inp) {
popen_->read_from_parent_ = inp.rd_ch_;
if (inp.wr_ch_ != -1) popen_->write_to_child_ = inp.wr_ch_;
popen_->stream_.read_from_parent_ = inp.rd_ch_;
if (inp.wr_ch_ != -1) popen_->stream_.write_to_child_ = inp.wr_ch_;
}
void ArgumentDeducer::set_option(output&& out) {
popen_->write_to_parent_ = out.wr_ch_;
if (out.rd_ch_ != -1) popen_->read_from_child_ = out.rd_ch_;
popen_->stream_.write_to_parent_ = out.wr_ch_;
if (out.rd_ch_ != -1) popen_->stream_.read_from_child_ = out.rd_ch_;
}
void ArgumentDeducer::set_option(error&& err) {
popen_->err_write_ = err.wr_ch_;
if (err.rd_ch_ != -1) popen_->err_read_ = err.rd_ch_;
popen_->stream_.err_write_ = err.wr_ch_;
if (err.rd_ch_ != -1) popen_->stream_.err_read_ = err.rd_ch_;
}
void ArgumentDeducer::set_option(close_fds&& cfds) {
@ -562,11 +574,14 @@ namespace detail {
void Child::execute_child() {
int sys_ret = -1;
auto& stream = parent_->stream_;
try {
if (parent_->write_to_parent_ == 0)
parent_->write_to_parent_ = dup(parent_->write_to_parent_);
if (parent_->err_write_ == 0 || parent_->err_write_ == 1)
parent_->err_write_ = dup(parent_->err_write_);
if (stream.write_to_parent_ == 0)
stream.write_to_parent_ = dup(stream.write_to_parent_);
if (stream.err_write_ == 0 || stream.err_write_ == 1)
stream.err_write_ = dup(stream.err_write_);
// Make the child owned descriptors as the
// stdin, stdout and stderr for the child process
@ -587,19 +602,19 @@ namespace detail {
};
// Create the standard streams
_dup2_(parent_->read_from_parent_, 0); // Input stream
_dup2_(parent_->write_to_parent_, 1); // Output stream
_dup2_(parent_->err_write_, 2); // Error stream
_dup2_(stream.read_from_parent_, 0); // Input stream
_dup2_(stream.write_to_parent_, 1); // Output stream
_dup2_(stream.err_write_, 2); // Error stream
// Close the duped descriptors
if (parent_->read_from_parent_ != -1 && parent_->read_from_parent_ > 2)
close(parent_->read_from_parent_);
if (stream.read_from_parent_ != -1 && stream.read_from_parent_ > 2)
close(stream.read_from_parent_);
if (parent_->write_to_parent_ != -1 && parent_->write_to_parent_ > 2)
close(parent_->write_to_parent_);
if (stream.write_to_parent_ != -1 && stream.write_to_parent_ > 2)
close(stream.write_to_parent_);
if (parent_->err_write_ != -1 && parent_->err_write_ > 2)
close(parent_->err_write_);
if (stream.err_write_ != -1 && stream.err_write_ > 2)
close(stream.err_write_);
// Close all the inherited fd's except the error write pipe
if (parent_->close_fds_) {
@ -641,7 +656,30 @@ namespace detail {
// Calling application would not get this
// exit failure
exit (EXIT_FAILURE);
}
void Streams::setup_comm_channels()
{
if (write_to_child_ != -1) input_.reset(fdopen(write_to_child_, "wb"), fclose);
if (read_from_child_ != -1) output_.reset(fdopen(read_from_child_, "rb"), fclose);
if (err_read_ != -1) error_.reset(fdopen(err_read_, "rb"), fclose);
auto handles = {input_.get(), output_.get(), error_.get()};
for (auto& h : handles) {
if (h == nullptr) continue;
switch (bufsiz_) {
case 0:
setvbuf(h, nullptr, _IONBF, BUFSIZ);
break;
case 1:
setvbuf(h, nullptr, _IONBF, BUFSIZ);
break;
default:
setvbuf(h, nullptr, _IOFBF, bufsiz_);
};
}
}