Worker threads must be created after fork to background (fix gh #44)

This commit is contained in:
Marcus Holland-Moritz 2021-04-11 22:31:53 +02:00
parent 6d8bc71348
commit 9cbe9fae58
9 changed files with 67 additions and 13 deletions

View File

@ -49,6 +49,8 @@ class block_cache {
void set_block_size(size_t size) { impl_->set_block_size(size); }
void set_num_workers(size_t num) { impl_->set_num_workers(num); }
std::future<block_range>
get(size_t block_no, size_t offset, size_t size) const {
return impl_->get(block_no, offset, size);
@ -61,6 +63,7 @@ class block_cache {
virtual size_t block_count() const = 0;
virtual void insert(fs_section const& section) = 0;
virtual void set_block_size(size_t size) = 0;
virtual void set_num_workers(size_t num) = 0;
virtual std::future<block_range>
get(size_t block_no, size_t offset, size_t length) const = 0;
};

View File

@ -151,6 +151,8 @@ class filesystem_v2 {
std::optional<folly::ByteRange> header() const { return impl_->header(); }
void set_num_workers(size_t num) { return impl_->set_num_workers(num); }
class impl {
public:
virtual ~impl() = default;
@ -185,6 +187,7 @@ class filesystem_v2 {
virtual folly::Expected<std::vector<std::future<block_range>>, int>
readv(uint32_t inode, size_t size, off_t offset) const = 0;
virtual std::optional<folly::ByteRange> header() const = 0;
virtual void set_num_workers(size_t num) = 0;
};
private:

View File

@ -65,6 +65,8 @@ class inode_reader_v2 {
impl_->dump(os, indent, chunks);
}
void set_num_workers(size_t num) { impl_->set_num_workers(num); }
class impl {
public:
virtual ~impl() = default;
@ -77,6 +79,7 @@ class inode_reader_v2 {
readv(size_t size, off_t offset, chunk_range chunks) const = 0;
virtual void dump(std::ostream& os, const std::string& indent,
chunk_range chunks) const = 0;
virtual void set_num_workers(size_t num) = 0;
};
private:

View File

@ -36,6 +36,7 @@ struct block_cache_options {
size_t num_workers{0};
double decompress_ratio{1.0};
bool mm_release{true};
bool init_workers{true};
};
struct metadata_options {

View File

@ -50,7 +50,8 @@ class worker_group {
*
* \param num_workers Number of worker threads.
*/
worker_group(const char* group_name = nullptr, size_t num_workers = 1,
explicit worker_group(
const char* group_name, size_t num_workers = 1,
size_t max_queue_len = std::numeric_limits<size_t>::max(),
int niceness = 0);
@ -59,13 +60,20 @@ class worker_group {
*
* \param num_workers Number of worker threads.
*/
worker_group(load_adaptive_tag, const char* group_name = nullptr,
explicit worker_group(
load_adaptive_tag, const char* group_name = nullptr,
size_t max_num_workers = 1,
size_t max_queue_len = std::numeric_limits<size_t>::max(),
int niceness = 0);
worker_group() = default;
~worker_group() = default;
worker_group(worker_group&&) = default;
worker_group& operator=(worker_group&&) = default;
explicit operator bool() const { return static_cast<bool>(impl_); }
void stop() { impl_->stop(); }
void wait() { impl_->wait(); }
bool running() const { return impl_->running(); }

View File

@ -106,6 +106,17 @@ constexpr struct ::fuse_opt dwarfs_opts[] = {
#define dUSERDATA \
auto userdata = reinterpret_cast<dwarfs_userdata*>(fuse_req_userdata(req))
template <typename LoggerPolicy>
void op_init(void* data, struct fuse_conn_info* /*conn*/) {
auto userdata = reinterpret_cast<dwarfs_userdata*>(data);
LOG_PROXY(LoggerPolicy, userdata->lgr);
LOG_DEBUG << __func__;
// we must do this *after* the fuse driver has forked into background
userdata->fs.set_num_workers(userdata->opts.workers);
}
template <typename LoggerPolicy>
void op_lookup(fuse_req_t req, fuse_ino_t parent, const char* name) {
dUSERDATA;
@ -475,6 +486,7 @@ int option_hdl(void* data, const char* arg, int key,
template <typename LoggerPolicy>
void init_lowlevel_ops(struct fuse_lowlevel_ops& ops) {
ops.init = &op_init<LoggerPolicy>;
ops.lookup = &op_lookup<LoggerPolicy>;
ops.getattr = &op_getattr<LoggerPolicy>;
ops.access = &op_access<LoggerPolicy>;
@ -581,6 +593,7 @@ void load_filesystem(dwarfs_userdata& userdata) {
fsopts.block_cache.num_workers = opts.workers;
fsopts.block_cache.decompress_ratio = opts.decompress_ratio;
fsopts.block_cache.mm_release = !opts.cache_image;
fsopts.block_cache.init_workers = false;
fsopts.metadata.enable_nlink = bool(opts.enable_nlink);
fsopts.metadata.readonly = bool(opts.readonly);

View File

@ -28,6 +28,7 @@
#include <iterator>
#include <mutex>
#include <new>
#include <shared_mutex>
#include <thread>
#include <utility>
#include <vector>
@ -204,17 +205,24 @@ class block_cache_ final : public block_cache::impl {
block_cache_(logger& lgr, std::shared_ptr<mmif> mm,
block_cache_options const& options)
: cache_(0)
, wg_("blkcache", std::max(options.num_workers > 0
? options.num_workers
: std::thread::hardware_concurrency(),
static_cast<size_t>(1)))
, mm_(std::move(mm))
, LOG_PROXY_INIT(lgr)
, options_(options) {}
, options_(options) {
if (options.init_workers) {
wg_ = worker_group("blkcache",
std::max(options.num_workers > 0
? options.num_workers
: std::thread::hardware_concurrency(),
static_cast<size_t>(1)));
}
}
~block_cache_() noexcept override {
LOG_DEBUG << "stopping cache workers";
if (wg_) {
wg_.stop();
}
if (!blocks_created_.load()) {
return;
@ -288,6 +296,16 @@ class block_cache_ final : public block_cache::impl {
});
}
void set_num_workers(size_t num) override {
std::unique_lock lock(mx_wg_);
if (wg_) {
wg_.stop();
}
wg_ = worker_group("blkcache", num);
}
std::future<block_range>
get(size_t block_no, size_t offset, size_t size) const override {
++range_requests_;
@ -442,6 +460,8 @@ class block_cache_ final : public block_cache::impl {
}
void enqueue_job(std::shared_ptr<block_request_set> brs) const {
std::shared_lock lock(mx_wg_);
// Lambda needs to be mutable so we can actually move out of it
wg_.add_job([this, brs = std::move(brs)]() mutable {
process_job(std::move(brs));
@ -555,6 +575,7 @@ class block_cache_ final : public block_cache::impl {
mutable std::atomic<size_t> total_block_bytes_{0};
mutable std::atomic<size_t> total_decompressed_bytes_{0};
mutable std::shared_mutex mx_wg_;
mutable worker_group wg_;
std::vector<fs_section> block_;
std::shared_ptr<mmif> mm_;

View File

@ -288,6 +288,7 @@ class filesystem_ final : public filesystem_v2::impl {
folly::Expected<std::vector<std::future<block_range>>, int>
readv(uint32_t inode, size_t size, off_t offset) const override;
std::optional<folly::ByteRange> header() const override;
void set_num_workers(size_t num) override { ir_.set_num_workers(num); }
private:
LOG_PROXY_DECL(LoggerPolicy);

View File

@ -68,6 +68,7 @@ class inode_reader_ final : public inode_reader_v2::impl {
readv(size_t size, off_t offset, chunk_range chunks) const override;
void dump(std::ostream& os, const std::string& indent,
chunk_range chunks) const override;
void set_num_workers(size_t num) override { cache_.set_num_workers(num); }
private:
folly::Expected<std::vector<std::future<block_range>>, int>