diff --git a/include/dwarfs/block_cache.h b/include/dwarfs/block_cache.h index fd6b1905..7810067b 100644 --- a/include/dwarfs/block_cache.h +++ b/include/dwarfs/block_cache.h @@ -49,6 +49,8 @@ class block_cache { void set_block_size(size_t size) { impl_->set_block_size(size); } + void set_num_workers(size_t num) { impl_->set_num_workers(num); } + std::future get(size_t block_no, size_t offset, size_t size) const { return impl_->get(block_no, offset, size); @@ -61,6 +63,7 @@ class block_cache { virtual size_t block_count() const = 0; virtual void insert(fs_section const& section) = 0; virtual void set_block_size(size_t size) = 0; + virtual void set_num_workers(size_t num) = 0; virtual std::future get(size_t block_no, size_t offset, size_t length) const = 0; }; diff --git a/include/dwarfs/filesystem_v2.h b/include/dwarfs/filesystem_v2.h index 71f25697..c8a6832f 100644 --- a/include/dwarfs/filesystem_v2.h +++ b/include/dwarfs/filesystem_v2.h @@ -151,6 +151,8 @@ class filesystem_v2 { std::optional header() const { return impl_->header(); } + void set_num_workers(size_t num) { return impl_->set_num_workers(num); } + class impl { public: virtual ~impl() = default; @@ -185,6 +187,7 @@ class filesystem_v2 { virtual folly::Expected>, int> readv(uint32_t inode, size_t size, off_t offset) const = 0; virtual std::optional header() const = 0; + virtual void set_num_workers(size_t num) = 0; }; private: diff --git a/include/dwarfs/inode_reader_v2.h b/include/dwarfs/inode_reader_v2.h index 07578731..c2af2d2f 100644 --- a/include/dwarfs/inode_reader_v2.h +++ b/include/dwarfs/inode_reader_v2.h @@ -65,6 +65,8 @@ class inode_reader_v2 { impl_->dump(os, indent, chunks); } + void set_num_workers(size_t num) { impl_->set_num_workers(num); } + class impl { public: virtual ~impl() = default; @@ -77,6 +79,7 @@ class inode_reader_v2 { readv(size_t size, off_t offset, chunk_range chunks) const = 0; virtual void dump(std::ostream& os, const std::string& indent, chunk_range chunks) const = 0; + virtual void set_num_workers(size_t num) = 0; }; private: diff --git a/include/dwarfs/options.h b/include/dwarfs/options.h index 24ed6e06..8aea15e2 100644 --- a/include/dwarfs/options.h +++ b/include/dwarfs/options.h @@ -36,6 +36,7 @@ struct block_cache_options { size_t num_workers{0}; double decompress_ratio{1.0}; bool mm_release{true}; + bool init_workers{true}; }; struct metadata_options { diff --git a/include/dwarfs/worker_group.h b/include/dwarfs/worker_group.h index 4dd35fae..df5f44a4 100644 --- a/include/dwarfs/worker_group.h +++ b/include/dwarfs/worker_group.h @@ -50,22 +50,30 @@ class worker_group { * * \param num_workers Number of worker threads. */ - worker_group(const char* group_name = nullptr, size_t num_workers = 1, - size_t max_queue_len = std::numeric_limits::max(), - int niceness = 0); + explicit worker_group( + const char* group_name, size_t num_workers = 1, + size_t max_queue_len = std::numeric_limits::max(), + int niceness = 0); /** * Create a load adaptive worker group * * \param num_workers Number of worker threads. */ - worker_group(load_adaptive_tag, const char* group_name = nullptr, - size_t max_num_workers = 1, - size_t max_queue_len = std::numeric_limits::max(), - int niceness = 0); + explicit worker_group( + load_adaptive_tag, const char* group_name = nullptr, + size_t max_num_workers = 1, + size_t max_queue_len = std::numeric_limits::max(), + int niceness = 0); + worker_group() = default; ~worker_group() = default; + worker_group(worker_group&&) = default; + worker_group& operator=(worker_group&&) = default; + + explicit operator bool() const { return static_cast(impl_); } + void stop() { impl_->stop(); } void wait() { impl_->wait(); } bool running() const { return impl_->running(); } diff --git a/src/dwarfs.cpp b/src/dwarfs.cpp index fa4418c5..02ed485d 100644 --- a/src/dwarfs.cpp +++ b/src/dwarfs.cpp @@ -106,6 +106,17 @@ constexpr struct ::fuse_opt dwarfs_opts[] = { #define dUSERDATA \ auto userdata = reinterpret_cast(fuse_req_userdata(req)) +template +void op_init(void* data, struct fuse_conn_info* /*conn*/) { + auto userdata = reinterpret_cast(data); + LOG_PROXY(LoggerPolicy, userdata->lgr); + + LOG_DEBUG << __func__; + + // we must do this *after* the fuse driver has forked into background + userdata->fs.set_num_workers(userdata->opts.workers); +} + template void op_lookup(fuse_req_t req, fuse_ino_t parent, const char* name) { dUSERDATA; @@ -475,6 +486,7 @@ int option_hdl(void* data, const char* arg, int key, template void init_lowlevel_ops(struct fuse_lowlevel_ops& ops) { + ops.init = &op_init; ops.lookup = &op_lookup; ops.getattr = &op_getattr; ops.access = &op_access; @@ -581,6 +593,7 @@ void load_filesystem(dwarfs_userdata& userdata) { fsopts.block_cache.num_workers = opts.workers; fsopts.block_cache.decompress_ratio = opts.decompress_ratio; fsopts.block_cache.mm_release = !opts.cache_image; + fsopts.block_cache.init_workers = false; fsopts.metadata.enable_nlink = bool(opts.enable_nlink); fsopts.metadata.readonly = bool(opts.readonly); diff --git a/src/dwarfs/block_cache.cpp b/src/dwarfs/block_cache.cpp index 0aacc70d..b3efe5f3 100644 --- a/src/dwarfs/block_cache.cpp +++ b/src/dwarfs/block_cache.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -204,17 +205,24 @@ class block_cache_ final : public block_cache::impl { block_cache_(logger& lgr, std::shared_ptr mm, block_cache_options const& options) : cache_(0) - , wg_("blkcache", std::max(options.num_workers > 0 - ? options.num_workers - : std::thread::hardware_concurrency(), - static_cast(1))) , mm_(std::move(mm)) , LOG_PROXY_INIT(lgr) - , options_(options) {} + , options_(options) { + if (options.init_workers) { + wg_ = worker_group("blkcache", + std::max(options.num_workers > 0 + ? options.num_workers + : std::thread::hardware_concurrency(), + static_cast(1))); + } + } ~block_cache_() noexcept override { LOG_DEBUG << "stopping cache workers"; - wg_.stop(); + + if (wg_) { + wg_.stop(); + } if (!blocks_created_.load()) { return; @@ -288,6 +296,16 @@ class block_cache_ final : public block_cache::impl { }); } + void set_num_workers(size_t num) override { + std::unique_lock lock(mx_wg_); + + if (wg_) { + wg_.stop(); + } + + wg_ = worker_group("blkcache", num); + } + std::future get(size_t block_no, size_t offset, size_t size) const override { ++range_requests_; @@ -442,6 +460,8 @@ class block_cache_ final : public block_cache::impl { } void enqueue_job(std::shared_ptr brs) const { + std::shared_lock lock(mx_wg_); + // Lambda needs to be mutable so we can actually move out of it wg_.add_job([this, brs = std::move(brs)]() mutable { process_job(std::move(brs)); @@ -555,6 +575,7 @@ class block_cache_ final : public block_cache::impl { mutable std::atomic total_block_bytes_{0}; mutable std::atomic total_decompressed_bytes_{0}; + mutable std::shared_mutex mx_wg_; mutable worker_group wg_; std::vector block_; std::shared_ptr mm_; diff --git a/src/dwarfs/filesystem_v2.cpp b/src/dwarfs/filesystem_v2.cpp index 31db98ed..ca88f109 100644 --- a/src/dwarfs/filesystem_v2.cpp +++ b/src/dwarfs/filesystem_v2.cpp @@ -288,6 +288,7 @@ class filesystem_ final : public filesystem_v2::impl { folly::Expected>, int> readv(uint32_t inode, size_t size, off_t offset) const override; std::optional header() const override; + void set_num_workers(size_t num) override { ir_.set_num_workers(num); } private: LOG_PROXY_DECL(LoggerPolicy); diff --git a/src/dwarfs/inode_reader_v2.cpp b/src/dwarfs/inode_reader_v2.cpp index bd0df60d..de190ae9 100644 --- a/src/dwarfs/inode_reader_v2.cpp +++ b/src/dwarfs/inode_reader_v2.cpp @@ -68,6 +68,7 @@ class inode_reader_ final : public inode_reader_v2::impl { readv(size_t size, off_t offset, chunk_range chunks) const override; void dump(std::ostream& os, const std::string& indent, chunk_range chunks) const override; + void set_num_workers(size_t num) override { cache_.set_num_workers(num); } private: folly::Expected>, int>