Add bloom filter to speed up block manager

This commit is contained in:
Marcus Holland-Moritz 2021-03-08 17:51:34 +01:00
parent 35ea333f85
commit 3a78ab9ce4
3 changed files with 193 additions and 60 deletions

View File

@ -41,6 +41,7 @@ class block_manager {
size_t max_active_blocks{1};
size_t memory_limit{256 << 20};
unsigned block_size_bits{22};
unsigned bloom_filter_size{4};
};
block_manager(logger& lgr, progress& prog, const config& cfg,

View File

@ -32,6 +32,7 @@
#include <parallel_hashmap/phmap.h>
#include <folly/hash/Hash.h>
#include <folly/small_vector.h>
#include <folly/stats/Histogram.h>
@ -83,6 +84,9 @@ struct bm_stats {
size_t total_matches{0};
size_t good_matches{0};
size_t bad_matches{0};
size_t bloom_lookups{0};
size_t bloom_hits{0};
size_t bloom_true_positives{0};
folly::Histogram<size_t> l2_collision_vec_size;
};
@ -126,17 +130,91 @@ class fast_multimap {
collision_t collisions_;
};
/**
 * Count the number of bits that are set in `n`.
 *
 * Usable in constant expressions.
 */
constexpr unsigned bitcount(unsigned n) {
  unsigned ones = 0;
  // Peel off the lowest bit until nothing is left.
  for (; n != 0; n >>= 1) {
    ones += n & 1u;
  }
  return ones;
}
/**
 * Round `n` up to the nearest power of two.
 *
 * A value that already is a power of two is returned unchanged. Because
 * the arithmetic is unsigned and wraps, an input of 0 (and any input
 * greater than 2^63) yields 0.
 */
constexpr uint64_t pow2ceil(uint64_t n) {
  uint64_t v = n - 1;
  // Smear the highest set bit into every lower position by OR-ing with
  // shifts of 1, 2, 4, 8, 16 and 32.
  for (unsigned shift = 1; shift < 64; shift *= 2) {
    v |= v >> shift;
  }
  return v + 1;
}
/**
 * A very simple bloom filter. This is not generalized at all and highly
 * optimized for the cyclic hash use case.
 *
 * - Since we're already using a hash value, there's no need to hash the
 *   value before accessing the bloom filter bit field.
 *
 * - We can accept a high false positive rate as the secondary lookup
 *   is not very expensive. However, the bloom filter lookup must be
 *   extremely cheap, so we can't afford e.g. using two hashes instead
 *   of one.
 *
 * The bit field is stored as a vector of `bits_type` words. The low
 * `index_shift` bits of a value select the bit inside a word, the
 * remaining bits (masked to the number of words) select the word.
 */
class bloom_filter {
 public:
  using bits_type = uint64_t;

  /// Mask extracting the bit position inside a single word.
  static constexpr size_t value_mask = 8 * sizeof(bits_type) - 1;
  /// Number of low bits consumed by the in-word bit position.
  static constexpr size_t index_shift = bitcount(value_mask);

  /**
   * Create an empty filter holding `size` bits.
   *
   * Sizes smaller than one word (including 0) are rounded up to a single
   * word so that add() and test() never index an empty bit field.
   *
   * @param size  number of bits; must be a power of two
   * @throws std::runtime_error if `size` is not a power of two
   */
  bloom_filter(size_t size)
      : bits_(word_count(size))
      // bits_ is declared before index_mask_, so it is already initialized.
      , index_mask_{bits_.size() - 1} {}

  /// Record `val` in the filter.
  void add(size_t val) { set(val); }

  /// Returns false if `val` was definitely never added, true if it may
  /// have been added.
  bool test(size_t val) const { return isset(val); }

  /// Number of bits allocated for the filter.
  size_t size() const { return bits_.size() << index_shift; }

  /// Reset all bits to zero.
  void clear() { std::fill(bits_.begin(), bits_.end(), 0); }

  /**
   * OR the contents of `other` into this filter.
   *
   * @throws std::runtime_error if both filters differ in size
   */
  void merge(bloom_filter const& other) {
    if (bits_.size() != other.bits_.size()) {
      throw std::runtime_error("size mismatch");
    }
    std::transform(bits_.cbegin(), bits_.cend(), other.bits_.cbegin(),
                   bits_.begin(), std::bit_or<>{});
  }

 private:
  /// Validate `size` and translate it into a number of words (at least 1).
  static size_t word_count(size_t size) {
    if (size & (size - 1)) {
      throw std::runtime_error("size must be a power of two");
    }
    size_t const words = size >> index_shift;
    return words > 0 ? words : 1;
  }

  /// Single-bit mask for the in-word position of `ix`. The shifted one
  /// must be as wide as a word: shifting a plain `int` by 32 or more is
  /// undefined and would alias the upper half of each word onto the lower.
  static constexpr bits_type bit_for(size_t ix) {
    return bits_type{1} << (ix & value_mask);
  }

  void set(size_t ix) {
    bits_[(ix >> index_shift) & index_mask_] |= bit_for(ix);
  }

  bool isset(size_t ix) const {
    return (bits_[(ix >> index_shift) & index_mask_] & bit_for(ix)) != 0;
  }

  std::vector<bits_type> bits_;
  size_t const index_mask_;
};
class active_block {
private:
using offset_t = uint32_t;
using hash_t = uint32_t;
public:
active_block(size_t num, size_t size, size_t window_size, size_t window_step)
active_block(size_t num, size_t size, size_t window_size, size_t window_step,
size_t bloom_filter_size)
: num_(num)
, capacity_(size)
, window_size_(window_size)
, window_step_mask_(window_step - 1)
, filter_(bloom_filter_size)
, data_{std::make_shared<block_data>()} {
DWARFS_CHECK((window_step & window_step_mask_) == 0,
"window step size not a power of two");
@ -149,7 +227,7 @@ class active_block {
// True when size() equals capacity_.
bool full() const { return size() == capacity_; }
// Returns a copy of the shared pointer held in data_.
std::shared_ptr<block_data> data() const { return data_; }
void append(uint8_t const* p, size_t size);
void append(uint8_t const* p, size_t size, bloom_filter* filter);
size_t next_hash_distance() const {
return window_step_mask_ + 1 - (data_->vec().size() & window_step_mask_);
@ -160,6 +238,13 @@ class active_block {
offsets_.for_each_value(key, std::forward<F>(func));
}
/**
 * Invoke `func` for every offset stored under `key`, but only after this
 * block's own bloom filter reports that `key` may be present. If the
 * filter rejects the key, the offset table is not consulted at all.
 */
template <typename F>
void for_each_offset_filter(hash_t key, F&& func) const {
  bool const maybe_present = filter_.test(key);

  // A filter hit is expected to be the rare case.
  if (DWARFS_UNLIKELY(maybe_present)) {
    offsets_.for_each_value(key, std::forward<F>(func));
  }
}
void finalize(bm_stats& stats) {
stats.total_hashes += offsets_.values().size();
for (auto& c : offsets_.collisions()) {
@ -169,11 +254,14 @@ class active_block {
}
}
// Read-only access to this block's bloom filter (filter_).
bloom_filter const& filter() const { return filter_; }
private:
static constexpr size_t num_inline_offsets = 4;
size_t num_, capacity_, window_size_, window_step_mask_;
rsync_hash hasher_;
bloom_filter filter_;
fast_multimap<hash_t, offset_t, num_inline_offsets> offsets_;
std::shared_ptr<block_data> data_;
};
@ -192,7 +280,10 @@ class block_manager_ final : public block_manager::impl {
? static_cast<size_t>(1) << cfg.blockhash_window_size
: 0}
, window_step_{window_size_ >> cfg.window_increment_shift}
, block_size_{static_cast<size_t>(1) << cfg.block_size_bits} {}
, block_size_{static_cast<size_t>(1) << cfg.block_size_bits}
, filter_{bloom_filter_size()} {
LOG_INFO << "bloom filter size: " << size_with_unit(filter_.size() / 8);
}
void add_inode(std::shared_ptr<inode> ino) override;
void finish_blocks() override;
@ -208,6 +299,11 @@ class block_manager_ final : public block_manager::impl {
void append_to_block(inode& ino, mmif& mm, size_t offset, size_t size);
void add_data(inode& ino, mmif& mm, size_t offset, size_t size);
void segment_and_add_data(inode& ino, mmif& mm, size_t size);
size_t bloom_filter_size() const {
auto hash_count = pow2ceil(std::max<size_t>(1, cfg_.max_active_blocks)) *
(block_size_ / window_step_);
return (1 << cfg_.bloom_filter_size) * hash_count;
}
log_proxy<LoggerPolicy> log_;
progress& prog_;
@ -222,6 +318,8 @@ class block_manager_ final : public block_manager::impl {
chunk_state chunk_;
bloom_filter filter_;
bm_stats stats_;
// Active blocks are blocks that can still be referenced from new chunks.
@ -259,7 +357,7 @@ class segment_match {
uint8_t const* data_;
};
void active_block::append(uint8_t const* p, size_t size) {
void active_block::append(uint8_t const* p, size_t size, bloom_filter* filter) {
auto& v = data_->vec();
auto offset = v.size();
DWARFS_CHECK(offset + size <= capacity_,
@ -275,7 +373,12 @@ void active_block::append(uint8_t const* p, size_t size) {
} else {
hasher_.update(v[offset - window_size_], v[offset]);
if (DWARFS_UNLIKELY((++offset & window_step_mask_) == 0)) {
offsets_.insert(hasher_(), offset - window_size_);
auto hashval = hasher_();
offsets_.insert(hashval, offset - window_size_);
filter_.add(hashval);
if (filter) {
filter->add(hashval);
}
}
}
}
@ -338,6 +441,13 @@ void block_manager_<LoggerPolicy>::finish_blocks() {
auto l1_collisions = stats_.l2_collision_vec_size.computeTotalCount();
LOG_INFO << "bloom filter reject rate: "
<< fmt::format("{:.3f}%", 100.0 - 100.0 * stats_.bloom_hits /
stats_.bloom_lookups)
<< " (TPR="
<< fmt::format("{:.3f}%", 100.0 * stats_.bloom_true_positives /
stats_.bloom_hits)
<< ", lookups=" << stats_.bloom_lookups << ")";
LOG_INFO << "segmentation matches: good=" << stats_.good_matches
<< ", bad=" << stats_.bad_matches
<< ", total=" << stats_.total_matches;
@ -376,14 +486,19 @@ void block_manager_<LoggerPolicy>::append_to_block(inode& ino, mmif& mm,
blocks_.pop_front();
}
filter_.clear();
for (auto const& b : blocks_) {
filter_.merge(b.filter());
}
blocks_.emplace_back(block_count_++, block_size_,
cfg_.max_active_blocks > 0 ? window_size_ : 0,
window_step_);
window_step_, filter_.size());
}
auto& block = blocks_.back();
block.append(mm.as<uint8_t>(offset), size);
block.append(mm.as<uint8_t>(offset), size, &filter_);
chunk_.size += size;
prog_.filesystem_size += size;
@ -444,71 +559,85 @@ void block_manager_<LoggerPolicy>::segment_and_add_data(inode& ino, mmif& mm,
}
std::vector<segment_match> matches;
const bool single_block_mode = cfg_.max_active_blocks == 1;
while (offset < size) {
for (auto const& block : blocks_) {
block.for_each_offset(hasher(), [&](uint32_t offset) {
matches.emplace_back(&block, offset);
});
}
if (DWARFS_UNLIKELY(!matches.empty())) {
LOG_TRACE << "found " << matches.size() << " matches (hash=" << hasher()
<< ", window size=" << window_size_ << ")";
for (auto& m : matches) {
LOG_TRACE << " block " << m.block_num() << " @ " << m.offset();
m.verify_and_extend(p + offset - window_size_, window_size_,
p + written, p + size);
++stats_.bloom_lookups;
if (DWARFS_UNLIKELY(filter_.test(hasher()))) {
++stats_.bloom_hits;
if (single_block_mode) {
auto& block = blocks_.front();
block.for_each_offset(hasher(), [&](uint32_t offset) {
matches.emplace_back(&block, offset);
});
} else {
for (auto const& block : blocks_) {
block.for_each_offset_filter(hasher(), [&](uint32_t offset) {
matches.emplace_back(&block, offset);
});
}
}
stats_.total_matches += matches.size();
stats_.bad_matches +=
std::count_if(matches.begin(), matches.end(),
[](auto const& m) { return m.size() == 0; });
if (DWARFS_UNLIKELY(!matches.empty())) {
++stats_.bloom_true_positives;
auto best = std::max_element(matches.begin(), matches.end());
auto match_len = best->size();
LOG_TRACE << "found " << matches.size() << " matches (hash=" << hasher()
<< ", window size=" << window_size_ << ")";
if (match_len > 0) {
++stats_.good_matches;
LOG_TRACE << "successful match of length " << match_len << " @ "
<< best->offset();
auto block_num = best->block_num();
auto match_off = best->offset();
auto num_to_write = best->data() - (p + written);
// best->block can be invalidated by this call to add_data()!
add_data(ino, mm, written, num_to_write);
written += num_to_write;
finish_chunk(ino);
ino.add_chunk(block_num, match_off, match_len);
written += match_len;
prog_.saved_by_segmentation += match_len;
offset = written;
if (size - written < window_size_) {
break;
for (auto& m : matches) {
LOG_TRACE << " block " << m.block_num() << " @ " << m.offset();
m.verify_and_extend(p + offset - window_size_, window_size_,
p + written, p + size);
}
hasher.clear();
stats_.total_matches += matches.size();
stats_.bad_matches +=
std::count_if(matches.begin(), matches.end(),
[](auto const& m) { return m.size() == 0; });
for (; offset < written + window_size_; ++offset) {
hasher.update(p[offset]);
auto best = std::max_element(matches.begin(), matches.end());
auto match_len = best->size();
if (match_len > 0) {
++stats_.good_matches;
LOG_TRACE << "successful match of length " << match_len << " @ "
<< best->offset();
auto block_num = best->block_num();
auto match_off = best->offset();
auto num_to_write = best->data() - (p + written);
// best->block can be invalidated by this call to add_data()!
add_data(ino, mm, written, num_to_write);
written += num_to_write;
finish_chunk(ino);
ino.add_chunk(block_num, match_off, match_len);
written += match_len;
prog_.saved_by_segmentation += match_len;
offset = written;
if (size - written < window_size_) {
break;
}
hasher.clear();
for (; offset < written + window_size_; ++offset) {
hasher.update(p[offset]);
}
next_hash_offset =
written + lookback_size + blocks_.back().next_hash_distance();
}
next_hash_offset =
written + lookback_size + blocks_.back().next_hash_distance();
}
matches.clear();
matches.clear();
if (match_len > 0) {
continue;
if (match_len > 0) {
continue;
}
}
}

View File

@ -380,6 +380,9 @@ int mkdwarfs(int argc, char** argv) {
po::value<unsigned>(&cfg.window_increment_shift)
->default_value(1),
"window step (as right shift of size)")
("bloom-filter-size",
po::value<unsigned>(&cfg.bloom_filter_size)->default_value(5),
"bloom filter size (2^N*values bits)")
("memory-limit,L",
po::value<std::string>(&memory_limit)->default_value("1g"),
"block manager memory limit")