Add granularity policies to segmenter

This commit is contained in:
Marcus Holland-Moritz 2023-08-14 20:49:35 +02:00
parent f94d1c7def
commit a9ff74ee0c

View File

@ -52,6 +52,8 @@
namespace dwarfs {
namespace {
/**
* Segmenter Strategy
*
@ -221,15 +223,44 @@ class alignas(64) bloom_filter {
size_t const size_;
};
class active_block {
template <size_t N>
class ConstantGranularityPolicy {
public:
template <typename T>
void add_new_block(T& blocks, size_t num, size_t size, size_t window_size,
size_t window_step, size_t bloom_filter_size) {
blocks.emplace_back(num, size, window_size, window_step, bloom_filter_size);
}
};
class VariableGranularityPolicy {
public:
VariableGranularityPolicy(uint32_t granularity)
: granularity_{granularity} {}
template <typename T>
void add_new_block(T& blocks, size_t num, size_t size, size_t window_size,
size_t window_step, size_t bloom_filter_size) {
blocks.emplace_back(num, size, window_size, window_step, bloom_filter_size,
granularity_);
}
private:
uint_fast32_t granularity_;
};
template <typename GranularityPolicy>
class active_block : private GranularityPolicy {
private:
using offset_t = uint32_t;
using hash_t = uint32_t;
public:
template <typename... PolicyArgs>
active_block(size_t num, size_t size, size_t window_size, size_t window_step,
size_t bloom_filter_size)
: num_(num)
size_t bloom_filter_size, PolicyArgs&&... args)
: GranularityPolicy(std::forward<PolicyArgs>(args)...)
, num_(num)
, capacity_(size)
, window_size_(window_size)
, window_step_mask_(window_step - 1)
@ -278,24 +309,25 @@ class active_block {
private:
static constexpr size_t num_inline_offsets = 4;
size_t num_, capacity_, window_size_, window_step_mask_;
size_t const num_, capacity_, window_size_, window_step_mask_;
rsync_hash hasher_;
bloom_filter filter_;
fast_multimap<hash_t, offset_t, num_inline_offsets> offsets_;
std::shared_ptr<block_data> data_;
};
template <typename LoggerPolicy>
class segmenter_ final : public segmenter::impl {
template <typename LoggerPolicy, typename GranularityPolicy>
class segmenter_ final : public segmenter::impl, private GranularityPolicy {
public:
template <typename... PolicyArgs>
segmenter_(logger& lgr, progress& prog, std::shared_ptr<block_manager> blkmgr,
segmenter::config const& cfg, compression_constraints const& cc,
segmenter::block_ready_cb block_ready)
: LOG_PROXY_INIT(lgr)
segmenter::config const& cfg,
segmenter::block_ready_cb block_ready, PolicyArgs&&... args)
: GranularityPolicy(std::forward<PolicyArgs>(args)...)
, LOG_PROXY_INIT(lgr)
, prog_{prog}
, blkmgr_{std::move(blkmgr)}
, cfg_{cfg}
, granularity_{cc.granularity ? cc.granularity.value() : 1}
, block_ready_{std::move(block_ready)}
, window_size_{window_size(cfg)}
, window_step_{window_step(cfg)}
@ -351,7 +383,6 @@ class segmenter_ final : public segmenter::impl {
progress& prog_;
std::shared_ptr<block_manager> blkmgr_;
segmenter::config const cfg_;
uint_fast32_t const granularity_;
segmenter::block_ready_cb block_ready_;
size_t const window_size_;
@ -364,16 +395,21 @@ class segmenter_ final : public segmenter::impl {
segmenter_stats stats_;
using active_block_type = active_block<GranularityPolicy>;
// Active blocks are blocks that can still be referenced from new chunks.
// Up to N blocks (configurable) can be active and are kept in this queue.
// All active blocks except for the last one are immutable and potentially
// already being compressed.
std::deque<active_block> blocks_;
std::deque<active_block_type> blocks_;
};
template <typename GranularityPolicy>
class segment_match {
public:
segment_match(active_block const* blk, uint32_t off) noexcept
using active_block_type = active_block<GranularityPolicy>;
segment_match(active_block_type const* blk, uint32_t off) noexcept
: block_{blk}
, offset_{off} {}
@ -393,13 +429,15 @@ class segment_match {
size_t block_num() const { return block_->num(); }
private:
active_block const* block_;
active_block_type const* block_;
uint32_t offset_;
uint32_t size_{0};
uint8_t const* data_{nullptr};
};
void active_block::append(std::span<uint8_t const> data, bloom_filter* filter) {
template <typename GranularityPolicy>
void active_block<GranularityPolicy>::append(std::span<uint8_t const> data,
bloom_filter* filter) {
auto& v = data_->vec();
auto offset = v.size();
DWARFS_CHECK(offset + data.size() <= capacity_,
@ -427,7 +465,9 @@ void active_block::append(std::span<uint8_t const> data, bloom_filter* filter) {
}
}
void segment_match::verify_and_extend(uint8_t const* pos, size_t len,
template <typename GranularityPolicy>
void segment_match<GranularityPolicy>::verify_and_extend(uint8_t const* pos,
size_t len,
uint8_t const* begin,
uint8_t const* end) {
auto const& v = block_->data()->vec();
@ -454,11 +494,20 @@ void segment_match::verify_and_extend(uint8_t const* pos, size_t len,
}
}
template <typename LoggerPolicy>
void segmenter_<LoggerPolicy>::add_chunkable(chunkable& chkable) {
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::add_chunkable(
chunkable& chkable) {
if (auto size = chkable.size(); size > 0) {
LOG_TRACE << "adding " << chkable.description();
// if (granularity_ > 1) {
// DWARFS_CHECK(
// size % granularity_ == 0,
// fmt::format(
// "unexpected size {} for given granularity {} (modulus: {})",
// size, granularity_, size % granularity_));
// }
if (!segmentation_enabled() or size < window_size_) {
// no point dealing with hashing, just write it out
add_data(chkable, 0, size);
@ -469,8 +518,8 @@ void segmenter_<LoggerPolicy>::add_chunkable(chunkable& chkable) {
}
}
template <typename LoggerPolicy>
void segmenter_<LoggerPolicy>::finish() {
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::finish() {
if (!blocks_.empty() && !blocks_.back().full()) {
block_ready();
}
@ -512,8 +561,8 @@ void segmenter_<LoggerPolicy>::finish() {
}
}
template <typename LoggerPolicy>
void segmenter_<LoggerPolicy>::block_ready() {
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::block_ready() {
auto& block = blocks_.back();
block.finalize(stats_);
auto written_block_num = block_ready_(block.data());
@ -521,9 +570,9 @@ void segmenter_<LoggerPolicy>::block_ready() {
++prog_.block_count;
}
template <typename LoggerPolicy>
void segmenter_<LoggerPolicy>::append_to_block(chunkable& chkable,
size_t offset, size_t size) {
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::append_to_block(
chunkable& chkable, size_t offset, size_t size) {
if (blocks_.empty() or blocks_.back().full()) [[unlikely]] {
if (blocks_.size() >= std::max<size_t>(1, cfg_.max_active_blocks)) {
blocks_.pop_front();
@ -534,7 +583,7 @@ void segmenter_<LoggerPolicy>::append_to_block(chunkable& chkable,
filter_.merge(b.filter());
}
blocks_.emplace_back(blkmgr_->get_logical_block(), block_size_,
this->add_new_block(blocks_, blkmgr_->get_logical_block(), block_size_,
cfg_.max_active_blocks > 0 ? window_size_ : 0,
window_step_, filter_.size());
}
@ -553,8 +602,9 @@ void segmenter_<LoggerPolicy>::append_to_block(chunkable& chkable,
}
}
template <typename LoggerPolicy>
void segmenter_<LoggerPolicy>::add_data(chunkable& chkable, size_t offset,
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::add_data(chunkable& chkable,
size_t offset,
size_t size) {
while (size > 0) {
size_t block_offset = 0;
@ -572,8 +622,9 @@ void segmenter_<LoggerPolicy>::add_data(chunkable& chkable, size_t offset,
}
}
template <typename LoggerPolicy>
void segmenter_<LoggerPolicy>::finish_chunk(chunkable& chkable) {
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::finish_chunk(
chunkable& chkable) {
if (chunk_.size > 0) {
auto& block = blocks_.back();
chkable.add_chunk(block.num(), chunk_.offset, chunk_.size);
@ -583,9 +634,9 @@ void segmenter_<LoggerPolicy>::finish_chunk(chunkable& chkable) {
}
}
template <typename LoggerPolicy>
void segmenter_<LoggerPolicy>::segment_and_add_data(chunkable& chkable,
size_t size) {
template <typename LoggerPolicy, typename GranularityPolicy>
void segmenter_<LoggerPolicy, GranularityPolicy>::segment_and_add_data(
chunkable& chkable, size_t size) {
rsync_hash hasher;
size_t offset = 0;
size_t written = 0;
@ -602,7 +653,8 @@ void segmenter_<LoggerPolicy>::segment_and_add_data(chunkable& chkable,
hasher.update(p[offset]);
}
std::vector<segment_match> matches;
// TODO: try folly::small_vector?
std::vector<segment_match<GranularityPolicy>> matches;
const bool single_block_mode = cfg_.max_active_blocks == 1;
auto total_bytes_read_before = prog_.total_bytes_read.load();
@ -615,14 +667,12 @@ void segmenter_<LoggerPolicy>::segment_and_add_data(chunkable& chkable,
++stats_.bloom_hits;
if (single_block_mode) {
auto& block = blocks_.front();
block.for_each_offset(hasher(), [&](uint32_t offset) {
matches.emplace_back(&block, offset);
});
block.for_each_offset(
hasher(), [&](auto off) { matches.emplace_back(&block, off); });
} else {
for (auto const& block : blocks_) {
block.for_each_offset_filter(hasher(), [&](uint32_t offset) {
matches.emplace_back(&block, offset);
});
block.for_each_offset_filter(
hasher(), [&](auto off) { matches.emplace_back(&block, off); });
}
}
@ -716,11 +766,43 @@ void segmenter_<LoggerPolicy>::segment_and_add_data(chunkable& chkable,
finish_chunk(chkable);
}
template <size_t N>
struct constant_granularity_segmenter_ {
template <typename LoggerPolicy>
using type = segmenter_<LoggerPolicy, ConstantGranularityPolicy<N>>;
};
struct variable_granularity_segmenter_ {
template <typename LoggerPolicy>
using type = segmenter_<LoggerPolicy, VariableGranularityPolicy>;
};
std::unique_ptr<segmenter::impl>
create_segmenter(logger& lgr, progress& prog,
std::shared_ptr<block_manager> blkmgr,
segmenter::config const& cfg,
compression_constraints const& cc,
segmenter::block_ready_cb block_ready) {
if (!cc.granularity || cc.granularity.value() == 1) {
return make_unique_logging_object<segmenter::impl,
constant_granularity_segmenter_<1>::type,
logger_policies>(
lgr, prog, std::move(blkmgr), cfg, std::move(block_ready));
}
return make_unique_logging_object<
segmenter::impl, variable_granularity_segmenter_::type, logger_policies>(
lgr, prog, std::move(blkmgr), cfg, std::move(block_ready),
cc.granularity.value());
}
} // namespace
segmenter::segmenter(logger& lgr, progress& prog,
std::shared_ptr<block_manager> blkmgr, config const& cfg,
compression_constraints const& cc,
block_ready_cb block_ready)
: impl_(make_unique_logging_object<impl, segmenter_, logger_policies>(
lgr, prog, std::move(blkmgr), cfg, cc, std::move(block_ready))) {}
: impl_(create_segmenter(lgr, prog, std::move(blkmgr), cfg, cc,
std::move(block_ready))) {}
} // namespace dwarfs