From a216840ea5dec5afc53a52da1559329a9d6df83f Mon Sep 17 00:00:00 2001
From: Marcus Holland-Moritz
Date: Sat, 27 Apr 2024 19:00:28 +0200
Subject: [PATCH] feat(block_cache): add sequential access detector & prefetch

---
 include/dwarfs/options.h   |   1 +
 src/dwarfs/block_cache.cpp | 126 ++++++++++++++++++++++++++++++++++---
 2 files changed, 120 insertions(+), 7 deletions(-)

diff --git a/include/dwarfs/options.h b/include/dwarfs/options.h
index 07bf80cc..9fb745be 100644
--- a/include/dwarfs/options.h
+++ b/include/dwarfs/options.h
@@ -52,6 +52,7 @@ struct block_cache_options {
   bool mm_release{true};
   bool init_workers{true};
   bool disable_block_integrity_check{false};
+  size_t sequential_access_detector_threshold{0};
 };
 
 struct history_config {
diff --git a/src/dwarfs/block_cache.cpp b/src/dwarfs/block_cache.cpp
index eb5fd0ee..948a1e56 100644
--- a/src/dwarfs/block_cache.cpp
+++ b/src/dwarfs/block_cache.cpp
@@ -36,6 +36,7 @@
 
 #include
 
+#include <folly/container/EvictingCacheMap.h>
 #include
 #include
 #include
@@ -53,6 +54,78 @@
 
 namespace dwarfs {
 
+class sequential_access_detector {
+ public:
+  virtual ~sequential_access_detector() = default;
+
+  virtual void set_block_count(size_t) = 0;
+  virtual void touch(size_t block_no) = 0;
+  virtual std::optional<size_t> prefetch() const = 0;
+};
+
+class no_sequential_access_detector : public sequential_access_detector {
+ public:
+  void set_block_count(size_t) override {}
+  void touch(size_t) override {}
+  std::optional<size_t> prefetch() const override { return std::nullopt; }
+};
+
+class lru_sequential_access_detector : public sequential_access_detector {
+ public:
+  explicit lru_sequential_access_detector(size_t seq_blocks)
+      : lru_{seq_blocks}
+      , seq_blocks_{seq_blocks} {}
+
+  void set_block_count(size_t num_blocks) override {
+    std::lock_guard lock(mx_);
+    num_blocks_ = num_blocks;
+    lru_.clear();
+    is_sequential_.reset();
+  }
+
+  void touch(size_t block_no) override {
+    std::lock_guard lock(mx_);
+    lru_.set(block_no, block_no, true,
+             [this](size_t, size_t&&) { is_sequential_.reset(); });
+  }
+
+  std::optional<size_t> prefetch() const override {
+    std::lock_guard lock(mx_);
+
+    if (lru_.size() < seq_blocks_) {
+      return std::nullopt;
+    }
+
+    if (is_sequential_.has_value()) {
+      return std::nullopt;
+    }
+
+    auto minmax = std::minmax_element(
+        lru_.begin(), lru_.end(),
+        [](auto const& a, auto const& b) { return a.first < b.first; });
+
+    auto min = minmax.first->first;
+    auto max = minmax.second->first;
+
+    is_sequential_ = max - min + 1 == seq_blocks_;
+
+    if (*is_sequential_ && max + 1 < num_blocks_) {
+      return max + 1;
+    }
+
+    return std::nullopt;
+  }
+
+ private:
+  using lru_type = folly::EvictingCacheMap<size_t, size_t>;
+
+  std::mutex mutable mx_;
+  lru_type lru_;
+  std::optional<bool> mutable is_sequential_;
+  size_t num_blocks_{0};
+  size_t const seq_blocks_;
+};
+
 class block_request {
  public:
   block_request() = default;
@@ -150,6 +223,8 @@ class block_cache_ final : public block_cache::impl {
       PERFMON_CLS_TIMER_INIT(get, "block_no", "offset", "size")
       PERFMON_CLS_TIMER_INIT(process, "block_no")
       PERFMON_CLS_TIMER_INIT(decompress, "range_end") // clang-format on
+      , seq_access_detector_{create_seq_access_detector(
+            options.sequential_access_detector_threshold)}
       , os_{os}
       , options_(options) {
     if (options.init_workers) {
@@ -202,6 +277,7 @@
     LOG_VERBOSE << "blocks tidied: " << blocks_tidied_.load();
     LOG_VERBOSE << "request sets merged: " << sets_merged_.load();
     LOG_VERBOSE << "total requests: " << range_requests_.load();
+    LOG_VERBOSE << "sequential prefetches: " << sequential_prefetches_.load();
     LOG_VERBOSE << "active hits (fast): " << active_hits_fast_.load();
     LOG_VERBOSE << "active hits (slow): " << active_hits_slow_.load();
     LOG_VERBOSE << "cache hits (fast): " << cache_hits_fast_.load();
@@ -233,6 +309,7 @@
 
   void insert(fs_section const& section) override {
     block_.emplace_back(section);
+    seq_access_detector_->set_block_count(block_.size());
   }
 
   void set_block_size(size_t size) override {
@@ -298,6 +375,20 @@
     PERFMON_CLS_SCOPED_SECTION(get)
     PERFMON_SET_CONTEXT(block_no, offset, size)
 
+    seq_access_detector_->touch(block_no);
+
+    SCOPE_EXIT {
+      if (auto next = seq_access_detector_->prefetch()) {
+        sequential_prefetches_.fetch_add(1, std::memory_order_relaxed);
+
+        {
+          std::lock_guard lock(mx_);
+          create_cached_block(*next, std::promise<block_range>{}, 0,
+                              std::numeric_limits<size_t>::max());
+        }
+      }
+    };
+
     range_requests_.fetch_add(1, std::memory_order_relaxed);
 
     std::promise<block_range> promise;
@@ -441,16 +532,34 @@
 
     // Bummer. We don't know anything about the block.
 
-    try {
-      LOG_TRACE << "block " << block_no << " not found";
+    LOG_TRACE << "block " << block_no << " not found";
 
+    create_cached_block(block_no, std::move(promise), offset, range_end);
+
+    return future;
+  }
+
+ private:
+  static std::unique_ptr<sequential_access_detector>
+  create_seq_access_detector(size_t threshold) {
+    if (threshold == 0) {
+      return std::make_unique<no_sequential_access_detector>();
+    }
+
+    return std::make_unique<lru_sequential_access_detector>(threshold);
+  }
+
+  void create_cached_block(size_t block_no, std::promise<block_range>&& promise,
+                           size_t offset, size_t range_end) const {
+    try {
       std::shared_ptr<cached_block> block = cached_block::create(
           LOG_GET_LOGGER, DWARFS_NOTHROW(block_.at(block_no)), mm_,
           options_.mm_release, options_.disable_block_integrity_check);
       blocks_created_.fetch_add(1, std::memory_order_relaxed);
 
       // Make a new set for the block
-      brs = std::make_shared<block_request_set>(std::move(block), block_no);
+      auto brs =
+          std::make_shared<block_request_set>(std::move(block), block_no);
 
       // Promise will be fulfilled asynchronously
       brs->add(offset, range_end, std::move(promise));
@@ -462,11 +571,8 @@
     } catch (...) {
       promise.set_exception(std::current_exception());
     }
-
-    return future;
   }
 
- private:
   void stop_tidy_thread() {
     {
       std::lock_guard lock(mx_);
@@ -549,9 +655,13 @@
 
       // Process this request!
       size_t range_end = req.end();
+      auto max_end = block->uncompressed_size();
+
+      if (range_end == std::numeric_limits<size_t>::max()) {
+        range_end = max_end;
+      }
 
       if (is_last_req) {
-        auto max_end = block->uncompressed_size();
         double ratio = double(range_end) / double(max_end);
         if (ratio > options_.decompress_ratio) {
           LOG_TRACE << "block " << block_no << " over ratio: " << ratio << " > "
@@ -665,6 +775,7 @@
   mutable std::atomic<size_t> total_decompressed_bytes_{0};
   mutable std::atomic<size_t> blocks_tidied_{0};
   mutable std::atomic<size_t> active_expired_{0};
+  mutable std::atomic<size_t> sequential_prefetches_{0};
   mutable folly::Histogram<size_t> active_set_size_{1, 0, 1024};
   mutable std::shared_mutex mx_wg_;
 
@@ -676,6 +787,7 @@
   PERFMON_CLS_TIMER_DECL(get)
   PERFMON_CLS_TIMER_DECL(process)
   PERFMON_CLS_TIMER_DECL(decompress)
+  std::unique_ptr<sequential_access_detector> seq_access_detector_;
  os_access const& os_;
  const block_cache_options options_;
  cache_tidy_config tidy_config_;
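
A note on how the new detector works, for readers of the patch: lru_sequential_access_detector keeps the last N distinct block numbers passed to touch() in a small LRU, where N is sequential_access_detector_threshold. Once those N numbers form a contiguous range [min, max], i.e. max - min + 1 == N, the access pattern is treated as sequential, and get() schedules block max + 1 for decompression via create_cached_block() before any reader asks for it; each LRU eviction resets the cached verdict so the check is redone as the window slides. The sketch below re-implements only that detection rule so it can be compiled and run on its own. It is a simplified illustration, not code from the patch or from DwarFS: the class name toy_seq_detector and the main() driver are made up, there is no locking, and the result is recomputed on every call instead of being latched until the next eviction.

// Illustrative re-implementation of the detection idea (assumed names,
// simplified: no mutex, no eviction hook, verdict recomputed each call).
#include <algorithm>
#include <cstddef>
#include <deque>
#include <initializer_list>
#include <iostream>
#include <optional>

class toy_seq_detector {
 public:
  explicit toy_seq_detector(std::size_t window)
      : window_{window} {}

  // Record an access; keep only the most recent `window_` distinct block
  // numbers, oldest first.
  void touch(std::size_t block_no) {
    auto it = std::find(recent_.begin(), recent_.end(), block_no);
    if (it != recent_.end()) {
      recent_.erase(it);
    }
    recent_.push_back(block_no);
    if (recent_.size() > window_) {
      recent_.pop_front();
    }
  }

  // If the remembered blocks form a contiguous run [min, max], suggest
  // prefetching block max + 1 (unless that would be past the last block).
  std::optional<std::size_t> prefetch(std::size_t num_blocks) const {
    if (recent_.size() < window_) {
      return std::nullopt; // not enough history yet
    }
    auto [mn, mx] = std::minmax_element(recent_.begin(), recent_.end());
    if (*mx - *mn + 1 == window_ && *mx + 1 < num_blocks) {
      return *mx + 1;
    }
    return std::nullopt;
  }

 private:
  std::deque<std::size_t> recent_;
  std::size_t const window_;
};

int main() {
  toy_seq_detector det{4};
  // One random access followed by a run of four consecutive blocks.
  for (std::size_t block : {7, 0, 1, 2, 3}) {
    det.touch(block);
    if (auto next = det.prefetch(100)) {
      std::cout << "prefetch block " << *next << '\n'; // prints "prefetch block 4"
    }
  }
}

In the patch itself the window size comes from the new block_cache_options::sequential_access_detector_threshold field; its default of 0 selects no_sequential_access_detector, so prefetching stays disabled unless the option is set explicitly.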