first draft of adding granularity support to segmenter

Marcus Holland-Moritz 2023-08-18 21:50:21 +02:00
parent c903f6b265
commit 029b4def9c


@@ -24,6 +24,7 @@
#include <cstdint>
#include <cstring>
#include <deque>
#include <ranges>
#include <string>
#include <type_traits>
#include <utility>
@@ -120,6 +121,24 @@ class fast_multimap {
}
}
template <typename F>
bool any_value_is(KeyT const& key, F&& func) const {
if (auto it = values_.find(key); it != values_.end()) [[unlikely]] {
if (func(it->second)) {
return true;
}
if (auto it2 = collisions_.find(key); it2 != collisions_.end())
[[unlikely]] {
for (auto const& val : it2->second) {
if (func(val)) {
return true;
}
}
}
}
return false;
}
void clear() {
values_.clear();
collisions_.clear();
@@ -133,6 +152,9 @@ class fast_multimap {
collision_t collisions_;
};
using repeating_sequence_map_type = phmap::flat_hash_map<uint32_t, uint8_t>;
using repeating_collisions_map_type = std::unordered_map<uint8_t, uint32_t>;
constexpr unsigned bitcount(unsigned n) {
return n > 0 ? (n & 1) + bitcount(n >> 1) : 0;
}
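
A minimal compile-time sketch (not part of the commit) of what the recursive constexpr bitcount above computes, a population count; the power-of-two check is an assumed use, mirroring the window-step check later in the diff:

#include <cstdio>

constexpr unsigned bitcount(unsigned n) {
  return n > 0 ? (n & 1) + bitcount(n >> 1) : 0;
}

static_assert(bitcount(0b1011u) == 3, "three bits set");
static_assert(bitcount(4096u) == 1, "a power of two has exactly one bit set");

int main() { std::printf("%u\n", bitcount(255u)); } // prints 8
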
@@ -267,10 +289,13 @@ class ConstantGranularityPolicy : private GranularityPolicyBase {
static constexpr size_t const kGranularity{N};
template <typename T>
static void
add_new_block(T& blocks, size_t num, size_t size, size_t window_size,
size_t window_step, size_t bloom_filter_size) {
blocks.emplace_back(num, size, window_size, window_step, bloom_filter_size);
static void add_new_block(T& blocks, logger& lgr,
repeating_sequence_map_type const& repseqmap,
repeating_collisions_map_type& repcoll, size_t num,
size_t size, size_t window_size, size_t window_step,
size_t bloom_filter_size) {
blocks.emplace_back(lgr, repseqmap, repcoll, num, size, window_size,
window_step, bloom_filter_size);
}
template <typename T, typename U>
@@ -299,6 +324,25 @@ class ConstantGranularityPolicy : private GranularityPolicyBase {
}
return size;
}
template <typename T, typename... Args>
static T create(Args&&... args) {
return T(std::forward<Args>(args)...);
}
static size_t bytes_to_frames(size_t size) {
assert(size % kGranularity == 0);
return size / kGranularity;
}
static size_t frames_to_bytes(size_t size) { return size * kGranularity; }
template <typename T>
static void for_bytes_in_frame(T&& func) {
for (size_t i = 0; i < kGranularity; ++i) {
func();
}
}
};
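
To make the frame/byte arithmetic concrete, here is a hedged, stand-alone reduction of the ConstantGranularityPolicy above; everything except the conversions is omitted, and granularity 4 is just an example value:

#include <cassert>
#include <cstddef>

template <size_t N>
struct const_granularity { // reduced stand-in for ConstantGranularityPolicy
  static constexpr size_t kGranularity{N};
  static size_t bytes_to_frames(size_t size) {
    assert(size % kGranularity == 0); // sizes must align to the granularity
    return size / kGranularity;
  }
  static size_t frames_to_bytes(size_t frames) { return frames * kGranularity; }
};

int main() {
  using g4 = const_granularity<4>; // e.g. 4 bytes per frame
  assert(g4::bytes_to_frames(4096) == 1024);
  assert(g4::frames_to_bytes(1024) == 4096);
}
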
class VariableGranularityPolicy : private GranularityPolicyBase {
@@ -307,10 +351,13 @@ class VariableGranularityPolicy : private GranularityPolicyBase {
: granularity_{granularity} {}
template <typename T>
void add_new_block(T& blocks, size_t num, size_t size, size_t window_size,
size_t window_step, size_t bloom_filter_size) const {
blocks.emplace_back(num, size, window_size, window_step, bloom_filter_size,
granularity_);
void add_new_block(T& blocks, logger& lgr,
repeating_sequence_map_type const& repseqmap,
repeating_collisions_map_type& repcoll, size_t num,
size_t size, size_t window_size, size_t window_step,
size_t bloom_filter_size) const {
blocks.emplace_back(lgr, repseqmap, repcoll, num, size, window_size,
window_step, bloom_filter_size, granularity_);
}
template <typename T, typename U>
@@ -336,11 +383,93 @@ class VariableGranularityPolicy : private GranularityPolicyBase {
return size;
}
template <typename T, typename... Args>
T create(Args&&... args) const {
return T(std::forward<Args>(args)..., granularity_);
}
size_t bytes_to_frames(size_t size) const {
assert(size % granularity_ == 0);
return size / granularity_;
}
size_t frames_to_bytes(size_t size) const { return size * granularity_; }
template <typename T>
void for_bytes_in_frame(T&& func) const {
for (size_t i = 0; i < granularity_; ++i) {
func();
}
}
private:
uint_fast32_t const granularity_;
};
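
A sketch of the create<T>() indirection both policies implement: the constant policy forwards its arguments unchanged, while the variable policy appends its runtime granularity_, so callers can construct the adapters below uniformly from either policy. The names here are illustrative stand-ins, not the commit's classes:

#include <cstdint>
#include <utility>

struct constant_policy {
  template <typename T, typename... Args>
  static T create(Args&&... args) {
    return T(std::forward<Args>(args)...);
  }
};

class variable_policy {
 public:
  explicit variable_policy(uint_fast32_t g) : granularity_{g} {}
  template <typename T, typename... Args>
  T create(Args&&... args) const {
    return T(std::forward<Args>(args)..., granularity_); // extra ctor argument
  }
 private:
  uint_fast32_t const granularity_;
};

struct adapter { // toy target type with and without a granularity argument
  explicit adapter(int v) : v_{v}, g_{1} {}
  adapter(int v, uint_fast32_t g) : v_{v}, g_{g} {}
  int v_;
  uint_fast32_t g_;
};

int main() {
  auto a = constant_policy::create<adapter>(42);   // g_ stays 1
  auto b = variable_policy{4}.create<adapter>(42); // g_ becomes 4
  return a.g_ == 1 && b.g_ == 4 ? 0 : 1;
}
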
template <typename GranularityPolicy>
template <typename T, typename GranularityPolicy>
class granular_span_adapter : private GranularityPolicy {
public:
template <typename... PolicyArgs>
granular_span_adapter(std::span<T> s, PolicyArgs&&... args)
: GranularityPolicy(std::forward<PolicyArgs>(args)...)
, s_{s} {}
size_t size() const { return this->bytes_to_frames(s_.size()); }
std::span<T> raw() const { return s_; }
template <typename H>
void update_hash(H& hasher, size_t offset) const {
offset = this->frames_to_bytes(offset);
this->for_bytes_in_frame([&] { hasher.update(s_[offset++]); });
}
template <typename H>
void update_hash(H& hasher, size_t from, size_t to) const {
from = this->frames_to_bytes(from);
to = this->frames_to_bytes(to);
this->for_bytes_in_frame([&] { hasher.update(s_[from++], s_[to++]); });
}
private:
std::span<T> s_;
};
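
Hedged illustration of what granular_span_adapter::update_hash does: the frame offset is scaled to bytes, and every byte of that frame is fed to the hasher. toy_hasher below is only a stand-in for dwarfs' rsync_hash:

#include <cstddef>
#include <cstdint>
#include <span>
#include <vector>

struct toy_hasher { // stand-in; the real rolling hash is rsync_hash
  uint32_t h{0};
  void update(uint8_t in) { h = h * 131 + in; }
};

template <size_t G> // granularity in bytes per frame
void update_hash(std::span<uint8_t const> s, size_t frame, toy_hasher& hasher) {
  size_t off = frame * G;          // frames_to_bytes
  for (size_t i = 0; i < G; ++i) { // for_bytes_in_frame
    hasher.update(s[off + i]);
  }
}

int main() {
  std::vector<uint8_t> data(64, 0xab);
  toy_hasher h;
  update_hash<4>(data, 3, h); // hashes bytes 12..15 of the span
}
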
template <typename T, typename GranularityPolicy>
class granular_vector_adapter : private GranularityPolicy {
public:
template <typename... PolicyArgs>
granular_vector_adapter(std::vector<T>& v, PolicyArgs&&... args)
: GranularityPolicy(std::forward<PolicyArgs>(args)...)
, v_{v} {}
size_t size() const { return this->bytes_to_frames(v_.size()); }
void append(granular_span_adapter<T const, GranularityPolicy> span) {
auto raw = span.raw();
auto off = v_.size();
v_.resize(off + raw.size());
::memcpy(v_.data() + off, raw.data(), raw.size());
}
template <typename H>
void update_hash(H& hasher, size_t offset) const {
offset = this->frames_to_bytes(offset);
this->for_bytes_in_frame([&] { hasher.update(v_[offset++]); });
}
template <typename H>
void update_hash(H& hasher, size_t from, size_t to) const {
from = this->frames_to_bytes(from);
to = this->frames_to_bytes(to);
this->for_bytes_in_frame([&] { hasher.update(v_[from++], v_[to++]); });
}
private:
std::vector<T>& v_;
};
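
A small sanity check (assumptions: granularity 4, a plain std::vector) of the adapter's key property: append() copies raw bytes with memcpy, while size() reports frames:

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

int main() {
  constexpr size_t kGranularity = 4;
  std::vector<uint8_t> v;
  uint8_t const raw[8] = {1, 2, 3, 4, 5, 6, 7, 8}; // two 4-byte frames
  size_t const off = v.size();
  v.resize(off + sizeof(raw));
  std::memcpy(v.data() + off, raw, sizeof(raw)); // byte-wise, as in append()
  assert(v.size() / kGranularity == 2);          // bytes_to_frames(v.size())
}
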
template <typename LoggerPolicy, typename GranularityPolicy>
class active_block : private GranularityPolicy {
private:
using offset_t = uint32_t;
@@ -348,30 +477,37 @@ class active_block : private GranularityPolicy {
public:
template <typename... PolicyArgs>
active_block(size_t num, size_t size, size_t window_size, size_t window_step,
active_block(logger& lgr, repeating_sequence_map_type const& repseqmap,
repeating_collisions_map_type& repcoll, size_t num,
size_t size_in_frames, size_t window_size, size_t window_step,
size_t bloom_filter_size, PolicyArgs&&... args)
: GranularityPolicy(std::forward<PolicyArgs>(args)...)
, LOG_PROXY_INIT(lgr)
, num_(num)
, capacity_(size)
, capacity_in_frames_(size_in_frames)
, window_size_(window_size)
, window_step_mask_(window_step - 1)
, filter_(bloom_filter_size)
, repseqmap_{repseqmap}
, repeating_collisions_{repcoll}
, data_{std::make_shared<block_data>()} {
DWARFS_CHECK((window_step & window_step_mask_) == 0,
"window step size not a power of two");
data_->reserve(capacity_);
data_->reserve(this->frames_to_bytes(capacity_in_frames_));
}
size_t num() const { return num_; }
size_t size() const { return data_->size(); }
bool full() const { return size() == capacity_; }
size_t size_in_frames() const { return this->bytes_to_frames(data_->size()); }
bool full() const { return size_in_frames() == capacity_in_frames_; }
std::shared_ptr<block_data> data() const { return data_; }
void append(std::span<uint8_t const> data, bloom_filter& global_filter);
void append_bytes(std::span<uint8_t const> data, bloom_filter& global_filter);
size_t next_hash_distance() const {
return window_step_mask_ + 1 - (data_->size() & window_step_mask_);
size_t next_hash_distance_in_frames() const {
return window_step_mask_ + 1 - (size_in_frames() & window_step_mask_);
}
template <typename F>
@@ -398,12 +534,17 @@ class active_block : private GranularityPolicy {
bloom_filter const& filter() const { return filter_; }
private:
bool is_existing_repeating_sequence(hash_t hashval, size_t offset);
static constexpr size_t num_inline_offsets = 4;
size_t const num_, capacity_, window_size_, window_step_mask_;
LOG_PROXY_DECL(LoggerPolicy);
size_t const num_, capacity_in_frames_, window_size_, window_step_mask_;
rsync_hash hasher_;
bloom_filter filter_;
fast_multimap<hash_t, offset_t, num_inline_offsets> offsets_;
repeating_sequence_map_type const& repseqmap_;
repeating_collisions_map_type& repeating_collisions_;
std::shared_ptr<block_data> data_;
};
@@ -422,7 +563,7 @@ class segmenter_ final : public segmenter::impl, private GranularityPolicy {
, block_ready_{std::move(block_ready)}
, window_size_{window_size(cfg)}
, window_step_{window_step(cfg)}
, block_size_{this->constrained_block_size(block_size(cfg))}
, block_size_in_frames_{block_size_in_frames(cfg)}
, global_filter_{bloom_filter_size(cfg)}
, match_counts_{1, 0, 128} {
if (segmentation_enabled()) {
@@ -430,6 +571,15 @@ class segmenter_ final : public segmenter::impl, private GranularityPolicy {
<< size_with_unit(window_step_) << " steps for segment analysis";
LOG_INFO << "bloom filter size: "
<< size_with_unit(global_filter_.size() / 8);
repeating_sequence_hash_values_.reserve(256);
for (int i = 0; i < 256; ++i) {
repeating_sequence_hash_values_.emplace(
rsync_hash::repeating_window(i,
this->frames_to_bytes(window_size_)),
i);
}
}
}
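
Sketch of the precomputation in the constructor above, under a toy hash (rsync_hash::repeating_window is the commit's real API): for every byte value b, hash a window consisting solely of b and record hash -> b; a window hash that hits this map later is a candidate repeating sequence:

#include <cstddef>
#include <cstdint>
#include <unordered_map>

// toy stand-in for rsync_hash::repeating_window(byte, window_bytes)
uint32_t toy_repeating_window(uint8_t byte, size_t window_bytes) {
  uint32_t h = 0;
  for (size_t i = 0; i < window_bytes; ++i) {
    h = h * 131 + byte; // the whole window is the same byte
  }
  return h;
}

int main() {
  std::unordered_map<uint32_t, uint8_t> repseq; // hash -> repeated byte value
  for (int b = 0; b < 256; ++b) {
    repseq.emplace(toy_repeating_window(static_cast<uint8_t>(b), 4096),
                   static_cast<uint8_t>(b));
  }
}
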
@@ -438,6 +588,10 @@ class segmenter_ final : public segmenter::impl, private GranularityPolicy {
LOG_DEBUG << "match counts p50: " << pct(0.5) << ", p75: " << pct(0.75)
<< ", p90: " << pct(0.9) << ", p95: " << pct(0.95)
<< ", p99: " << pct(0.99);
for (auto [k, v] : repeating_collisions_) {
LOG_INFO << fmt::format(
"avoided {} collisions in 0x{:02x}-byte sequences", v, k);
}
}
void add_chunkable(chunkable& chkable) override;
@@ -445,8 +599,8 @@ class segmenter_ final : public segmenter::impl, private GranularityPolicy {
private:
struct chunk_state {
size_t offset{0};
size_t size{0};
size_t offset_in_frames{0};
size_t size_in_frames{0};
};
bool segmentation_enabled() const {
@@ -455,13 +609,15 @@ class segmenter_ final : public segmenter::impl, private GranularityPolicy {
void block_ready();
void finish_chunk(chunkable& chkable);
void append_to_block(chunkable& chkable, size_t offset, size_t size);
void add_data(chunkable& chkable, size_t offset, size_t size);
void segment_and_add_data(chunkable& chkable, size_t size);
void append_to_block(chunkable& chkable, size_t offset_in_frames,
size_t size_in_frames);
void
add_data(chunkable& chkable, size_t offset_in_frames, size_t size_in_frames);
void segment_and_add_data(chunkable& chkable, size_t size_in_frames);
static size_t bloom_filter_size(const segmenter::config& cfg) {
size_t bloom_filter_size(const segmenter::config& cfg) const {
auto hash_count = pow2ceil(std::max<size_t>(1, cfg.max_active_blocks)) *
(block_size(cfg) / window_step(cfg));
(block_size_in_frames(cfg) / window_step(cfg));
return (static_cast<size_t>(1) << cfg.bloom_filter_size) * hash_count;
}
@@ -475,8 +631,8 @@ class segmenter_ final : public segmenter::impl, private GranularityPolicy {
return std::max<size_t>(1, window_size(cfg) >> cfg.window_increment_shift);
}
static size_t block_size(const segmenter::config& cfg) {
return static_cast<size_t>(1) << cfg.block_size_bits;
size_t block_size_in_frames(const segmenter::config& cfg) const {
return this->bytes_to_frames(static_cast<size_t>(1) << cfg.block_size_bits);
}
LOG_PROXY_DECL(LoggerPolicy);
@@ -487,7 +643,7 @@ class segmenter_ final : public segmenter::impl, private GranularityPolicy {
size_t const window_size_;
size_t const window_step_;
size_t const block_size_;
size_t const block_size_in_frames_;
chunk_state chunk_;
@@ -495,7 +651,7 @@ class segmenter_ final : public segmenter::impl, private GranularityPolicy {
segmenter_stats stats_;
using active_block_type = active_block<GranularityPolicy>;
using active_block_type = active_block<LoggerPolicy, GranularityPolicy>;
// Active blocks are blocks that can still be referenced from new chunks.
// Up to N blocks (configurable) can be active and are kept in this queue.
@@ -503,13 +659,16 @@ class segmenter_ final : public segmenter::impl, private GranularityPolicy {
// already being compressed.
std::deque<active_block_type> blocks_;
repeating_sequence_map_type repeating_sequence_hash_values_;
repeating_collisions_map_type repeating_collisions_;
folly::Histogram<size_t> match_counts_;
};
template <typename GranularityPolicy>
template <typename LoggerPolicy, typename GranularityPolicy>
class segment_match : private GranularityPolicy {
public:
using active_block_type = active_block<GranularityPolicy>;
using active_block_type = active_block<LoggerPolicy, GranularityPolicy>;
template <typename... PolicyArgs>
segment_match(active_block_type const* blk, uint32_t off,
@@ -540,45 +699,83 @@ class segment_match : private GranularityPolicy {
uint8_t const* data_{nullptr};
};
template <typename GranularityPolicy>
void active_block<GranularityPolicy>::append(std::span<uint8_t const> data,
bloom_filter& global_filter) {
auto& v = data_->vec();
template <typename LoggerPolicy, typename GranularityPolicy>
bool active_block<LoggerPolicy, GranularityPolicy>::
is_existing_repeating_sequence(hash_t hashval, size_t offset) {
if (auto it = repseqmap_.find(hashval); it != repseqmap_.end()) [[unlikely]] {
auto& raw = data_->vec();
auto window = raw | std::views::drop(this->frames_to_bytes(offset)) |
std::views::take(this->frames_to_bytes(window_size_));
if (std::ranges::find_if(window, [byte = it->second](auto b) {
return b != byte;
}) == window.end()) {
return offsets_.any_value_is(hashval, [&, this](auto off) {
auto offwin = raw | std::views::drop(this->frames_to_bytes(off)) |
std::views::take(this->frames_to_bytes(window_size_));
if (std::ranges::find_if(offwin, [byte = it->second](auto b) {
return b != byte;
}) == offwin.end()) {
++repeating_collisions_[it->second];
return true;
}
return false;
});
}
}
return false;
}
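
The window test above, restated as a hedged stand-alone helper: find_if with a not-equals predicate returning end() is the same as std::ranges::all_of with equals:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <ranges>
#include <vector>

// true iff the window [off, off + len) consists solely of 'byte'
bool window_repeats(std::vector<uint8_t> const& raw, size_t off, size_t len,
                    uint8_t byte) {
  auto window = raw | std::views::drop(off) | std::views::take(len);
  return std::ranges::all_of(window, [byte](uint8_t b) { return b == byte; });
}

int main() {
  std::vector<uint8_t> v(32, 0x00);
  bool const all_zero = window_repeats(v, 8, 16, 0x00); // true
  v[12] = 0xff;
  bool const still = window_repeats(v, 8, 16, 0x00); // now false
  return all_zero && !still ? 0 : 1;
}
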
template <typename LoggerPolicy, typename GranularityPolicy>
void active_block<LoggerPolicy, GranularityPolicy>::append_bytes(
std::span<uint8_t const> data, bloom_filter& global_filter) {
auto src = this->template create<
granular_span_adapter<uint8_t const, GranularityPolicy>>(data);
auto v = this->template create<
granular_vector_adapter<uint8_t, GranularityPolicy>>(data_->vec());
auto offset = v.size();
assert(this->is_valid_granularity_size(data.size()));
DWARFS_CHECK(offset + src.size() <= capacity_in_frames_,
fmt::format("block capacity exceeded: {} + {} > {}",
this->frames_to_bytes(offset),
this->frames_to_bytes(src.size()),
this->frames_to_bytes(capacity_in_frames_)));
DWARFS_CHECK(offset + data.size() <= capacity_,
fmt::format("block capacity exceeded: {} + {} > {}", offset,
data.size(), capacity_));
v.resize(offset + data.size());
::memcpy(v.data() + offset, data.data(), data.size());
v.append(src);
if (window_size_ > 0) {
while (offset < v.size()) {
if (offset < window_size_) [[unlikely]] {
hasher_.update(v[offset++]);
v.update_hash(hasher_, offset);
} else {
hasher_.update(v[offset - window_size_], v[offset]);
if ((++offset & window_step_mask_) == 0) [[unlikely]] {
v.update_hash(hasher_, offset - window_size_, offset);
}
if (++offset >= window_size_) [[likely]] {
if ((offset & window_step_mask_) == 0) [[unlikely]] {
auto hashval = hasher_();
offsets_.insert(hashval, offset - window_size_);
filter_.add(hashval);
global_filter.add(hashval);
if (!is_existing_repeating_sequence(hashval, offset - window_size_))
[[likely]] {
offsets_.insert(hashval, offset - window_size_);
filter_.add(hashval);
global_filter.add(hashval);
}
}
}
}
}
}
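
A reduced, byte-granularity sketch of the scan in append_bytes(): prime the hash with the first window, then slide one position at a time; at every window-step boundary the hash becomes a match anchor unless it belongs to a known repeating sequence. The hash update and containers are stand-ins, not dwarfs code:

#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

void scan(std::vector<uint8_t> const& v, size_t window_size, size_t step_mask,
          std::unordered_multimap<uint32_t, size_t>& offsets) {
  uint32_t h = 0;
  for (size_t off = 0; off < v.size();) {
    if (off < window_size) {
      h = h * 131 + v[off]; // fill the first window
    } else {
      h = (h ^ v[off - window_size]) * 131 + v[off]; // toy slide, not rsync_hash
    }
    if (++off >= window_size && (off & step_mask) == 0) {
      // the commit skips insertion if is_existing_repeating_sequence(...)
      offsets.emplace(h, off - window_size); // also added to both bloom filters
    }
  }
}

int main() {
  std::vector<uint8_t> data(4096, 0x5a);
  std::unordered_multimap<uint32_t, size_t> offsets;
  scan(data, 64, 15, offsets); // window of 64 frames, step of 16
}
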
template <typename GranularityPolicy>
void segment_match<GranularityPolicy>::verify_and_extend(uint8_t const* pos,
size_t len,
uint8_t const* begin,
uint8_t const* end) {
template <typename LoggerPolicy, typename GranularityPolicy>
void segment_match<LoggerPolicy, GranularityPolicy>::verify_and_extend(
uint8_t const* pos, size_t len, uint8_t const* begin, uint8_t const* end) {
auto const& v = block_->data()->vec();
// First, check if the regions actually match
if (::memcmp(v.data() + offset_, pos, len) == 0) {
// scan backward
auto tmp = offset_;
@@ -599,22 +796,24 @@ void segment_match<GranularityPolicy>::verify_and_extend(uint8_t const* pos,
}
size_ = tmp - offset_;
}
// No match, this was a hash collision, we're done.
// size_ defaults to 0 unless we have a real match and set it above.
}
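
A reduced sketch of the control flow in verify_and_extend() (the hunk above elides the extension loops): confirm the candidate byte-for-byte, then grow the match while bytes keep matching; on a hash collision nothing is set, so size_ keeps its default of 0. Only forward extension is shown here; the commit also scans backward:

#include <cstddef>
#include <cstdint>
#include <cstring>

// returns the verified match length, or 0 on a hash collision
size_t verify_and_extend_fwd(uint8_t const* block, size_t block_size,
                             size_t off, uint8_t const* pos, size_t len,
                             uint8_t const* end) {
  if (::memcmp(block + off, pos, len) != 0) {
    return 0; // hash collision, not a real match
  }
  size_t n = len;
  while (off + n < block_size && pos + n != end && block[off + n] == pos[n]) {
    ++n; // extend forward while bytes keep matching
  }
  return n;
}

int main() {
  uint8_t const block[] = {1, 2, 3, 4, 5};
  uint8_t const data[] = {2, 3, 4, 9};
  return verify_and_extend_fwd(block, sizeof(block), 1, data, 2,
                               data + sizeof(data)) == 3
             ? 0
             : 1;
}
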
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::add_chunkable(
chunkable& chkable) {
if (auto size = chkable.size(); size > 0) {
if (auto size_in_frames = this->bytes_to_frames(chkable.size());
size_in_frames > 0) {
LOG_TRACE << "adding " << chkable.description();
this->check_chunkable_size(size);
if (!segmentation_enabled() or size < window_size_) {
if (!segmentation_enabled() or size_in_frames < window_size_) {
// no point dealing with hashing, just write it out
add_data(chkable, 0, size);
add_data(chkable, 0, size_in_frames);
finish_chunk(chkable);
} else {
segment_and_add_data(chkable, size);
segment_and_add_data(chkable, size_in_frames);
}
}
}
@@ -638,7 +837,9 @@ void segmenter_<LoggerPolicy, GranularityPolicy>::finish() {
}
if (stats_.total_matches > 0) {
LOG_INFO << "segmentation matches: good=" << stats_.good_matches
<< ", bad=" << stats_.bad_matches
<< ", bad=" << stats_.bad_matches << ", collisions="
<< (stats_.total_matches -
(stats_.bad_matches + stats_.good_matches))
<< ", total=" << stats_.total_matches;
}
if (stats_.total_hashes > 0) {
@@ -673,9 +874,7 @@ void segmenter_<LoggerPolicy, GranularityPolicy>::block_ready() {
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::append_to_block(
chunkable& chkable, size_t offset, size_t size) {
assert(this->is_valid_granularity_size(size));
chunkable& chkable, size_t offset_in_frames, size_t size_in_frames) {
if (blocks_.empty() or blocks_.back().full()) [[unlikely]] {
if (blocks_.size() >= std::max<size_t>(1, cfg_.max_active_blocks)) {
blocks_.pop_front();
@@ -686,91 +885,117 @@ void segmenter_<LoggerPolicy, GranularityPolicy>::append_to_block(
global_filter_.merge(b.filter());
}
this->add_new_block(blocks_, blkmgr_->get_logical_block(), block_size_,
this->add_new_block(blocks_, LOG_GET_LOGGER,
repeating_sequence_hash_values_, repeating_collisions_,
blkmgr_->get_logical_block(), block_size_in_frames_,
cfg_.max_active_blocks > 0 ? window_size_ : 0,
window_step_, global_filter_.size());
}
auto const offset_in_bytes = this->frames_to_bytes(offset_in_frames);
auto const size_in_bytes = this->frames_to_bytes(size_in_frames);
auto& block = blocks_.back();
block.append(chkable.span().subspan(offset, size), global_filter_);
chunk_.size += size;
LOG_TRACE << "appending " << size_in_bytes << " bytes to block "
<< block.num() << " @ "
<< this->frames_to_bytes(block.size_in_frames())
<< " from chunkable offset " << offset_in_bytes;
prog_.filesystem_size += size;
block.append_bytes(chkable.span().subspan(offset_in_bytes, size_in_bytes),
global_filter_);
chunk_.size_in_frames += size_in_frames;
prog_.filesystem_size += size_in_bytes;
if (block.full()) [[unlikely]] {
chkable.release_until(offset + size);
chkable.release_until(offset_in_bytes + size_in_bytes);
finish_chunk(chkable);
block_ready();
}
}
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::add_data(chunkable& chkable,
size_t offset,
size_t size) {
assert(this->is_valid_granularity_size(size));
while (size > 0) {
size_t block_offset = 0;
void segmenter_<LoggerPolicy, GranularityPolicy>::add_data(
chunkable& chkable, size_t offset_in_frames, size_t size_in_frames) {
while (size_in_frames > 0) {
size_t block_offset_in_frames = 0;
if (!blocks_.empty()) {
block_offset = blocks_.back().size();
block_offset_in_frames = blocks_.back().size_in_frames();
}
size_t chunk_size = std::min(size, block_size_ - block_offset);
size_t chunk_size_in_frames = std::min(
size_in_frames, block_size_in_frames_ - block_offset_in_frames);
append_to_block(chkable, offset, chunk_size);
append_to_block(chkable, offset_in_frames, chunk_size_in_frames);
offset += chunk_size;
size -= chunk_size;
offset_in_frames += chunk_size_in_frames;
size_in_frames -= chunk_size_in_frames;
}
}
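
The splitting loop in add_data(), reduced to its arithmetic (hedged; append_to_block is replaced by a printf): a chunk never crosses a block boundary, and each iteration fills whatever room the current block still has:

#include <algorithm>
#include <cstddef>
#include <cstdio>

void add_data(size_t offset_in_frames, size_t size_in_frames,
              size_t block_size_in_frames, size_t block_fill_in_frames) {
  while (size_in_frames > 0) {
    size_t const room = block_size_in_frames - block_fill_in_frames;
    size_t const chunk = std::min(size_in_frames, room);
    std::printf("chunk @ %zu, %zu frames\n", offset_in_frames, chunk);
    offset_in_frames += chunk;
    size_in_frames -= chunk;
    block_fill_in_frames = 0; // every following block starts empty
  }
}

int main() {
  add_data(0, 2500, 1024, 100);
  // -> chunk @ 0, 924 frames
  //    chunk @ 924, 1024 frames
  //    chunk @ 1948, 552 frames
}
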
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::finish_chunk(
chunkable& chkable) {
if (chunk_.size > 0) {
if (chunk_.size_in_frames > 0) {
auto& block = blocks_.back();
chkable.add_chunk(block.num(), chunk_.offset, chunk_.size);
chunk_.offset = block.full() ? 0 : block.size();
chunk_.size = 0;
chkable.add_chunk(block.num(),
this->frames_to_bytes(chunk_.offset_in_frames),
this->frames_to_bytes(chunk_.size_in_frames));
chunk_.offset_in_frames = block.full() ? 0 : block.size_in_frames();
chunk_.size_in_frames = 0;
prog_.chunk_count++;
}
}
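
Worth noting at this boundary: chunk_ is tracked in frames, but chunkable::add_chunk still takes byte offsets and sizes, so frames_to_bytes is applied exactly here. A hedged arithmetic check, assuming granularity 4:

#include <cassert>
#include <cstddef>

int main() {
  constexpr size_t kGranularity = 4;
  size_t const offset_in_frames = 256;
  size_t const size_in_frames = 128;
  // what add_chunk() would receive for this chunk:
  assert(offset_in_frames * kGranularity == 1024); // frames_to_bytes(offset)
  assert(size_in_frames * kGranularity == 512);    // frames_to_bytes(size)
}
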
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::segment_and_add_data(
chunkable& chkable, size_t size) {
chunkable& chkable, size_t size_in_frames) {
rsync_hash hasher;
size_t offset = 0;
size_t written = 0;
size_t lookback_size = window_size_ + window_step_;
size_t next_hash_offset =
lookback_size +
(blocks_.empty() ? window_step_ : blocks_.back().next_hash_distance());
auto data = chkable.span();
auto p = data.data();
size_t offset_in_frames = 0;
size_t frames_written = 0;
// TODO: can we potentially improve segmenter performance by using
// a larger lookback here?
size_t lookback_size_in_frames = window_size_ + window_step_;
size_t next_hash_offset_in_frames =
lookback_size_in_frames +
(blocks_.empty() ? window_step_
: blocks_.back().next_hash_distance_in_frames());
// auto data = chkable.span();
auto data = this->template create<
granular_span_adapter<uint8_t const, GranularityPolicy>>(chkable.span());
// auto p = data.data();
auto p = chkable.span().data();
assert(this->is_valid_granularity_size(size));
DWARFS_CHECK(size_in_frames >= window_size_,
"unexpected call to segment_and_add_data");
DWARFS_CHECK(size >= window_size_, "unexpected call to segment_and_add_data");
for (; offset < window_size_; ++offset) {
hasher.update(p[offset]);
for (; offset_in_frames < window_size_; ++offset_in_frames) {
// hasher.update(p[offset]);
data.update_hash(hasher, offset_in_frames);
}
folly::small_vector<segment_match<GranularityPolicy>, 1> matches;
folly::small_vector<segment_match<LoggerPolicy, GranularityPolicy>, 1>
matches;
const bool single_block_mode = cfg_.max_active_blocks == 1;
// TODO: we have multiple segmenter threads, so this doesn't fly anymore
auto total_bytes_read_before = prog_.total_bytes_read.load();
prog_.current_offset.store(offset);
prog_.current_size.store(size);
prog_.current_offset.store(this->frames_to_bytes(
offset_in_frames)); // TODO: what do we do with this?
prog_.current_size.store(this->frames_to_bytes(size_in_frames)); // TODO
while (offset < size) {
// TODO: matches need to work with frames
// TODO: how can we reasonably update the top progress bar with
// multiple concurrent segmenters?
while (offset_in_frames < size_in_frames) {
++stats_.bloom_lookups;
if (global_filter_.test(hasher())) [[unlikely]] {
++stats_.bloom_hits;
if (single_block_mode) { // TODO: can we constexpr this?
auto& block = blocks_.front();
block.for_each_offset(
@@ -787,13 +1012,18 @@ void segmenter_<LoggerPolicy, GranularityPolicy>::segment_and_add_data(
++stats_.bloom_true_positives;
match_counts_.addValue(matches.size());
LOG_TRACE << "found " << matches.size() << " matches (hash=" << hasher()
LOG_TRACE << "[" << blocks_.back().num() << " @ "
<< this->frames_to_bytes(blocks_.back().size_in_frames())
<< ", chunkable @ " << this->frames_to_bytes(offset_in_frames)
<< "] found " << matches.size()
<< " matches (hash=" << fmt::format("{:08x}", hasher())
<< ", window size=" << window_size_ << ")";
for (auto& m : matches) {
LOG_TRACE << " block " << m.block_num() << " @ " << m.offset();
m.verify_and_extend(p + offset - window_size_, window_size_,
p + written, p + size);
m.verify_and_extend(p + offset_in_frames - window_size_, window_size_,
p + frames_written, p + size_in_frames);
LOG_TRACE << " -> " << m.offset() << " -> " << m.size();
}
stats_.total_matches += matches.size();
@@ -811,36 +1041,41 @@ void segmenter_<LoggerPolicy, GranularityPolicy>::segment_and_add_data(
auto block_num = best->block_num();
auto match_off = best->offset();
auto num_to_write = best->data() - (p + written);
auto num_to_write = best->data() - (p + frames_written);
// best->block can be invalidated by this call to add_data()!
add_data(chkable, written, num_to_write);
written += num_to_write;
add_data(chkable, frames_written, num_to_write);
frames_written += num_to_write;
finish_chunk(chkable);
chkable.add_chunk(block_num, match_off, match_len);
prog_.chunk_count++;
written += match_len;
frames_written += match_len;
prog_.saved_by_segmentation += match_len;
offset = written;
offset_in_frames = frames_written;
if (size - written < window_size_) {
if (size_in_frames - frames_written < window_size_) {
break;
}
hasher.clear();
for (; offset < written + window_size_; ++offset) {
hasher.update(p[offset]);
for (; offset_in_frames < frames_written + window_size_;
++offset_in_frames) {
// hasher.update(p[offset]);
data.update_hash(hasher, offset_in_frames);
}
prog_.current_offset.store(offset);
prog_.total_bytes_read.store(total_bytes_read_before + offset);
prog_.current_offset.store(this->frames_to_bytes(
offset_in_frames)); // TODO: again, what's this?
prog_.total_bytes_read.store(total_bytes_read_before +
this->frames_to_bytes(offset_in_frames));
next_hash_offset =
written + lookback_size + blocks_.back().next_hash_distance();
next_hash_offset_in_frames =
frames_written + lookback_size_in_frames +
blocks_.back().next_hash_distance_in_frames();
}
matches.clear();
@@ -852,25 +1087,31 @@ void segmenter_<LoggerPolicy, GranularityPolicy>::segment_and_add_data(
}
// no matches found, see if we can append data
// we need to keep at least lookback_size bytes unwritten
// we need to keep at least lookback_size_in_frames frames unwritten
if (offset == next_hash_offset) [[unlikely]] {
auto num_to_write = offset - lookback_size - written;
add_data(chkable, written, num_to_write);
written += num_to_write;
next_hash_offset += window_step_;
prog_.current_offset.store(offset);
prog_.total_bytes_read.store(total_bytes_read_before + offset);
if (offset_in_frames == next_hash_offset_in_frames) [[unlikely]] {
auto num_to_write =
offset_in_frames - lookback_size_in_frames - frames_written;
add_data(chkable, frames_written, num_to_write);
frames_written += num_to_write;
next_hash_offset_in_frames += window_step_;
prog_.current_offset.store(
this->frames_to_bytes(offset_in_frames)); // TODO: ???
prog_.total_bytes_read.store(
total_bytes_read_before +
this->frames_to_bytes(offset_in_frames)); // TODO: ???
}
hasher.update(p[offset - window_size_], p[offset]);
++offset;
// hasher.update(p[offset - window_size_], p[offset]);
data.update_hash(hasher, offset_in_frames - window_size_, offset_in_frames);
++offset_in_frames;
}
prog_.current_offset.store(size);
prog_.total_bytes_read.store(total_bytes_read_before + size);
prog_.current_offset.store(this->frames_to_bytes(size_in_frames)); // TODO
prog_.total_bytes_read.store(total_bytes_read_before +
this->frames_to_bytes(size_in_frames)); // TODO
add_data(chkable, written, size - written);
add_data(chkable, frames_written, size_in_frames - frames_written);
finish_chunk(chkable);
}