feat: allow setting worker group cpu affinity

This commit is contained in:
Marcus Holland-Moritz 2023-11-21 15:27:05 +01:00
parent 85bc819fa7
commit fbad13553d
2 changed files with 53 additions and 0 deletions

View File

@ -67,6 +67,9 @@ class worker_group {
size_t size() const { return impl_->size(); } size_t size() const { return impl_->size(); }
size_t queue_size() const { return impl_->queue_size(); } size_t queue_size() const { return impl_->queue_size(); }
double get_cpu_time() const { return impl_->get_cpu_time(); } double get_cpu_time() const { return impl_->get_cpu_time(); }
bool set_affinity(std::vector<int> const& cpus) {
return impl_->set_affinity(cpus);
}
template <typename T> template <typename T>
bool add_job(std::packaged_task<T()>&& task) { bool add_job(std::packaged_task<T()>&& task) {
@ -84,6 +87,7 @@ class worker_group {
virtual size_t size() const = 0; virtual size_t size() const = 0;
virtual size_t queue_size() const = 0; virtual size_t queue_size() const = 0;
virtual double get_cpu_time() const = 0; virtual double get_cpu_time() const = 0;
virtual bool set_affinity(std::vector<int> const& cpus) = 0;
}; };
private: private:

View File

@ -32,6 +32,7 @@
#include <vector> #include <vector>
#include <folly/Conv.h> #include <folly/Conv.h>
#include <folly/String.h>
#include <folly/portability/PThread.h> #include <folly/portability/PThread.h>
#include <folly/portability/Windows.h> #include <folly/portability/Windows.h>
#include <folly/system/ThreadName.h> #include <folly/system/ThreadName.h>
@ -116,6 +117,10 @@ class basic_worker_group final : public worker_group::impl, private Policy {
do_work(niceness > 10); do_work(niceness > 10);
}); });
} }
#ifndef _WIN32
check_set_affinity_from_enviroment(group_name);
#endif
} }
basic_worker_group(const basic_worker_group&) = delete; basic_worker_group(const basic_worker_group&) = delete;
@ -219,9 +224,53 @@ class basic_worker_group final : public worker_group::impl, private Policy {
return t; return t;
} }
bool set_affinity(std::vector<int> const& cpus) override {
if (cpus.empty()) {
return false;
}
#ifndef _WIN32
std::lock_guard lock(mx_);
cpu_set_t cpuset;
for (auto cpu : cpus) {
CPU_SET(cpu, &cpuset);
}
for (size_t i = 0; i < workers_.size(); ++i) {
if (auto error = pthread_setaffinity_np(workers_[i].native_handle(),
sizeof(cpu_set_t), &cpuset);
error != 0) {
return false;
}
}
#endif
return true;
}
private: private:
using jobs_t = std::queue<worker_group::job_t>; using jobs_t = std::queue<worker_group::job_t>;
void check_set_affinity_from_enviroment(const char* group_name) {
if (auto var = std::getenv("DWARFS_WORKER_GROUP_AFFINITY")) {
std::vector<std::string_view> groups;
folly::split(':', var, groups);
for (auto& group : groups) {
std::vector<std::string_view> parts;
folly::split('=', group, parts);
if (parts.size() == 2 && parts[0] == group_name) {
std::vector<int> cpus;
folly::split(',', parts[1], cpus);
set_affinity(cpus);
}
}
}
}
// TODO: move out of this class // TODO: move out of this class
static void set_thread_niceness(int niceness) { static void set_thread_niceness(int niceness) {
if (niceness > 0) { if (niceness > 0) {