feat(block_cache): add sequential access detector & prefetch

This commit is contained in:
Marcus Holland-Moritz 2024-04-27 19:00:28 +02:00
parent 61db31ff70
commit a216840ea5
2 changed files with 120 additions and 7 deletions

View File

@ -52,6 +52,7 @@ struct block_cache_options {
bool mm_release{true};
bool init_workers{true};
bool disable_block_integrity_check{false};
size_t sequential_access_detector_threshold{0};
};
struct history_config {

View File

@ -36,6 +36,7 @@
#include <fmt/format.h>
#include <folly/ScopeGuard.h>
#include <folly/container/EvictingCacheMap.h>
#include <folly/container/F14Map.h>
#include <folly/stats/Histogram.h>
@ -53,6 +54,78 @@
namespace dwarfs {
/**
 * Interface for objects that observe block accesses and may suggest a
 * block to prefetch.
 *
 * Callers report the total number of blocks, report each accessed block
 * number, and may then ask for a prefetch candidate.
 */
class sequential_access_detector {
 public:
  virtual ~sequential_access_detector() = default;

  /// Informs the detector of the total number of blocks.
  virtual void set_block_count(size_t num_blocks) = 0;

  /// Records that the block with number `block_no` was accessed.
  virtual void touch(size_t block_no) = 0;

  /// Returns the number of a block to prefetch, or an empty optional if
  /// the detector has no suggestion.
  virtual std::optional<size_t> prefetch() const = 0;
};
/**
 * Detector implementation that does nothing: it ignores every
 * notification and never suggests a block to prefetch.
 */
class no_sequential_access_detector : public sequential_access_detector {
 public:
  // The block count is irrelevant for a detector that never prefetches.
  void set_block_count(size_t /*num_blocks*/) override {}

  // Accesses are not recorded.
  void touch(size_t /*block_no*/) override {}

  // Always yields an empty optional.
  std::optional<size_t> prefetch() const override {
    return std::optional<size_t>{};
  }
};
/**
 * Sequential access detector backed by an LRU map of recently touched
 * block numbers.
 *
 * The map is constructed with `seq_blocks` as its size argument. Once it
 * holds at least `seq_blocks` entries, the accesses are considered
 * sequential if the tracked block numbers span exactly `seq_blocks`
 * consecutive values; in that case the block following the largest
 * tracked one is suggested for prefetching, provided it exists.
 *
 * All member functions serialize on an internal mutex.
 */
class lru_sequential_access_detector : public sequential_access_detector {
 public:
  /// @param seq_blocks  number of consecutive blocks that must be tracked
  ///                    before a prefetch is suggested
  explicit lru_sequential_access_detector(size_t seq_blocks)
      : lru_{seq_blocks}
      , seq_blocks_{seq_blocks} {}

  /// Stores the new block count and discards all tracked accesses as well
  /// as any cached verdict.
  void set_block_count(size_t num_blocks) override {
    std::lock_guard lock(mx_);
    num_blocks_ = num_blocks;
    lru_.clear();
    is_sequential_.reset();
  }

  /// Records an access to `block_no`.
  void touch(size_t block_no) override {
    std::lock_guard lock(mx_);
    // The callback passed as last argument drops the cached verdict, so the
    // next prefetch() call re-evaluates the tracked set after it changed.
    lru_.set(block_no, block_no, true,
             [this](size_t, size_t&&) { is_sequential_.reset(); });
  }

  /// Returns the block number to prefetch, or an empty optional.
  ///
  /// A verdict is computed at most once per tracked set: while a cached
  /// verdict exists, no further suggestion is made.
  std::optional<size_t> prefetch() const override {
    std::lock_guard lock(mx_);

    // A zero-sized window can never be sequential. Without this check the
    // size comparison below would let an empty map through, and the
    // iterators returned by minmax_element must not be dereferenced then.
    if (seq_blocks_ == 0) {
      return std::nullopt;
    }

    // Not enough distinct blocks seen yet.
    if (lru_.size() < seq_blocks_) {
      return std::nullopt;
    }

    // Already evaluated for the current tracked set.
    if (is_sequential_.has_value()) {
      return std::nullopt;
    }

    auto const [lo_it, hi_it] = std::minmax_element(
        lru_.begin(), lru_.end(),
        [](auto const& a, auto const& b) { return a.first < b.first; });

    auto const min = lo_it->first;
    auto const max = hi_it->first;

    // Keys are unique, so the tracked blocks are consecutive exactly when
    // the span between smallest and largest equals the window size.
    is_sequential_ = max - min + 1 == seq_blocks_;

    // Only suggest a block that actually exists.
    if (*is_sequential_ && max + 1 < num_blocks_) {
      return max + 1;
    }

    return std::nullopt;
  }

 private:
  using lru_type = folly::EvictingCacheMap<size_t, size_t>;

