Add load adaptive worker group and use for scanning

Marcus Holland-Moritz 2020-11-25 01:38:44 +01:00
parent c9b7acf97f
commit 7810758fed
5 changed files with 409 additions and 165 deletions

CMakeLists.txt

@@ -107,7 +107,8 @@ list(
     src/dwarfs/progress.cpp
     src/dwarfs/scanner.cpp
     src/dwarfs/similarity.cpp
-    src/dwarfs/util.cpp)
+    src/dwarfs/util.cpp
+    src/dwarfs/worker_group.cpp)
 
 if(WITH_LUA)
   list(APPEND LIBDWARFS_SRC src/dwarfs/lua_script.cpp)

include/dwarfs/semaphore.h (new file)

@@ -0,0 +1,54 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once

#include <condition_variable>
#include <mutex>

namespace dwarfs {

class semaphore {
 public:
  semaphore(int count = 0)
      : count_{count} {}

  void release() {
    std::unique_lock<std::mutex> lock(mx_);
    ++count_;
    cv_.notify_one();
  }

  void acquire() {
    std::unique_lock<std::mutex> lock(mx_);
    while (count_ == 0) {
      cv_.wait(lock);
    }
    --count_;
  }

 private:
  std::mutex mx_;
  std::condition_variable cv_;
  int count_;
};
} // namespace dwarfs
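
For reference, a minimal usage sketch (not part of the commit) showing how this semaphore bounds concurrency; do_expensive_work() is a hypothetical stand-in for a real workload. Since C++20, std::counting_semaphore covers the same use case.

#include <thread>
#include <vector>

#include "dwarfs/semaphore.h"

void do_expensive_work() { /* hypothetical workload */ }

int main() {
  dwarfs::semaphore sem(4); // start with four permits
  std::vector<std::thread> threads;

  for (int i = 0; i < 16; ++i) {
    threads.emplace_back([&sem] {
      sem.acquire(); // blocks while four tasks are already running
      do_expensive_work();
      sem.release(); // hand the permit to the next waiter
    });
  }

  for (auto& t : threads) {
    t.join();
  }

  return 0;
}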

include/dwarfs/worker_group.h

@@ -21,16 +21,10 @@
 #pragma once
 
-#include <atomic>
-#include <condition_variable>
 #include <limits>
-#include <mutex>
-#include <queue>
-#include <thread>
+#include <memory>
 
-#include <folly/Conv.h>
 #include <folly/Function.h>
-#include <folly/system/ThreadName.h>
 
 namespace dwarfs {
@@ -45,173 +39,47 @@ class worker_group
  public:
   using job_t = folly::Function<void()>;
 
+  static struct load_adaptive_tag {
+  } load_adaptive;
+
   /**
    * Create a worker group
    *
    * \param num_workers  Number of worker threads.
    */
-  explicit worker_group(
-      const char* group_name = nullptr, size_t num_workers = 1,
-      size_t max_queue_len = std::numeric_limits<size_t>::max())
-      : running_(true)
-      , pending_(0)
-      , max_queue_len_(max_queue_len) {
-    if (num_workers < 1) {
-      throw std::runtime_error("invalid number of worker threads");
-    }
-
-    if (!group_name) {
-      group_name = "worker";
-    }
-
-    for (size_t i = 0; i < num_workers; ++i) {
-      workers_.emplace_back([=, this] {
-        folly::setThreadName(folly::to<std::string>(group_name, i + 1));
-        do_work();
-      });
-    }
-  }
-
-  worker_group(const worker_group&) = delete;
-  worker_group& operator=(const worker_group&) = delete;
+  worker_group(const char* group_name = nullptr, size_t num_workers = 1,
+               size_t max_queue_len = std::numeric_limits<size_t>::max());
 
-  /**
-   * Stop and destroy a worker group
-   */
-  ~worker_group() noexcept {
-    try {
-      stop();
-    } catch (...) {
-    }
-  }
-
-  /**
-   * Stop a worker group
-   */
-  void stop() {
-    if (running_) {
-      {
-        std::lock_guard<std::mutex> lock(mx_);
-        running_ = false;
-      }
-
-      cond_.notify_all();
-
-      for (auto& w : workers_) {
-        w.join();
-      }
-    }
-  }
-
-  /**
-   * Wait until all work has been done
-   */
-  void wait() {
-    if (running_) {
-      std::unique_lock<std::mutex> lock(mx_);
-      wait_.wait(lock, [&] { return pending_ == 0; });
-    }
-  }
-
-  /**
-   * Check whether the worker group is still running
-   */
-  bool running() const { return running_; }
-
   /**
-   * Add a new job to the worker group
+   * Create a load adaptive worker group
    *
-   * The new job will be dispatched to the first available worker thread.
-   *
-   * \param job  The job to add to the dispatcher.
+   * \param num_workers  Number of worker threads.
    */
-  bool add_job(job_t&& job) {
-    if (running_) {
-      {
-        std::unique_lock<std::mutex> lock(mx_);
-        queue_.wait(lock, [this] { return jobs_.size() < max_queue_len_; });
-        jobs_.emplace(std::move(job));
-        ++pending_;
-      }
-
-      cond_.notify_one();
-    }
-
-    return false;
-  }
+  worker_group(load_adaptive_tag, const char* group_name = nullptr,
+               size_t max_num_workers = 1,
+               size_t max_queue_len = std::numeric_limits<size_t>::max());
+
+  ~worker_group();
+
+  void stop() { impl_->stop(); }
+  void wait() { impl_->wait(); }
+  bool running() const { return impl_->running(); }
+  bool add_job(job_t&& job) { return impl_->add_job(std::move(job)); }
+  size_t queue_size() const { return impl_->queue_size(); }
 
-  /**
-   * Return the number of worker threads
-   *
-   * \returns The number of worker threads.
-   */
-  size_t size() const { return workers_.size(); }
-
-  /**
-   * Return the number of worker threads
-   *
-   * \returns The number of worker threads.
-   */
-  size_t queue_size() const {
-    std::lock_guard<std::mutex> lock(mx_);
-    return jobs_.size();
-  }
-
-  /**
-   * Return the number of queued jobs
-   *
-   * \returns The number of queued jobs.
-   */
-  size_t queued_jobs() const {
-    std::lock_guard<std::mutex> lock(mx_);
-    return jobs_.size();
-  }
+  class impl {
+   public:
+    virtual ~impl() = default;
+
+    virtual void stop() = 0;
+    virtual void wait() = 0;
+    virtual bool running() const = 0;
+    virtual bool add_job(job_t&& job) = 0;
+    virtual size_t queue_size() const = 0;
+  };
 
  private:
-  using jobs_t = std::queue<job_t>;
-
-  void do_work() {
-    for (;;) {
-      job_t job;
-
-      {
-        std::unique_lock<std::mutex> lock(mx_);
-
-        while (jobs_.empty() && running_) {
-          cond_.wait(lock);
-        }
-
-        if (jobs_.empty()) {
-          if (running_)
-            continue;
-          else
-            break;
-        }
-
-        job = std::move(jobs_.front());
-        jobs_.pop();
-      }
-
-      job();
-
-      {
-        std::lock_guard<std::mutex> lock(mx_);
-        pending_--;
-      }
-
-      wait_.notify_one();
-      queue_.notify_one();
-    }
-  }
-
-  std::vector<std::thread> workers_;
-  jobs_t jobs_;
-  std::condition_variable cond_;
-  std::condition_variable queue_;
-  std::condition_variable wait_;
-  mutable std::mutex mx_;
-  std::atomic<bool> running_;
-  std::atomic<size_t> pending_;
-  const size_t max_queue_len_;
+  std::unique_ptr<impl> impl_;
 };
 } // namespace dwarfs
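
A minimal sketch (not part of the commit) of how the two constructors are meant to be used; the scanner line mirrors the mkdwarfs change further down. It assumes the load_adaptive tag object is defined in the library.

#include "dwarfs/worker_group.h"

int main() {
  // fixed-size pool: always eight writer threads
  dwarfs::worker_group writers("writer", 8);

  // load adaptive pool: up to eight scanner threads, throttled
  // according to how much CPU time the jobs actually receive
  dwarfs::worker_group scanners(dwarfs::worker_group::load_adaptive,
                                "scanner", 8);

  scanners.add_job([] { /* scan a file */ });
  scanners.wait(); // block until all queued jobs have finished

  return 0;
}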

src/dwarfs/worker_group.cpp (new file)

@@ -0,0 +1,320 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include <iostream> // TODO: remove
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <sys/resource.h>
#include <sys/time.h>
#include <time.h>
#include <folly/Conv.h>
#include <folly/system/ThreadName.h>
#include "dwarfs/semaphore.h"
#include "dwarfs/worker_group.h"
namespace dwarfs {
template <typename Policy>
class basic_worker_group : public worker_group::impl, private Policy {
 public:
  template <typename... Args>
  basic_worker_group(const char* group_name, size_t num_workers,
                     size_t max_queue_len, Args&&... args)
      : Policy(std::forward<Args>(args)...)
      , running_(true)
      , pending_(0)
      , max_queue_len_(max_queue_len) {
    if (num_workers < 1) {
      throw std::runtime_error("invalid number of worker threads");
    }

    if (!group_name) {
      group_name = "worker";
    }

    for (size_t i = 0; i < num_workers; ++i) {
      workers_.emplace_back([=, this] {
        folly::setThreadName(folly::to<std::string>(group_name, i + 1));
        do_work();
      });
    }
  }

  basic_worker_group(const basic_worker_group&) = delete;
  basic_worker_group& operator=(const basic_worker_group&) = delete;

  /**
   * Stop and destroy a worker group
   */
  ~basic_worker_group() noexcept {
    try {
      stop();
    } catch (...) {
    }
  }

  /**
   * Stop a worker group
   */
  void stop() {
    if (running_) {
      {
        std::lock_guard<std::mutex> lock(mx_);
        running_ = false;
      }

      cond_.notify_all();

      for (auto& w : workers_) {
        w.join();
      }
    }
  }

  /**
   * Wait until all work has been done
   */
  void wait() {
    if (running_) {
      std::unique_lock<std::mutex> lock(mx_);
      wait_.wait(lock, [&] { return pending_ == 0; });
    }
  }

  /**
   * Check whether the worker group is still running
   */
  bool running() const { return running_; }
  /**
   * Add a new job to the worker group
   *
   * The new job will be dispatched to the first available worker thread.
   *
   * \param job  The job to add to the dispatcher.
   */
  bool add_job(worker_group::job_t&& job) {
    if (running_) {
      {
        std::unique_lock<std::mutex> lock(mx_);
        queue_.wait(lock, [this] { return jobs_.size() < max_queue_len_; });
        jobs_.emplace(std::move(job));
        ++pending_;
      }

      cond_.notify_one();

      return true; // the job was accepted
    }

    return false; // the group has been stopped
  }
  /**
   * Return the number of worker threads
   *
   * \returns The number of worker threads.
   */
  size_t size() const { return workers_.size(); }

  /**
   * Return the number of queued jobs
   *
   * \returns The number of queued jobs.
   */
  size_t queue_size() const {
    std::lock_guard<std::mutex> lock(mx_);
    return jobs_.size();
  }

 private:
  using jobs_t = std::queue<worker_group::job_t>;

  void do_work() {
    for (;;) {
      worker_group::job_t job;

      {
        std::unique_lock<std::mutex> lock(mx_);

        while (jobs_.empty() && running_) {
          cond_.wait(lock);
        }

        if (jobs_.empty()) {
          if (running_)
            continue;
          else
            break;
        }

        job = std::move(jobs_.front());
        jobs_.pop();
      }

      {
        // run the job under the policy's RAII task object, so the
        // policy can account for the job's wall and CPU time
        typename Policy::task task(this);
        job();
      }

      {
        std::lock_guard<std::mutex> lock(mx_);
        pending_--;
      }

      wait_.notify_one();
      queue_.notify_one();
    }
  }

  std::vector<std::thread> workers_;
  jobs_t jobs_;
  std::condition_variable cond_;
  std::condition_variable queue_;
  std::condition_variable wait_;
  mutable std::mutex mx_;
  std::atomic<bool> running_;
  std::atomic<size_t> pending_;
  const size_t max_queue_len_;
};

class no_policy {
 public:
  class task {
   public:
    task(no_policy*) {}
  };
};
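
// The policy below is what makes a worker group load adaptive: every
// task must hold a semaphore permit while it runs. A RAII `task` object
// samples per-thread CPU time (getrusage with RUSAGE_THREAD) and wall
// time (CLOCK_MONOTONIC) around each job, and stop_task() compares the
// accumulated CPU/wall ratio against fixed thresholds to decide whether
// to park a worker (keep its permit) or wake one up (release an extra
// permit).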
class load_adaptive_policy {
 public:
  class task {
   public:
    task(load_adaptive_policy* policy)
        : policy_(policy) {
      // may block until a permit is available; clocks are sampled only
      // afterwards, so time spent waiting here is not counted as work
      policy_->start_task();
      struct rusage usage;
      getrusage(RUSAGE_THREAD, &usage);
      utime_ = usage.ru_utime;
      stime_ = usage.ru_stime;
      clock_gettime(CLOCK_MONOTONIC, &wall_);
    }

    ~task();

   private:
    load_adaptive_policy* policy_;
    struct timespec wall_;
    struct timeval utime_, stime_;
  };

  load_adaptive_policy(size_t workers)
      : sem_(workers)
      , max_throttled_(workers - 1) {}

  void start_task() { sem_.acquire(); }
  void stop_task(uint64_t wall_ns, uint64_t cpu_ns);

 private:
  semaphore sem_;
  int max_throttled_;
  std::mutex mx_;
  uint64_t wall_ns_{0}, cpu_ns_{0}; // accumulators must start at zero
  int throttled_{0};
};
load_adaptive_policy::task::~task() {
  struct rusage usage;
  getrusage(RUSAGE_THREAD, &usage);
  struct timespec now;
  clock_gettime(CLOCK_MONOTONIC, &now);

  // elapsed wall clock time for this task, in nanoseconds
  uint64_t wall_ns = UINT64_C(1000000000) * (now.tv_sec - wall_.tv_sec);
  wall_ns += now.tv_nsec;
  wall_ns -= wall_.tv_nsec;

  // CPU time (user + system) consumed by this thread while running the
  // task, converted from seconds/microseconds to nanoseconds
  uint64_t cpu_ns =
      UINT64_C(1000000000) * (usage.ru_utime.tv_sec + usage.ru_stime.tv_sec -
                              (utime_.tv_sec + stime_.tv_sec));
  cpu_ns += UINT64_C(1000) * (usage.ru_utime.tv_usec + usage.ru_stime.tv_usec);
  cpu_ns -= UINT64_C(1000) * (utime_.tv_usec + stime_.tv_usec);

  policy_->stop_task(wall_ns, cpu_ns);
}

void load_adaptive_policy::stop_task(uint64_t wall_ns, uint64_t cpu_ns) {
  int adjust = 0;

  {
    std::unique_lock<std::mutex> lock(mx_);
    wall_ns_ += wall_ns;
    cpu_ns_ += cpu_ns;

    // re-evaluate the load once at least one second of wall time
    // has accumulated
    if (wall_ns_ >= 1000000000) {
      auto load = float(cpu_ns_) / float(wall_ns_);

      if (load > 0.75f) {
        // tasks are getting plenty of CPU; wake up a throttled worker
        if (throttled_ > 0) {
          --throttled_;
          adjust = 1;
        }
      } else if (load < 0.25f) {
        // tasks spend most of their time off-CPU; park another worker
        if (throttled_ < max_throttled_) {
          ++throttled_;
          adjust = -1;
        }
      }

      wall_ns_ = 0;
      cpu_ns_ = 0;
    }
  }

  if (adjust < 0) {
    // keep the permit acquired by start_task(), shrinking the pool by one
    return;
  }

  if (adjust > 0) {
    // release an extra permit to un-park a throttled worker
    sem_.release();
  }

  sem_.release(); // give back the permit this task was holding
}
worker_group::worker_group(const char* group_name, size_t num_workers,
                           size_t max_queue_len)
    : impl_{std::make_unique<basic_worker_group<no_policy>>(
          group_name, num_workers, max_queue_len)} {}

worker_group::worker_group(load_adaptive_tag, const char* group_name,
                           size_t max_num_workers, size_t max_queue_len)
    : impl_{std::make_unique<basic_worker_group<load_adaptive_policy>>(
          group_name, max_num_workers, max_queue_len, max_num_workers)} {}

worker_group::~worker_group() = default;
} // namespace dwarfs
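
To make the throttling arithmetic concrete, here is the decision rule distilled into a standalone sketch (an illustration only: the semaphore bookkeeping and the throttled_/max_throttled_ bounds checks from stop_task() are omitted):

#include <cstdint>
#include <cstdio>

// Returns +1 to wake a throttled worker, -1 to park one, 0 to do nothing.
int throttle_adjustment(uint64_t wall_ns, uint64_t cpu_ns) {
  if (wall_ns < UINT64_C(1000000000)) {
    return 0; // not enough accumulated wall time to judge the load
  }
  float load = float(cpu_ns) / float(wall_ns);
  if (load > 0.75f) {
    return +1; // workers are CPU-bound: more threads can do useful work
  }
  if (load < 0.25f) {
    return -1; // workers barely get CPU: reduce contention
  }
  return 0;
}

int main() {
  // 400ms of CPU over 2s of wall time -> load 0.2 -> park a worker
  std::printf("%d\n", throttle_adjustment(2000000000, 400000000));
  // 1.6s of CPU over 2s of wall time -> load 0.8 -> wake a worker
  std::printf("%d\n", throttle_adjustment(2000000000, 1600000000));
  return 0;
}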

src/mkdwarfs.cpp

@@ -218,7 +218,7 @@ int mkdwarfs(int argc, char** argv) {
   block_manager::config cfg;
   std::string path, output, window_sizes, memory_limit, script_path,
       compression, log_level;
-  size_t num_workers, num_scanner_workers;
+  size_t num_workers, max_scanner_workers;
   bool no_time = false, no_owner = false, recompress = false,
       no_progress = false;
   unsigned level;
@@ -246,8 +246,8 @@ int mkdwarfs(int argc, char** argv) {
     ("num-workers,N",
      po::value<size_t>(&num_workers)->default_value(num_cpu),
      "number of writer worker threads")
-    ("num-scanner-workers,M",
-     po::value<size_t>(&num_scanner_workers)->default_value(num_cpu),
+    ("max-scanner-workers,M",
+     po::value<size_t>(&max_scanner_workers)->default_value(num_cpu),
      "number of scanner worker threads")
     ("memory-limit,L",
      po::value<std::string>(&memory_limit)->default_value("1g"),
@@ -382,7 +382,8 @@ int mkdwarfs(int argc, char** argv) {
   }
 
   worker_group wg_writer("writer", num_workers);
-  worker_group wg_scanner("scanner", num_scanner_workers);
+  worker_group wg_scanner(worker_group::load_adaptive, "scanner",
+                          max_scanner_workers);
 
   console_writer lgr(std::cerr, !no_progress && ::isatty(::fileno(stderr)),
                      get_term_width(), logger::parse_level(log_level));
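
With this change, the scanner pool adapts to system load on its own: the option renamed to --max-scanner-workers (-M) now sets an upper bound on the pool rather than a fixed thread count. A hypothetical invocation (assuming the usual -i/-o input/output options of mkdwarfs; everything else elided):

mkdwarfs -i /path/to/tree -o tree.dwarfs -N 8 -M 8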