Perf improvement

This commit is contained in:
arunmu 2016-03-16 22:01:36 +05:30
parent 6a81526254
commit 1128b46e5e

View File

@ -1,14 +1,15 @@
#include <map> #include <map>
#include <algorithm>
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <cstdlib> #include <cstdlib>
#include <cassert> #include <cassert>
#include <cstring> #include <cstring>
#include <cstdio> #include <cstdio>
#include <future>
#include <vector> #include <vector>
#include <sstream> #include <sstream>
#include <memory> #include <memory>
#include <thread>
#include <initializer_list> #include <initializer_list>
#include <exception> #include <exception>
@ -141,18 +142,18 @@ namespace util
} }
static static
int wait_for_child_exit(int pid) std::pair<int, int> wait_for_child_exit(int pid)
{ {
int status; int status = 0;
int ret = -1; int ret = -1;
while (1) { while (1) {
ret = waitpid(pid, &status, WNOHANG); ret = waitpid(pid, &status, WNOHANG);
if (ret == -1) break; if (ret == -1) break;
if (ret == 0) continue; if (ret == 0) continue;
return pid; return std::make_pair(ret, status);
} }
return ret; return std::make_pair(ret, status);
} }
@ -276,6 +277,7 @@ public:
Buffer() {} Buffer() {}
Buffer(size_t cap) { buf.resize(cap); } Buffer(size_t cap) { buf.resize(cap); }
void add_cap(size_t cap) { buf.resize(cap); } void add_cap(size_t cap) { buf.resize(cap); }
public: public:
std::vector<char> buf; std::vector<char> buf;
size_t length = 0; size_t length = 0;
@ -474,6 +476,8 @@ public:
int pid() const noexcept { return child_pid_; } int pid() const noexcept { return child_pid_; }
bool wait() throw(OSError);
void set_out_buf_cap(size_t cap) { stream_.set_out_buf_cap(cap); } void set_out_buf_cap(size_t cap) { stream_.set_out_buf_cap(cap); }
void set_err_buf_cap(size_t cap) { stream_.set_err_buf_cap(cap); } void set_err_buf_cap(size_t cap) { stream_.set_err_buf_cap(cap); }
@ -485,10 +489,18 @@ public:
{ return stream_.send(msg); } { return stream_.send(msg); }
std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length) std::pair<OutBuffer, ErrBuffer> communicate(const char* msg, size_t length)
{ return stream_.communicate(msg, length); } {
auto res = stream_.communicate(msg, length);
wait();
return res;
}
std::pair<OutBuffer, ErrBuffer> communicate(const std::vector<char>& msg) std::pair<OutBuffer, ErrBuffer> communicate(const std::vector<char>& msg)
{ return stream_.communicate(msg); } {
auto res = stream_.communicate(msg);
wait();
return res;
}
private: private:
template <typename F, typename... Args> template <typename F, typename... Args>
@ -548,6 +560,19 @@ void Popen::start_process() throw (CalledProcessError, OSError)
execute_process(); execute_process();
} }
bool Popen::wait() throw (OSError)
{
int ret, status;
std::tie(ret, status) = util::wait_for_child_exit(pid());
if (ret == -1) {
if (errno != ECHILD) throw OSError("waitpid failed", errno);
return true;
}
if (!WIFEXITED(status)) return false;
return true;
}
void Popen::execute_process() throw (CalledProcessError, OSError) void Popen::execute_process() throw (CalledProcessError, OSError)
{ {
@ -597,8 +622,7 @@ void Popen::execute_process() throw (CalledProcessError, OSError)
// Call waitpid to reap the child process // Call waitpid to reap the child process
// waitpid suspends the calling process until the // waitpid suspends the calling process until the
// child terminates. // child terminates.
sys_ret = util::wait_for_child_exit(child_pid_); wait();
if (sys_ret == -1) throw OSError("child exit", errno);
// Throw whatever information we have about child failure // Throw whatever information we have about child failure
throw CalledProcessError(err_buf); throw CalledProcessError(err_buf);
@ -830,7 +854,7 @@ namespace detail {
// Close the error stream // Close the error stream
stream_->error_.reset(); stream_->error_.reset();
} }
return std::make_pair(obuf, ebuf); return std::make_pair(std::move(obuf), std::move(ebuf));
} }
return communicate_threaded(msg, length); return communicate_threaded(msg, length);
@ -843,36 +867,31 @@ namespace detail {
OutBuffer obuf; OutBuffer obuf;
ErrBuffer ebuf; ErrBuffer ebuf;
std::vector<std::thread> bg_threads; std::future<int> out_fut, err_fut;
if (stream_->output()) { if (stream_->output()) {
obuf.add_cap(out_buf_cap_); obuf.add_cap(out_buf_cap_);
bg_threads.push_back( out_fut = std::async(std::launch::async,
std::thread( util::read_atmost_n,
util::read_atmost_n, fileno(stream_->output()),
fileno(stream_->output()), obuf.buf.data(),
obuf.buf.data(), obuf.buf.size());
obuf.buf.size())
);
} }
if (stream_->error()) { if (stream_->error()) {
ebuf.add_cap(err_buf_cap_); ebuf.add_cap(err_buf_cap_);
bg_threads.push_back( err_fut = std::async(std::launch::async,
std::thread( util::read_atmost_n,
util::read_atmost_n, fileno(stream_->error()),
fileno(stream_->error()), ebuf.buf.data(),
ebuf.buf.data(), ebuf.buf.size());
ebuf.buf.size())
);
} }
if (stream_->input()) { if (stream_->input()) {
if (msg) { if (msg) {
int wbytes = std::fwrite(msg, sizeof(char), length, stream_->input()); int wbytes = std::fwrite(msg, sizeof(char), length, stream_->input());
if (wbytes < length) { if (wbytes < length) {
if (errno != EPIPE && errno != EINVAL) { if (errno != EPIPE && errno != EINVAL) {
for (auto& t: bg_threads) t.join();
throw OSError("fwrite error", errno); throw OSError("fwrite error", errno);
} }
} }
@ -880,12 +899,26 @@ namespace detail {
stream_->input_.reset(); stream_->input_.reset();
} }
// Wait for the threads to finish if (out_fut.valid()) {
for(auto& thr : bg_threads) thr.join(); int res = out_fut.get();
if (res != -1) obuf.length = res;
else obuf.length = 0;
}
if (err_fut.valid()) {
int res = err_fut.get();
if (res != -1) ebuf.length = res;
else ebuf.length = 0;
}
return std::make_pair(obuf, ebuf); return std::make_pair(std::move(obuf), std::move(ebuf));
} }
}; // end namespace detail }; // end namespace detail
// Convenience Functions
//
}; };