  std::mutex mutable mx_;                     // guards all members below
  lru_type lru_;                              // recently touched block numbers
  std::optional<bool> mutable is_sequential_; // cached verdict, if any
  size_t num_blocks_{0};                      // total number of blocks
  size_t const seq_blocks_;                   // required window size
};
class block_request {
public:
block_request() = default;
@ -150,6 +223,8 @@ class block_cache_ final : public block_cache::impl {
PERFMON_CLS_TIMER_INIT(get, "block_no", "offset", "size")
PERFMON_CLS_TIMER_INIT(process, "block_no")
PERFMON_CLS_TIMER_INIT(decompress, "range_end") // clang-format on
, seq_access_detector_{create_seq_access_detector(
options.sequential_access_detector_threshold)}
, os_{os}
, options_(options) {
if (options.init_workers) {
@ -202,6 +277,7 @@ class block_cache_ final : public block_cache::impl {
LOG_VERBOSE << "blocks tidied: " << blocks_tidied_.load();
LOG_VERBOSE << "request sets merged: " << sets_merged_.load();
LOG_VERBOSE << "total requests: " << range_requests_.load();
LOG_VERBOSE << "sequential prefetches: " << sequential_prefetches_.load();
LOG_VERBOSE << "active hits (fast): " << active_hits_fast_.load();
LOG_VERBOSE << "active hits (slow): " << active_hits_slow_.load();
LOG_VERBOSE << "cache hits (fast): " << cache_hits_fast_.load();
@ -233,6 +309,7 @@ class block_cache_ final : public block_cache::impl {
// Appends `section` to the list of known blocks and passes the resulting
// number of blocks on to the sequential access detector.
void insert(fs_section const& section) override {
  block_.emplace_back(section);

  auto const block_count = block_.size();
  seq_access_detector_->set_block_count(block_count);
}
void set_block_size(size_t size) override {
@ -298,6 +375,20 @@ class block_cache_ final : public block_cache::impl {
PERFMON_CLS_SCOPED_SECTION(get)
PERFMON_SET_CONTEXT(block_no, offset, size)
seq_access_detector_->touch(block_no);
SCOPE_EXIT {
if (auto next = seq_access_detector_->prefetch()) {
sequential_prefetches_.fetch_add(1, std::memory_order_relaxed);
{
std::lock_guard lock(mx_);
create_cached_block(*next, std::promise<block_range>{}, 0,
std::numeric_limits<size_t>::max());
}
}
};
range_requests_.fetch_add(1, std::memory_order_relaxed);
std::promise<block_range> promise;
@ -441,16 +532,34 @@ class block_cache_ final : public block_cache::impl {
// Bummer. We don't know anything about the block.
try {
LOG_TRACE << "block " << block_no << " not found";
LOG_TRACE << "block " << block_no << " not found";
create_cached_block(block_no, std::move(promise), offset, range_end);
return future;
}
private:
// Creates the sequential access detector for the given threshold: an
// LRU-based detector configured with `threshold` when it is non-zero,
// otherwise a detector that never suggests a prefetch.
static std::unique_ptr<sequential_access_detector>
create_seq_access_detector(size_t threshold) {
  if (threshold > 0) {
    return std::make_unique<lru_sequential_access_detector>(threshold);
  }

  return std::make_unique<no_sequential_access_detector>();
}
void create_cached_block(size_t block_no, std::promise<block_range>&& promise,
size_t offset, size_t range_end) const {
try {
std::shared_ptr<cached_block> block = cached_block::create(
LOG_GET_LOGGER, DWARFS_NOTHROW(block_.at(block_no)), mm_,
options_.mm_release, options_.disable_block_integrity_check);
blocks_created_.fetch_add(1, std::memory_order_relaxed);
// Make a new set for the block
brs = std::make_shared<block_request_set>(std::move(block), block_no);
auto brs =
std::make_shared<block_request_set>(std::move(block), block_no);
// Promise will be fulfilled asynchronously
brs->add(offset, range_end, std::move(promise));
@ -462,11 +571,8 @@ class block_cache_ final : public block_cache::impl {
} catch (...) {
promise.set_exception(std::current_exception());
}
return future;
}
private:
void stop_tidy_thread() {
{
std::lock_guard lock(mx_);
@ -549,9 +655,13 @@ class block_cache_ final : public block_cache::impl {
// Process this request!
size_t range_end = req.end();
auto max_end = block->uncompressed_size();
if (range_end == std::numeric_limits<size_t>::max()) {
range_end = max_end;
}
if (is_last_req) {
auto max_end = block->uncompressed_size();
double ratio = double(range_end) / double(max_end);
if (ratio > options_.decompress_ratio) {
LOG_TRACE << "block " << block_no << " over ratio: " << ratio << " > "
@ -665,6 +775,7 @@ class block_cache_ final : public block_cache::impl {
mutable std::atomic<size_t> total_decompressed_bytes_{0};
mutable std::atomic<size_t> blocks_tidied_{0};
mutable std::atomic<size_t> active_expired_{0};
mutable std::atomic<size_t> sequential_prefetches_{0};
mutable folly::Histogram<size_t> active_set_size_{1, 0, 1024};
mutable std::shared_mutex mx_wg_;
@ -676,6 +787,7 @@ class block_cache_ final : public block_cache::impl {
PERFMON_CLS_TIMER_DECL(get)
PERFMON_CLS_TIMER_DECL(process)
PERFMON_CLS_TIMER_DECL(decompress)
std::unique_ptr<sequential_access_detector> seq_access_detector_;
os_access const& os_;
const block_cache_options options_;
cache_tidy_config tidy_config_;