From 7810758fed0f788e9eb1898af30e57b909e0c54d Mon Sep 17 00:00:00 2001
From: Marcus Holland-Moritz <github@mhxnet.de>
Date: Wed, 25 Nov 2020 01:38:44 +0100
Subject: [PATCH] Add load adaptive worker group and use for scanning

---
 CMakeLists.txt                |   3 +-
 include/dwarfs/semaphore.h    |  54 ++++++
 include/dwarfs/worker_group.h | 188 +++-----------------
 src/dwarfs/worker_group.cpp   | 320 ++++++++++++++++++++++++++++++++++
 src/mkdwarfs.cpp              |   9 +-
 5 files changed, 409 insertions(+), 165 deletions(-)
 create mode 100644 include/dwarfs/semaphore.h
 create mode 100644 src/dwarfs/worker_group.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index d74a484a..cf853d05 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -107,7 +107,8 @@ list(
     src/dwarfs/progress.cpp
     src/dwarfs/scanner.cpp
     src/dwarfs/similarity.cpp
-    src/dwarfs/util.cpp)
+    src/dwarfs/util.cpp
+    src/dwarfs/worker_group.cpp)
 
 if(WITH_LUA)
   list(APPEND LIBDWARFS_SRC src/dwarfs/lua_script.cpp)
diff --git a/include/dwarfs/semaphore.h b/include/dwarfs/semaphore.h
new file mode 100644
index 00000000..4313f7f0
--- /dev/null
+++ b/include/dwarfs/semaphore.h
@@ -0,0 +1,54 @@
+/* vim:set ts=2 sw=2 sts=2 et: */
+/**
+ * \author Marcus Holland-Moritz (github@mhxnet.de)
+ * \copyright Copyright (c) Marcus Holland-Moritz
+ *
+ * This file is part of dwarfs.
+ *
+ * dwarfs is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dwarfs is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+
+namespace dwarfs {
+
+class semaphore {
+ public:
+  semaphore(int count = 0)
+      : count_{count} {}
+
+  void release() {
+    std::unique_lock lock(mx_);
+    ++count_;
+    cv_.notify_one();
+  }
+
+  void acquire() {
+    std::unique_lock lock(mx_);
+    while (count_ == 0) {
+      cv_.wait(lock);
+    }
+    --count_;
+  }
+
+ private:
+  std::mutex mx_;
+  std::condition_variable cv_;
+  int count_;
+};
+
+} // namespace dwarfs
diff --git a/include/dwarfs/worker_group.h b/include/dwarfs/worker_group.h
index e6ab23a1..3ff84d07 100644
--- a/include/dwarfs/worker_group.h
+++ b/include/dwarfs/worker_group.h
@@ -21,16 +21,10 @@
 
 #pragma once
 
-#include
-#include
 #include <limits>
-#include
-#include
-#include
+#include <memory>
 
-#include <folly/Conv.h>
 #include <folly/Function.h>
-#include <folly/system/ThreadName.h>
 
 namespace dwarfs {
 
@@ -45,173 +39,47 @@ class worker_group {
  public:
   using job_t = folly::Function<void()>;
 
+  static struct load_adaptive_tag {
+  } load_adaptive;
+
   /**
    * Create a worker group
    *
    * \param num_workers Number of worker threads.
   */
-  explicit worker_group(
-      const char* group_name = nullptr, size_t num_workers = 1,
-      size_t max_queue_len = std::numeric_limits<size_t>::max())
-      : running_(true)
-      , pending_(0)
-      , max_queue_len_(max_queue_len) {
-    if (num_workers < 1) {
-      throw std::runtime_error("invalid number of worker threads");
-    }
-    if (!group_name) {
-      group_name = "worker";
-    }
-
-    for (size_t i = 0; i < num_workers; ++i) {
-      workers_.emplace_back([=, this] {
-        folly::setThreadName(folly::to<std::string>(group_name, i + 1));
-        do_work();
-      });
-    }
-  }
-
-  worker_group(const worker_group&) = delete;
-  worker_group& operator=(const worker_group&) = delete;
+  worker_group(const char* group_name = nullptr, size_t num_workers = 1,
+               size_t max_queue_len = std::numeric_limits<size_t>::max());
 
   /**
-   * Stop and destroy a worker group
-   */
-  ~worker_group() noexcept {
-    try {
-      stop();
-    } catch (...) {
-    }
-  }
-
-  /**
-   * Stop a worker group
-   */
-  void stop() {
-    if (running_) {
-      {
-        std::lock_guard lock(mx_);
-        running_ = false;
-      }
-
-      cond_.notify_all();
-
-      for (auto& w : workers_) {
-        w.join();
-      }
-    }
-  }
-
-  /**
-   * Wait until all work has been done
-   */
-  void wait() {
-    if (running_) {
-      std::unique_lock lock(mx_);
-      wait_.wait(lock, [&] { return pending_ == 0; });
-    }
-  }
-
-  /**
-   * Check whether the worker group is still running
-   */
-  bool running() const { return running_; }
-
-  /**
-   * Add a new job to the worker group
+   * Create a load adaptive worker group
    *
-   * The new job will be dispatched to the first available worker thread.
-   *
-   * \param job The job to add to the dispatcher.
+   * \param max_num_workers Maximum number of worker threads.
    */
-  bool add_job(job_t&& job) {
-    if (running_) {
-      {
-        std::unique_lock lock(mx_);
-        queue_.wait(lock, [this] { return jobs_.size() < max_queue_len_; });
-        jobs_.emplace(std::move(job));
-        ++pending_;
-      }
+  worker_group(load_adaptive_tag, const char* group_name = nullptr,
+               size_t max_num_workers = 1,
+               size_t max_queue_len = std::numeric_limits<size_t>::max());
 
-      cond_.notify_one();
-    }
+  ~worker_group();
 
-    return false;
-  }
+  void stop() { impl_->stop(); }
+  void wait() { impl_->wait(); }
+  bool running() const { return impl_->running(); }
+  bool add_job(job_t&& job) { return impl_->add_job(std::move(job)); }
+  size_t queue_size() const { return impl_->queue_size(); }
 
-  /**
-   * Return the number of worker threads
-   *
-   * \returns The number of worker threads.
-   */
-  size_t size() const { return workers_.size(); }
+  class impl {
+   public:
+    virtual ~impl() = default;
 
-  /**
-   * Return the number of worker threads
-   *
-   * \returns The number of worker threads.
-   */
-  size_t queue_size() const {
-    std::lock_guard lock(mx_);
-    return jobs_.size();
-  }
-
-  /**
-   * Return the number of queued jobs
-   *
-   * \returns The number of queued jobs.
-   */
-  size_t queued_jobs() const {
-    std::lock_guard lock(mx_);
-    return jobs_.size();
-  }
+    virtual void stop() = 0;
+    virtual void wait() = 0;
+    virtual bool running() const = 0;
+    virtual bool add_job(job_t&& job) = 0;
+    virtual size_t queue_size() const = 0;
+  };
 
  private:
-  using jobs_t = std::queue<job_t>;
-
-  void do_work() {
-    for (;;) {
-      job_t job;
-
-      {
-        std::unique_lock lock(mx_);
-
-        while (jobs_.empty() && running_) {
-          cond_.wait(lock);
-        }
-
-        if (jobs_.empty()) {
-          if (running_)
-            continue;
-          else
-            break;
-        }
-
-        job = std::move(jobs_.front());
-
-        jobs_.pop();
-      }
-
-      job();
-
-      {
-        std::lock_guard lock(mx_);
-        pending_--;
-      }
-
-      wait_.notify_one();
-      queue_.notify_one();
-    }
-  }
-
-  std::vector<std::thread> workers_;
-  jobs_t jobs_;
-  std::condition_variable cond_;
-  std::condition_variable queue_;
-  std::condition_variable wait_;
-  mutable std::mutex mx_;
-  std::atomic<bool> running_;
-  std::atomic<size_t> pending_;
-  const size_t max_queue_len_;
+  std::unique_ptr<impl> impl_;
 };
+
 } // namespace dwarfs
diff --git a/src/dwarfs/worker_group.cpp b/src/dwarfs/worker_group.cpp
new file mode 100644
index 00000000..e3c1fbd2
--- /dev/null
+++ b/src/dwarfs/worker_group.cpp
@@ -0,0 +1,320 @@
+/* vim:set ts=2 sw=2 sts=2 et: */
+/**
+ * \author Marcus Holland-Moritz (github@mhxnet.de)
+ * \copyright Copyright (c) Marcus Holland-Moritz
+ *
+ * This file is part of dwarfs.
+ *
+ * dwarfs is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dwarfs is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include // TODO: remove
+
+#include
+#include
+#include
+#include
+#include
+
+#include <sys/resource.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include <folly/Conv.h>
+#include <folly/system/ThreadName.h>
+
+#include "dwarfs/semaphore.h"
+#include "dwarfs/worker_group.h"
+
+namespace dwarfs {
+
+template <typename Policy>
+class basic_worker_group : public worker_group::impl, private Policy {
+ public:
+  template <typename... Args>
+  basic_worker_group(const char* group_name, size_t num_workers,
+                     size_t max_queue_len, Args&&... args)
+      : Policy(std::forward<Args>(args)...)
+      , running_(true)
+      , pending_(0)
+      , max_queue_len_(max_queue_len) {
+    if (num_workers < 1) {
+      throw std::runtime_error("invalid number of worker threads");
+    }
+    if (!group_name) {
+      group_name = "worker";
+    }
+
+    for (size_t i = 0; i < num_workers; ++i) {
+      workers_.emplace_back([=, this] {
+        folly::setThreadName(folly::to<std::string>(group_name, i + 1));
+        do_work();
+      });
+    }
+  }
+
+  basic_worker_group(const basic_worker_group&) = delete;
+  basic_worker_group& operator=(const basic_worker_group&) = delete;
+
+  /**
+   * Stop and destroy a worker group
+   */
+  ~basic_worker_group() noexcept {
+    try {
+      stop();
+    } catch (...) {
+    }
+  }
+
+  /**
+   * Stop a worker group
+   */
+  void stop() {
+    if (running_) {
+      {
+        std::lock_guard lock(mx_);
+        running_ = false;
+      }
+
+      cond_.notify_all();
+
+      for (auto& w : workers_) {
+        w.join();
+      }
+    }
+  }
+
+  /**
+   * Wait until all work has been done
+   */
+  void wait() {
+    if (running_) {
+      std::unique_lock lock(mx_);
+      wait_.wait(lock, [&] { return pending_ == 0; });
+    }
+  }
+
+  /**
+   * Check whether the worker group is still running
+   */
+  bool running() const { return running_; }
+
+  /**
+   * Add a new job to the worker group
+   *
+   * The new job will be dispatched to the first available worker thread.
+   *
+   * \param job The job to add to the dispatcher.
+   */
+  bool add_job(worker_group::job_t&& job) {
+    if (running_) {
+      {
+        std::unique_lock lock(mx_);
+        queue_.wait(lock, [this] { return jobs_.size() < max_queue_len_; });
+        jobs_.emplace(std::move(job));
+        ++pending_;
+      }
+
+      cond_.notify_one();
+    }
+
+    return false;
+  }
+
+  /**
+   * Return the number of worker threads
+   *
+   * \returns The number of worker threads.
+   */
+  size_t size() const { return workers_.size(); }
+
+  /**
+   * Return the number of queued jobs
+   *
+   * \returns The number of queued jobs.
+   */
+  size_t queue_size() const {
+    std::lock_guard lock(mx_);
+    return jobs_.size();
+  }
+
+ private:
+  using jobs_t = std::queue<worker_group::job_t>;
+
+  void do_work() {
+    for (;;) {
+      worker_group::job_t job;
+
+      {
+        std::unique_lock lock(mx_);
+
+        while (jobs_.empty() && running_) {
+          cond_.wait(lock);
+        }
+
+        if (jobs_.empty()) {
+          if (running_)
+            continue;
+          else
+            break;
+        }
+
+        job = std::move(jobs_.front());
+
+        jobs_.pop();
+      }
+
+      {
+        typename Policy::task task(this);
+        job();
+      }
+
+      {
+        std::lock_guard lock(mx_);
+        pending_--;
+      }
+
+      wait_.notify_one();
+      queue_.notify_one();
+    }
+  }
+
+  std::vector<std::thread> workers_;
+  jobs_t jobs_;
+  std::condition_variable cond_;
+  std::condition_variable queue_;
+  std::condition_variable wait_;
+  mutable std::mutex mx_;
+  std::atomic<bool> running_;
+  std::atomic<size_t> pending_;
+  const size_t max_queue_len_;
+};
+
+class no_policy {
+ public:
+  class task {
+   public:
+    task(no_policy*) {}
+  };
+};
+
+class load_adaptive_policy {
+ public:
+  class task {
+   public:
+    task(load_adaptive_policy* policy)
+        : policy_(policy) {
+      policy_->start_task();
+
+      struct rusage usage;
+      getrusage(RUSAGE_THREAD, &usage);
+      utime_ = usage.ru_utime;
+      stime_ = usage.ru_stime;
+      clock_gettime(CLOCK_MONOTONIC, &wall_);
+    }
+
+    ~task();
+
+   private:
+    load_adaptive_policy* policy_;
+    struct timespec wall_;
+    struct timeval utime_, stime_;
+  };
+
+  load_adaptive_policy(size_t workers)
+      : sem_(workers)
+      , max_throttled_(workers - 1) {}
+
+  void start_task() { sem_.acquire(); }
+
+  void stop_task(uint64_t wall_ns, uint64_t cpu_ns);
+
+ private:
+  semaphore sem_;
+  int max_throttled_;
+  std::mutex mx_;
+  uint64_t wall_ns_{0}, cpu_ns_{0};
+  int throttled_{0};
+};
+
+load_adaptive_policy::task::~task() {
+  struct rusage usage;
+  getrusage(RUSAGE_THREAD, &usage);
+  struct timespec now;
+  clock_gettime(CLOCK_MONOTONIC, &now);
+
+  uint64_t wall_ns = UINT64_C(1000000000) * (now.tv_sec - wall_.tv_sec);
+  wall_ns += now.tv_nsec;
+  wall_ns -= wall_.tv_nsec;
+
+  uint64_t cpu_ns =
+      UINT64_C(1000000000) * (usage.ru_utime.tv_sec + usage.ru_stime.tv_sec -
+                              (utime_.tv_sec + stime_.tv_sec));
+  cpu_ns += UINT64_C(1000) * (usage.ru_utime.tv_usec + usage.ru_stime.tv_usec);
+  cpu_ns -= UINT64_C(1000) * (utime_.tv_usec + stime_.tv_usec);
+
+  policy_->stop_task(wall_ns, cpu_ns);
+}
+
+void load_adaptive_policy::stop_task(uint64_t wall_ns, uint64_t cpu_ns) {
+  int adjust = 0;
+
+  {
+    std::unique_lock lock(mx_);
+
+    wall_ns_ += wall_ns;
+    cpu_ns_ += cpu_ns;
+
+    if (wall_ns_ >= 1000000000) {
+      auto load = float(cpu_ns_) / float(wall_ns_);
+      if (load > 0.75f) {
+        if (throttled_ > 0) {
+          --throttled_;
+          adjust = 1;
+        }
+      } else if (load < 0.25f) {
+        if (throttled_ < max_throttled_) {
+          ++throttled_;
+          adjust = -1;
+        }
+      }
+      wall_ns_ = 0;
+      cpu_ns_ = 0;
+    }
+  }
+
+  if (adjust < 0) {
+    return;
+  }
+
+  if (adjust > 0) {
+    sem_.release();
+  }
+
+  sem_.release();
+}
+
+worker_group::worker_group(const char* group_name, size_t num_workers,
+                           size_t max_queue_len)
+    : impl_{std::make_unique<basic_worker_group<no_policy>>(
+          group_name, num_workers, max_queue_len)} {}
+
+worker_group::worker_group(load_adaptive_tag, const char* group_name,
+                           size_t max_num_workers, size_t max_queue_len)
+    : impl_{std::make_unique<basic_worker_group<load_adaptive_policy>>(
+          group_name, max_num_workers, max_queue_len, max_num_workers)} {}
+
+worker_group::~worker_group() = default;
+
+} // namespace dwarfs
diff --git a/src/mkdwarfs.cpp b/src/mkdwarfs.cpp
index db549638..aadb5bb8 100644
--- a/src/mkdwarfs.cpp
+++ b/src/mkdwarfs.cpp
@@ -218,7 +218,7 @@ int mkdwarfs(int argc, char** argv) {
   block_manager::config cfg;
   std::string path, output, window_sizes, memory_limit, script_path,
       compression, log_level;
-  size_t num_workers, num_scanner_workers;
+  size_t num_workers, max_scanner_workers;
   bool no_time = false, no_owner = false, recompress = false,
        no_progress = false;
   unsigned level;
@@ -246,8 +246,8 @@
     ("num-workers,N",
      po::value<size_t>(&num_workers)->default_value(num_cpu),
      "number of writer worker threads")
-    ("num-scanner-workers,M",
-     po::value<size_t>(&num_scanner_workers)->default_value(num_cpu),
+    ("max-scanner-workers,M",
+     po::value<size_t>(&max_scanner_workers)->default_value(num_cpu),
      "number of scanner worker threads")
     ("memory-limit,L",
      po::value<std::string>(&memory_limit)->default_value("1g"),
@@ -382,7 +382,8 @@ int mkdwarfs(int argc, char** argv) {
   }
 
   worker_group wg_writer("writer", num_workers);
-  worker_group wg_scanner("scanner", num_scanner_workers);
+  worker_group wg_scanner(worker_group::load_adaptive, "scanner",
+                          max_scanner_workers);
 
   console_writer lgr(std::cerr, !no_progress && ::isatty(::fileno(stderr)),
                      get_term_width(), logger::parse_level(log_level));
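
Usage note (not part of the patch): the sketch below shows how the new load
adaptive constructor is meant to be driven, mirroring the mkdwarfs hunk above.
It is illustrative only; the file name, thread count and sleeping job are made
up, and it assumes the job_t signature folly::Function<void()> as reconstructed
in the header. The policy tracks the CPU/wall-clock ratio of completed jobs
per group and, once the accumulated per-job wall time exceeds one second,
withholds one semaphore token (parking a worker) when the ratio drops below
0.25 and releases an extra token when it rises above 0.75, so an I/O-bound
workload like the one below ends up running on fewer threads than
max_num_workers.

    // demo.cpp -- illustrative usage sketch, not part of the patch
    #include <chrono>
    #include <thread>

    #include "dwarfs/worker_group.h"

    int main() {
      // Start a load adaptive group with at most 8 workers.
      dwarfs::worker_group wg(dwarfs::worker_group::load_adaptive, "demo", 8);

      for (int i = 0; i < 100; ++i) {
        wg.add_job([] {
          // Mostly-blocked job: low CPU/wall ratio, so workers get throttled.
          std::this_thread::sleep_for(std::chrono::milliseconds(10));
        });
      }

      wg.wait(); // block until the queue has drained
      wg.stop(); // join the worker threads
      return 0;
    }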