From 09d0ff50bdc96360e0c660c3ff73116fdb095eeb Mon Sep 17 00:00:00 2001 From: Marcus Holland-Moritz Date: Wed, 3 Mar 2021 12:41:19 +0100 Subject: [PATCH] Replace block manager implementation --- CMakeLists.txt | 2 +- src/dwarfs/block_manager.cpp | 800 ++++++++++++++++---------------- src/dwarfs/block_manager_v2.cpp | 537 --------------------- 3 files changed, 410 insertions(+), 929 deletions(-) delete mode 100644 src/dwarfs/block_manager_v2.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 0af80489..c423f968 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -192,7 +192,7 @@ list( LIBDWARFS_SRC src/dwarfs/block_cache.cpp src/dwarfs/block_compressor.cpp - src/dwarfs/block_manager_v2.cpp + src/dwarfs/block_manager.cpp src/dwarfs/checksum.cpp src/dwarfs/console_writer.cpp src/dwarfs/entry.cpp diff --git a/src/dwarfs/block_manager.cpp b/src/dwarfs/block_manager.cpp index 8d9a21fa..c43a1400 100644 --- a/src/dwarfs/block_manager.cpp +++ b/src/dwarfs/block_manager.cpp @@ -22,9 +22,10 @@ #include #include #include -#include +#include #include #include +#include #include #include @@ -32,13 +33,16 @@ #include +#include + #include "dwarfs/block_data.h" #include "dwarfs/block_manager.h" +#include "dwarfs/compiler.h" +#include "dwarfs/cyclic_hash.h" #include "dwarfs/entry.h" #include "dwarfs/error.h" #include "dwarfs/filesystem_writer.h" #include "dwarfs/inode.h" -#include "dwarfs/inode_hasher.h" #include "dwarfs/logger.h" #include "dwarfs/mmif.h" #include "dwarfs/os_access.h" @@ -47,467 +51,481 @@ namespace dwarfs { -namespace { -// TODO: for now, this has to work better... -// we lose this hash value, but that's life... -const uint32_t blockhash_emtpy = 0; +/** + * Block Manager Strategy + * + * For each *block*, start new rolling hash. The the hashes are associcated + * with the block, new hash-offset-pairs will only be added as the block + * grows. We only need to store a hash-offset-pair every N bytes, with N + * being configurable (typically half of the window size so we find all + * matches of at least 1.5 times the window size, but could also be smaller). + * + * For each *file*, start new rolling hash. Hash values *expire* immediately, + * no history of hash values is kept. Up to n blocks (typically only one) + * will be scanned for matches. Old file data beyond the moving window will + * be added to the current *block*, causing the rolling *block* hash to also + * advance. Data needs to be added to the block only in increments at which + * a new hash valus is computed. + * + * This strategy ensures that we're using a deterministic amount of memory + * (proportional to block size and history block count). + * + * A single window size is sufficient. That window size should still be + * configurable. + */ struct bm_stats { + bm_stats() + : l2_collision_vec_size(1, 0, 128) {} + size_t total_hashes{0}; - size_t collisions{0}; - size_t real_matches{0}; + size_t l2_collisions{0}; + size_t total_matches{0}; + size_t good_matches{0}; size_t bad_matches{0}; - size_t saved_bytes{0}; - size_t largest_block{0}; -}; -} // namespace - -template -struct block_hashes { - static constexpr size_t max_coll_inline = 2; - using blockhash_t = google::dense_hash_map; - - block_hashes(size_t size) - : size(size) { - values.set_empty_key(blockhash_emtpy); - } - - const size_t size; - blockhash_t values; + folly::Histogram l2_collision_vec_size; }; -template -class block_manager_ : public block_manager::impl { +template +class fast_multimap { private: - using cyclic_hash_t = uint32_t; - using offset_t = uint32_t; - using block_hashes_t = block_hashes; - using bhv_ri = std::vector::reverse_iterator; - using hasher_type = inode_hasher; - using hash_map_type = typename hasher_type::result_type; - - struct match_window { - match_window() - : first(0) - , last(0) {} - - match_window(size_t first, size_t last) - : first(first) - , last(last) {} - - size_t size() const { return last - first; } - - size_t first; - size_t last; - }; - - template - static std::vector sorted(const std::vector& vec) { - std::vector tmp(vec); - std::sort(tmp.begin(), tmp.end()); - return tmp; - } + using collision_vector = folly::small_vector; + using blockhash_t = google::dense_hash_map; + using collision_t = std::unordered_map; public: - block_manager_(logger& lgr, progress& prog, const block_manager::config& cfg, - std::shared_ptr os, filesystem_writer& fsw) - : cfg_(cfg) - , block_size_(static_cast(1) << cfg.block_size_bits) - , blockhash_window_size_(sorted(cfg.blockhash_window_size)) - , fsw_(fsw) - , os_(os) - , current_block_(0) - , total_blocks_size_(0) - , block_{std::make_shared()} - , hasher_(lgr, blockhash_window_size_) - , log_(lgr) - , prog_(prog) { - block_->vec().reserve(block_size_); + fast_multimap() { values_.set_empty_key(EmptyKey); } - for (auto size : blockhash_window_size_) { - block_hashes_.emplace_back(size); + void insert(KeyT const& key, ValT const& val) { + if (key == EmptyKey or !values_.insert(std::make_pair(key, val)).second) { + collisions_[key].emplace_back(val); } } - void add_inode(std::shared_ptr ino) override; - void finish_blocks() override; - - size_t total_size() const override { return total_blocks_size_; } - - size_t total_blocks() const override { - return total_blocks_size_ > 0 ? current_block_ + 1 : 0; - } - - private: - size_t cur_offset() const { return block_->size(); } - - void block_ready(); - void update_hashes(const hash_map_type& hm, size_t offset, size_t size); - void add_chunk(const std::shared_ptr& ino, const uint8_t* p, - size_t offset, size_t size, const hash_map_type* hm); - void add_data(const std::shared_ptr& ino, const uint8_t* p, - size_t size, const hash_map_type* hm = nullptr); - void segment_and_add_data(const hash_map_type& hm, - const std::shared_ptr& ino, - const uint8_t* data, size_t size); - void - segment_and_add_data(const std::string& indent, const hash_map_type& hm, - const std::shared_ptr& ino, const uint8_t* data, - match_window mw, bhv_ri bhcur, bhv_ri bhend); - bool get_match_window(const std::string& indent, match_window& win, - size_t& block_offset, const uint8_t* data, - const match_window& search_win) const; - - void - validate_match(const std::string& indent, const uint8_t* data, - match_window& mw, const std::vector& hashvec, - bm_stats& stats, size_t block_size, size_t block_offset, - size_t off, match_window& best, size_t& best_offset); - - const block_manager::config& cfg_; - const size_t block_size_; - std::vector blockhash_window_size_; - filesystem_writer& fsw_; - std::shared_ptr os_; - size_t current_block_; - size_t total_blocks_size_; - std::shared_ptr block_; - std::vector block_hashes_; - hasher_type hasher_; - hash_map_type hm_; - log_proxy log_; - progress& prog_; - std::map stats_; -}; - -template -void block_manager_::finish_blocks() { - if (!block_->empty()) { - block_ready(); - } - - for (const auto& sti : stats_) { - static char const* const percent = "{:.2}%"; - const auto& st = sti.second; - LOG_DEBUG << "blockhash window <" << sti.first << ">: " << st.collisions - << " collisions (" - << fmt::format(percent, float(st.collisions) / st.total_hashes) - << "), " << st.real_matches << " real matches, " << st.bad_matches - << " bad matches, (" - << fmt::format(percent, float(st.bad_matches) / - (st.real_matches + st.bad_matches)) - << "), " << size_with_unit(st.saved_bytes) - << " saved (largest=" << size_with_unit(st.largest_block) << ")"; - } -} - -template -void block_manager_::block_ready() { - auto tmp = std::make_shared(); - block_.swap(tmp); - fsw_.write_block(std::move(tmp)); - block_->vec().reserve(block_size_); - for (auto& bh : block_hashes_) { - bh.values.clear(); - } - ++current_block_; - ++prog_.block_count; -} - -template -void block_manager_::update_hashes(const hash_map_type& hm, - size_t offset, size_t size) { - size_t block_offset = cur_offset(); - - for (auto bhi = block_hashes_.begin(); bhi != block_hashes_.end(); ++bhi) { - if (bhi->size <= size) { - auto hmi = hm.find(bhi->size); - auto& stats = stats_[bhi->size]; - - if (hmi == hm.end()) { - DWARFS_THROW(runtime_error, - "internal error: no hash found for window size"); - } - - const auto& hashvec = hmi->second; - size_t incr = bhi->size >> cfg_.window_increment_shift; - size_t last = (size - bhi->size) + 1; - - if (hashvec.size() < offset + last) { - LOG_ERROR << "bhi=" << bhi->size - << ", hashvec.size()=" << hashvec.size() - << ", offset=" << offset << ", last=" << last; - DWARFS_THROW(runtime_error, "internal error: hashvec too small"); - } - - for (size_t off = 0; off < last; off += incr) { - cyclic_hash_t hval = hashvec[offset + off]; - - ++stats.total_hashes; - - if (hval != blockhash_emtpy) { - if constexpr (std::is_same_v) { - bhi->values[hval] = block_offset + off; - } else { - auto new_offest = block_offset + off; - auto [it, success] = - bhi->values.insert(std::make_pair(hval, new_offest)); - - if (!success) { - LOG_TRACE << "collision for hash=" << hval - << " (size=" << bhi->size << "): " << it->second - << " <-> " << new_offest; - - ++stats.collisions; - - it->second = new_offest; - } - } + template + void for_each_value(KeyT const& key, F&& func) const { + if (auto it = values_.find(key); it != values_.end()) { + func(it->second); + if (auto it2 = collisions_.find(key); it2 != collisions_.end()) { + for (auto const& val : it2->second) { + func(val); } } } } -} -template -void block_manager_::add_chunk(const std::shared_ptr& ino, - const uint8_t* p, size_t offset, - size_t size, - const hash_map_type* hm) { - LOG_TRACE << "block " << current_block_ << " size: " << block_->size() - << " of " << block_size_; - - if (hm) { - update_hashes(*hm, offset, size); + void clear() { + values_.clear(); + collisions_.clear(); } - size_t block_offset = cur_offset(); + blockhash_t const& values() const { return values_; }; + collision_t const& collisions() const { return collisions_; }; - LOG_TRACE << "adding chunk for inode " << ino->num() << " [" - << ino->any()->name() << "] - block: " << current_block_ - << " offset: " << block_offset << ", size: " << size; + private: + blockhash_t values_; + collision_t collisions_; +}; - block_->vec().resize(block_offset + size); +class active_block { + private: + using offset_t = uint32_t; + using hash_t = uint32_t; - ::memcpy(block_->data() + block_offset, p + offset, size); - - ino->add_chunk(current_block_, block_offset, size); - prog_.chunk_count++; - prog_.filesystem_size += size; - - if (block_->size() == block_size_) { - block_ready(); + public: + active_block(size_t num, size_t size, size_t window_size, size_t window_step) + : num_(num) + , capacity_(size) + , window_size_(window_size) + , window_step_mask_(window_step - 1) + , data_{std::make_shared()} { + DWARFS_CHECK((window_step & window_step_mask_) == 0, + "window step size not a power of two"); + data_->vec().reserve(capacity_); } -} -template -void block_manager_::add_data(const std::shared_ptr& ino, - const uint8_t* p, size_t size, - const hash_map_type* hm) { - size_t offset = 0; + size_t num() const { return num_; } + size_t size() const { return data_->size(); } - while (size > 0) { - size_t block_offset = cur_offset(); + bool full() const { return size() == capacity_; } + std::shared_ptr data() const { return data_; } - if (block_offset == block_size_) { - // apparently we've filled a block, so a new one will be created once - // we're adding another chunk - block_offset = 0; + void append(uint8_t const* p, size_t size); + + size_t next_hash_distance() const { + return window_step_mask_ + 1 - (data_->vec().size() & window_step_mask_); + } + + template + void for_each_offset(hash_t key, F&& func) const { + offsets_.for_each_value(key, std::forward(func)); + } + + void finalize(bm_stats& stats) { + stats.total_hashes += offsets_.values().size(); + for (auto& c : offsets_.collisions()) { + stats.total_hashes += c.second.size(); + stats.l2_collisions += c.second.size() - 1; + stats.l2_collision_vec_size.addValue(c.second.size()); } + } - if (block_offset + size <= block_size_) { - add_chunk(ino, p, offset, size, hm); - break; + private: + static constexpr size_t num_inline_offsets = 4; + + size_t num_, capacity_, window_size_, window_step_mask_; + rsync_hash hasher_; + fast_multimap offsets_; + std::shared_ptr data_; +}; + +template +class block_manager_ : public block_manager::impl { + public: + block_manager_(logger& lgr, progress& prog, const block_manager::config& cfg, + std::shared_ptr os, filesystem_writer& fsw) + : log_{lgr} + , prog_{prog} + , cfg_{cfg} + , os_{std::move(os)} + , fsw_{fsw} + , window_size_{cfg.blockhash_window_size > 0 + ? static_cast(1) << cfg.blockhash_window_size + : 0} + , window_step_{window_size_ >> cfg.window_increment_shift} + , block_size_{static_cast(1) << cfg.block_size_bits} {} + + void add_inode(std::shared_ptr ino) override; + void finish_blocks() override; + + private: + struct chunk_state { + size_t offset{0}; + size_t size{0}; + }; + + void block_ready(); + void finish_chunk(inode& ino); + void append_to_block(inode& ino, mmif& mm, size_t offset, size_t size); + void add_data(inode& ino, mmif& mm, size_t offset, size_t size); + void segment_and_add_data(inode& ino, mmif& mm, size_t size); + + log_proxy log_; + progress& prog_; + const block_manager::config& cfg_; + std::shared_ptr os_; + filesystem_writer& fsw_; + + size_t const window_size_; + size_t const window_step_; + size_t const block_size_; + size_t block_count_{0}; + + chunk_state chunk_; + + bm_stats stats_; + + // Active blocks are blocks that can still be referenced from new chunks. + // Up to N blocks (configurable) can be active and are kept in this queue. + // All active blocks except for the last one are immutable and potentially + // already being compressed. + std::deque blocks_; +}; + +class segment_match { + public: + segment_match(active_block const* blk, uint32_t off) noexcept + : block_{blk} + , offset_{off} {} + + void verify_and_extend(uint8_t const* pos, size_t len, uint8_t const* begin, + uint8_t const* end); + + bool operator<(segment_match const& rhs) const { + return size_ < rhs.size_ || + (size_ == rhs.size_ && + (block_->num() < rhs.block_->num() || + (block_->num() == rhs.block_->num() && offset_ < rhs.offset_))); + } + + uint8_t const* data() const { return data_; } + uint32_t size() const { return size_; } + uint32_t offset() const { return offset_; } + size_t block_num() const { return block_->num(); } + + private: + active_block const* block_; + uint32_t offset_; + uint32_t size_{0}; + uint8_t const* data_; +}; + +void active_block::append(uint8_t const* p, size_t size) { + auto& v = data_->vec(); + auto offset = v.size(); + DWARFS_CHECK(offset + size <= capacity_, + fmt::format("block capacity exceeded: {} + {} > {}", offset, + size, capacity_)); + v.resize(offset + size); + ::memcpy(v.data() + offset, p, size); + + while (offset < v.size()) { + if (offset < window_size_) { + hasher_.update(v[offset++]); } else { - size_t chunksize = block_size_ - block_offset; - add_chunk(ino, p, offset, chunksize, hm); - offset += chunksize; - size -= chunksize; + hasher_.update(v[offset - window_size_], v[offset]); + if ((++offset & window_step_mask_) == 0) { + offsets_.insert(hasher_(), offset - window_size_); + } } } } +void segment_match::verify_and_extend(uint8_t const* pos, size_t len, + uint8_t const* begin, + uint8_t const* end) { + auto const& v = block_->data()->vec(); + + if (::memcmp(v.data() + offset_, pos, len) == 0) { + // scan backward + auto tmp = offset_; + while (tmp > 0 && pos > begin && v[tmp - 1] == pos[-1]) { + --tmp; + --pos; + } + len += offset_ - tmp; + offset_ = tmp; + data_ = pos; + + // scan forward + pos += len; + tmp = offset_ + len; + while (tmp < v.size() && pos < end && v[tmp] == *pos) { + ++tmp; + ++pos; + } + size_ = tmp - offset_; + } +} + template void block_manager_::add_inode(std::shared_ptr ino) { auto e = ino->any(); - size_t size = e->size(); - if (size > 0) { + if (size_t size = e->size(); size > 0) { auto mm = os_->map_file(e->path(), size); LOG_TRACE << "adding inode " << ino->num() << " [" << ino->any()->name() << "] - size: " << size; - if (blockhash_window_size_.empty() or - size < blockhash_window_size_.front()) { - // no point dealing with hashes, just write it out - // XXX: might be worth checking if the whole file has a match? - add_data(ino, mm->as(), size); + if (window_size_ == 0 or size < window_size_) { + // no point dealing with hashing, just write it out + add_data(*ino, *mm, 0, size); + finish_chunk(*ino); } else { - const uint8_t* data = mm->as(); - - // XXX: hm_ is reused to avoid reallocations - hasher_(hm_, data, size); - segment_and_add_data(hm_, ino, data, size); + segment_and_add_data(*ino, *mm, size); } } } template -void block_manager_::segment_and_add_data( - const hash_map_type& hm, const std::shared_ptr& ino, - const uint8_t* data, size_t size) { - ; - segment_and_add_data("", hm, ino, data, match_window(0, size), - block_hashes_.rbegin(), block_hashes_.rend()); -} +void block_manager_::finish_blocks() { + if (!blocks_.empty()) { + block_ready(); + } -template -void block_manager_::validate_match( - const std::string& indent, const uint8_t* data, match_window& mw, - const std::vector& hashvec, bm_stats& stats, - size_t block_size, size_t block_offset, size_t off, match_window& best, - size_t& best_offset) { - LOG_TRACE << indent << "potentially matched " << block_size - << " bytes at offset " << off << ", hash=" << hashvec[off]; + auto l1_collisions = stats_.l2_collision_vec_size.computeTotalCount(); - match_window win(off, off + block_size); + LOG_INFO << "segmentation matches: good=" << stats_.good_matches + << ", bad=" << stats_.bad_matches + << ", total=" << stats_.total_matches; + LOG_INFO << "segmentation collisions: L1=" + << fmt::format("{:.3f}%", + 100.0 * (l1_collisions + stats_.l2_collisions) / + stats_.total_hashes) + << ", L2=" + << fmt::format("{:.3f}%", + 100.0 * stats_.l2_collisions / stats_.total_hashes) + << " [" << stats_.total_hashes << " hashes]"; - if (get_match_window(indent, win, block_offset, data, mw)) { - LOG_TRACE << indent << "definitely matched " << win.size() - << " bytes at offset " << win.first; - ++stats.real_matches; - - // fuck yeah, we've got a block... - - if (win.size() > best.size()) { - best = win; - best_offset = block_offset; - } - } else { - LOG_TRACE << indent << "bad match: " << block_size << " bytes at offset " - << off << ", hash=" << hashvec[off]; - ++stats.bad_matches; + if (l1_collisions > 0) { + auto pct = [&](double p) { + return stats_.l2_collision_vec_size.getPercentileEstimate(p); + }; + LOG_DEBUG << "collision vector size p50: " << pct(0.5) + << ", p75: " << pct(0.75) << ", p90: " << pct(0.9) + << ", p95: " << pct(0.95) << ", p99: " << pct(0.99); } } template -void block_manager_::segment_and_add_data( - const std::string& indent, const hash_map_type& hm, - const std::shared_ptr& ino, const uint8_t* data, match_window mw, - bhv_ri bhcur, bhv_ri bhend) { - LOG_TRACE << indent << "segment_and_add_data([" << mw.first << ", " << mw.last - << "], " << (bhcur != bhend ? bhcur->size : 0) << ")"; +void block_manager_::block_ready() { + auto& block = blocks_.back(); + block.finalize(stats_); + fsw_.write_block(block.data()); + ++prog_.block_count; +} - while (bhcur != bhend) { - LOG_TRACE << indent << " wsize=" << bhcur->size; +template +void block_manager_::append_to_block(inode& ino, mmif& mm, + size_t offset, size_t size) { + if (DWARFS_UNLIKELY(blocks_.empty() or blocks_.back().full())) { + if (blocks_.size() >= cfg_.max_active_blocks) { + blocks_.pop_front(); + } - if (bhcur->size <= mw.size()) { - auto hmi = hm.find(bhcur->size); - auto& stats = stats_[bhcur->size]; + blocks_.emplace_back(block_count_++, block_size_, window_size_, + window_step_); + } - if (hmi == hm.end()) { - DWARFS_THROW(runtime_error, - "internal error: no hash found for window size"); + auto& block = blocks_.back(); + + block.append(mm.as(offset), size); + chunk_.size += size; + + prog_.filesystem_size += size; + + if (block.full()) { + mm.release_until(offset + size); + finish_chunk(ino); + block_ready(); + } +} + +template +void block_manager_::add_data(inode& ino, mmif& mm, size_t offset, + size_t size) { + while (size > 0) { + size_t block_offset = 0; + + if (!blocks_.empty()) { + block_offset = blocks_.back().size(); + } + + size_t chunk_size = std::min(size, block_size_ - block_offset); + + append_to_block(ino, mm, offset, chunk_size); + + offset += chunk_size; + size -= chunk_size; + } +} + +template +void block_manager_::finish_chunk(inode& ino) { + if (chunk_.size > 0) { + auto& block = blocks_.back(); + ino.add_chunk(block.num(), chunk_.offset, chunk_.size); + chunk_.offset = block.full() ? 0 : block.size(); + chunk_.size = 0; + prog_.chunk_count++; + } +} + +template +void block_manager_::segment_and_add_data(inode& ino, mmif& mm, + size_t size) { + rsync_hash hasher; + size_t offset = 0; + size_t written = 0; + size_t lookback_size = window_size_ + window_step_; + size_t next_hash_offset = + lookback_size + + (blocks_.empty() ? window_step_ : blocks_.back().next_hash_distance()); + auto p = mm.as(); + + DWARFS_CHECK(size >= window_size_, "unexpected call to segment_and_add_data"); + + for (; offset < window_size_; ++offset) { + hasher.update(p[offset]); + } + + std::vector matches; + + while (offset < size) { + for (auto const& block : blocks_) { + block.for_each_offset(hasher(), [&](uint32_t offset) { + matches.emplace_back(&block, offset); + }); + } + + if (!matches.empty()) { + LOG_TRACE << "found " << matches.size() << " matches (hash=" << hasher() + << ", window size=" << window_size_ << ")"; + + for (auto& m : matches) { + LOG_TRACE << " block " << m.block_num() << " @ " << m.offset(); + m.verify_and_extend(p + offset - window_size_, window_size_, + p + written, p + size); } - const auto& hashvec = hmi->second; + stats_.total_matches += matches.size(); + stats_.bad_matches += + std::count_if(matches.begin(), matches.end(), + [](auto const& m) { return m.size() == 0; }); - for (size_t off = mw.first; off <= mw.last - bhcur->size;) { - match_window best; - size_t best_offset = 0; + auto best = std::max_element(matches.begin(), matches.end()); + auto match_len = best->size(); - if (auto bhi = bhcur->values.find(hashvec[off]); - bhi != bhcur->values.end()) { - validate_match(indent, data, mw, hashvec, stats, bhcur->size, - bhi->second, off, best, best_offset); + if (match_len > 0) { + ++stats_.good_matches; + LOG_TRACE << "successful match of length " << match_len << " @ " + << best->offset(); + + auto block_num = best->block_num(); + auto match_off = best->offset(); + auto num_to_write = best->data() - (p + written); + + // best->block can be invalidated by this call to add_data()! + add_data(ino, mm, written, num_to_write); + written += num_to_write; + finish_chunk(ino); + + ino.add_chunk(block_num, match_off, match_len); + written += match_len; + + prog_.saved_by_segmentation += match_len; + + offset = written; + + if (size - written < window_size_) { + break; } - if (best.size() > 0) { - LOG_TRACE << indent << "mw=[" << mw.first << ", " << mw.last - << "], best=[" << best.first << ", " << best.last << "]"; + hasher.clear(); - // 1) search for smaller blocks on the left recursively - match_window left(mw.first, best.first); - LOG_TRACE << indent << "left=[" << left.first << ", " << left.last - << "]"; - - // 2) save the block number before recursing, as current_block_ - // may change before we add the chunk - size_t block_no = current_block_; - - // 3) divide and conquer! - segment_and_add_data(indent + " ", hm, ino, data, left, bhcur + 1, - bhend); - - // 4) add the block we found - LOG_TRACE << "adding (existing) chunk for inode " << ino->num() - << " [" << ino->any()->name() - << "] - offset: " << best_offset - << ", size: " << best.size(); - - ino->add_chunk(block_no, best_offset, best.size()); - prog_.chunk_count++; - prog_.saved_by_segmentation += best.size(); - stats.saved_bytes += best.size(); - if (best.size() > stats.largest_block) { - stats.largest_block = best.size(); - } - - // 5) continue to look for more blocks on the right and - // make sure that we never go back to the left again - mw.first = best.last; - off = mw.first; - LOG_TRACE << indent << "updated mw=[" << mw.first << ", " << mw.last - << "]"; - } else { - ++off; + for (; offset < written + window_size_; ++offset) { + hasher.update(p[offset]); } + + next_hash_offset = + written + lookback_size + blocks_.back().next_hash_distance(); + } + + matches.clear(); + + if (match_len > 0) { + continue; } } - ++bhcur; + // no matches found, see if we can append data + // we need to keep at least lookback_size bytes unwritten + + if (offset == next_hash_offset) { + auto num_to_write = offset - lookback_size - written; + add_data(ino, mm, written, num_to_write); + written += num_to_write; + next_hash_offset += window_step_; + } + + hasher.update(p[offset - window_size_], p[offset]); + ++offset; } - add_data(ino, data + mw.first, mw.size(), &hm); -} - -template -bool block_manager_::get_match_window( - const std::string& indent, match_window& win, size_t& block_offset, - const uint8_t* data, const match_window& search_win) const { - const uint8_t* blockdata = block_->data(); - - LOG_TRACE << indent << "match(block_offset=" << block_offset << ", window=[" - << win.first << ", " << win.last << "], search_win=[" - << search_win.first << ", " << search_win.last << "])"; - - if (block_offset + win.size() > block_size_) { - LOG_TRACE << indent << "bad match size"; - return false; - } - - if (::memcmp(blockdata + block_offset, data + win.first, win.size()) != 0) { - LOG_TRACE << indent << "block data mismatch"; - return false; - } - - // Looking good! Let's see how much we can get out of it... - - while (block_offset + win.size() < block_size_ and - win.last < search_win.last and - block_offset + win.size() < block_->size() and - blockdata[block_offset + win.size()] == data[win.last]) { - ++win.last; - } - - while (win.first > search_win.first and block_offset > 0 and - blockdata[block_offset - 1] == data[win.first - 1]) { - --block_offset; - --win.first; - } - - return true; + add_data(ino, mm, written, size - written); + finish_chunk(ino); } block_manager::block_manager(logger& lgr, progress& prog, const config& cfg, diff --git a/src/dwarfs/block_manager_v2.cpp b/src/dwarfs/block_manager_v2.cpp deleted file mode 100644 index c43a1400..00000000 --- a/src/dwarfs/block_manager_v2.cpp +++ /dev/null @@ -1,537 +0,0 @@ -/* vim:set ts=2 sw=2 sts=2 et: */ -/** - * \author Marcus Holland-Moritz (github@mhxnet.de) - * \copyright Copyright (c) Marcus Holland-Moritz - * - * This file is part of dwarfs. - * - * dwarfs is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * dwarfs is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with dwarfs. If not, see . - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include - -#include - -#include "dwarfs/block_data.h" -#include "dwarfs/block_manager.h" -#include "dwarfs/compiler.h" -#include "dwarfs/cyclic_hash.h" -#include "dwarfs/entry.h" -#include "dwarfs/error.h" -#include "dwarfs/filesystem_writer.h" -#include "dwarfs/inode.h" -#include "dwarfs/logger.h" -#include "dwarfs/mmif.h" -#include "dwarfs/os_access.h" -#include "dwarfs/progress.h" -#include "dwarfs/util.h" - -namespace dwarfs { - -/** - * Block Manager Strategy - * - * For each *block*, start new rolling hash. The the hashes are associcated - * with the block, new hash-offset-pairs will only be added as the block - * grows. We only need to store a hash-offset-pair every N bytes, with N - * being configurable (typically half of the window size so we find all - * matches of at least 1.5 times the window size, but could also be smaller). - * - * For each *file*, start new rolling hash. Hash values *expire* immediately, - * no history of hash values is kept. Up to n blocks (typically only one) - * will be scanned for matches. Old file data beyond the moving window will - * be added to the current *block*, causing the rolling *block* hash to also - * advance. Data needs to be added to the block only in increments at which - * a new hash valus is computed. - * - * This strategy ensures that we're using a deterministic amount of memory - * (proportional to block size and history block count). - * - * A single window size is sufficient. That window size should still be - * configurable. - */ - -struct bm_stats { - bm_stats() - : l2_collision_vec_size(1, 0, 128) {} - - size_t total_hashes{0}; - size_t l2_collisions{0}; - size_t total_matches{0}; - size_t good_matches{0}; - size_t bad_matches{0}; - folly::Histogram l2_collision_vec_size; -}; - -template -class fast_multimap { - private: - using collision_vector = folly::small_vector; - using blockhash_t = google::dense_hash_map; - using collision_t = std::unordered_map; - - public: - fast_multimap() { values_.set_empty_key(EmptyKey); } - - void insert(KeyT const& key, ValT const& val) { - if (key == EmptyKey or !values_.insert(std::make_pair(key, val)).second) { - collisions_[key].emplace_back(val); - } - } - - template - void for_each_value(KeyT const& key, F&& func) const { - if (auto it = values_.find(key); it != values_.end()) { - func(it->second); - if (auto it2 = collisions_.find(key); it2 != collisions_.end()) { - for (auto const& val : it2->second) { - func(val); - } - } - } - } - - void clear() { - values_.clear(); - collisions_.clear(); - } - - blockhash_t const& values() const { return values_; }; - collision_t const& collisions() const { return collisions_; }; - - private: - blockhash_t values_; - collision_t collisions_; -}; - -class active_block { - private: - using offset_t = uint32_t; - using hash_t = uint32_t; - - public: - active_block(size_t num, size_t size, size_t window_size, size_t window_step) - : num_(num) - , capacity_(size) - , window_size_(window_size) - , window_step_mask_(window_step - 1) - , data_{std::make_shared()} { - DWARFS_CHECK((window_step & window_step_mask_) == 0, - "window step size not a power of two"); - data_->vec().reserve(capacity_); - } - - size_t num() const { return num_; } - size_t size() const { return data_->size(); } - - bool full() const { return size() == capacity_; } - std::shared_ptr data() const { return data_; } - - void append(uint8_t const* p, size_t size); - - size_t next_hash_distance() const { - return window_step_mask_ + 1 - (data_->vec().size() & window_step_mask_); - } - - template - void for_each_offset(hash_t key, F&& func) const { - offsets_.for_each_value(key, std::forward(func)); - } - - void finalize(bm_stats& stats) { - stats.total_hashes += offsets_.values().size(); - for (auto& c : offsets_.collisions()) { - stats.total_hashes += c.second.size(); - stats.l2_collisions += c.second.size() - 1; - stats.l2_collision_vec_size.addValue(c.second.size()); - } - } - - private: - static constexpr size_t num_inline_offsets = 4; - - size_t num_, capacity_, window_size_, window_step_mask_; - rsync_hash hasher_; - fast_multimap offsets_; - std::shared_ptr data_; -}; - -template -class block_manager_ : public block_manager::impl { - public: - block_manager_(logger& lgr, progress& prog, const block_manager::config& cfg, - std::shared_ptr os, filesystem_writer& fsw) - : log_{lgr} - , prog_{prog} - , cfg_{cfg} - , os_{std::move(os)} - , fsw_{fsw} - , window_size_{cfg.blockhash_window_size > 0 - ? static_cast(1) << cfg.blockhash_window_size - : 0} - , window_step_{window_size_ >> cfg.window_increment_shift} - , block_size_{static_cast(1) << cfg.block_size_bits} {} - - void add_inode(std::shared_ptr ino) override; - void finish_blocks() override; - - private: - struct chunk_state { - size_t offset{0}; - size_t size{0}; - }; - - void block_ready(); - void finish_chunk(inode& ino); - void append_to_block(inode& ino, mmif& mm, size_t offset, size_t size); - void add_data(inode& ino, mmif& mm, size_t offset, size_t size); - void segment_and_add_data(inode& ino, mmif& mm, size_t size); - - log_proxy log_; - progress& prog_; - const block_manager::config& cfg_; - std::shared_ptr os_; - filesystem_writer& fsw_; - - size_t const window_size_; - size_t const window_step_; - size_t const block_size_; - size_t block_count_{0}; - - chunk_state chunk_; - - bm_stats stats_; - - // Active blocks are blocks that can still be referenced from new chunks. - // Up to N blocks (configurable) can be active and are kept in this queue. - // All active blocks except for the last one are immutable and potentially - // already being compressed. - std::deque blocks_; -}; - -class segment_match { - public: - segment_match(active_block const* blk, uint32_t off) noexcept - : block_{blk} - , offset_{off} {} - - void verify_and_extend(uint8_t const* pos, size_t len, uint8_t const* begin, - uint8_t const* end); - - bool operator<(segment_match const& rhs) const { - return size_ < rhs.size_ || - (size_ == rhs.size_ && - (block_->num() < rhs.block_->num() || - (block_->num() == rhs.block_->num() && offset_ < rhs.offset_))); - } - - uint8_t const* data() const { return data_; } - uint32_t size() const { return size_; } - uint32_t offset() const { return offset_; } - size_t block_num() const { return block_->num(); } - - private: - active_block const* block_; - uint32_t offset_; - uint32_t size_{0}; - uint8_t const* data_; -}; - -void active_block::append(uint8_t const* p, size_t size) { - auto& v = data_->vec(); - auto offset = v.size(); - DWARFS_CHECK(offset + size <= capacity_, - fmt::format("block capacity exceeded: {} + {} > {}", offset, - size, capacity_)); - v.resize(offset + size); - ::memcpy(v.data() + offset, p, size); - - while (offset < v.size()) { - if (offset < window_size_) { - hasher_.update(v[offset++]); - } else { - hasher_.update(v[offset - window_size_], v[offset]); - if ((++offset & window_step_mask_) == 0) { - offsets_.insert(hasher_(), offset - window_size_); - } - } - } -} - -void segment_match::verify_and_extend(uint8_t const* pos, size_t len, - uint8_t const* begin, - uint8_t const* end) { - auto const& v = block_->data()->vec(); - - if (::memcmp(v.data() + offset_, pos, len) == 0) { - // scan backward - auto tmp = offset_; - while (tmp > 0 && pos > begin && v[tmp - 1] == pos[-1]) { - --tmp; - --pos; - } - len += offset_ - tmp; - offset_ = tmp; - data_ = pos; - - // scan forward - pos += len; - tmp = offset_ + len; - while (tmp < v.size() && pos < end && v[tmp] == *pos) { - ++tmp; - ++pos; - } - size_ = tmp - offset_; - } -} - -template -void block_manager_::add_inode(std::shared_ptr ino) { - auto e = ino->any(); - - if (size_t size = e->size(); size > 0) { - auto mm = os_->map_file(e->path(), size); - - LOG_TRACE << "adding inode " << ino->num() << " [" << ino->any()->name() - << "] - size: " << size; - - if (window_size_ == 0 or size < window_size_) { - // no point dealing with hashing, just write it out - add_data(*ino, *mm, 0, size); - finish_chunk(*ino); - } else { - segment_and_add_data(*ino, *mm, size); - } - } -} - -template -void block_manager_::finish_blocks() { - if (!blocks_.empty()) { - block_ready(); - } - - auto l1_collisions = stats_.l2_collision_vec_size.computeTotalCount(); - - LOG_INFO << "segmentation matches: good=" << stats_.good_matches - << ", bad=" << stats_.bad_matches - << ", total=" << stats_.total_matches; - LOG_INFO << "segmentation collisions: L1=" - << fmt::format("{:.3f}%", - 100.0 * (l1_collisions + stats_.l2_collisions) / - stats_.total_hashes) - << ", L2=" - << fmt::format("{:.3f}%", - 100.0 * stats_.l2_collisions / stats_.total_hashes) - << " [" << stats_.total_hashes << " hashes]"; - - if (l1_collisions > 0) { - auto pct = [&](double p) { - return stats_.l2_collision_vec_size.getPercentileEstimate(p); - }; - LOG_DEBUG << "collision vector size p50: " << pct(0.5) - << ", p75: " << pct(0.75) << ", p90: " << pct(0.9) - << ", p95: " << pct(0.95) << ", p99: " << pct(0.99); - } -} - -template -void block_manager_::block_ready() { - auto& block = blocks_.back(); - block.finalize(stats_); - fsw_.write_block(block.data()); - ++prog_.block_count; -} - -template -void block_manager_::append_to_block(inode& ino, mmif& mm, - size_t offset, size_t size) { - if (DWARFS_UNLIKELY(blocks_.empty() or blocks_.back().full())) { - if (blocks_.size() >= cfg_.max_active_blocks) { - blocks_.pop_front(); - } - - blocks_.emplace_back(block_count_++, block_size_, window_size_, - window_step_); - } - - auto& block = blocks_.back(); - - block.append(mm.as(offset), size); - chunk_.size += size; - - prog_.filesystem_size += size; - - if (block.full()) { - mm.release_until(offset + size); - finish_chunk(ino); - block_ready(); - } -} - -template -void block_manager_::add_data(inode& ino, mmif& mm, size_t offset, - size_t size) { - while (size > 0) { - size_t block_offset = 0; - - if (!blocks_.empty()) { - block_offset = blocks_.back().size(); - } - - size_t chunk_size = std::min(size, block_size_ - block_offset); - - append_to_block(ino, mm, offset, chunk_size); - - offset += chunk_size; - size -= chunk_size; - } -} - -template -void block_manager_::finish_chunk(inode& ino) { - if (chunk_.size > 0) { - auto& block = blocks_.back(); - ino.add_chunk(block.num(), chunk_.offset, chunk_.size); - chunk_.offset = block.full() ? 0 : block.size(); - chunk_.size = 0; - prog_.chunk_count++; - } -} - -template -void block_manager_::segment_and_add_data(inode& ino, mmif& mm, - size_t size) { - rsync_hash hasher; - size_t offset = 0; - size_t written = 0; - size_t lookback_size = window_size_ + window_step_; - size_t next_hash_offset = - lookback_size + - (blocks_.empty() ? window_step_ : blocks_.back().next_hash_distance()); - auto p = mm.as(); - - DWARFS_CHECK(size >= window_size_, "unexpected call to segment_and_add_data"); - - for (; offset < window_size_; ++offset) { - hasher.update(p[offset]); - } - - std::vector matches; - - while (offset < size) { - for (auto const& block : blocks_) { - block.for_each_offset(hasher(), [&](uint32_t offset) { - matches.emplace_back(&block, offset); - }); - } - - if (!matches.empty()) { - LOG_TRACE << "found " << matches.size() << " matches (hash=" << hasher() - << ", window size=" << window_size_ << ")"; - - for (auto& m : matches) { - LOG_TRACE << " block " << m.block_num() << " @ " << m.offset(); - m.verify_and_extend(p + offset - window_size_, window_size_, - p + written, p + size); - } - - stats_.total_matches += matches.size(); - stats_.bad_matches += - std::count_if(matches.begin(), matches.end(), - [](auto const& m) { return m.size() == 0; }); - - auto best = std::max_element(matches.begin(), matches.end()); - auto match_len = best->size(); - - if (match_len > 0) { - ++stats_.good_matches; - LOG_TRACE << "successful match of length " << match_len << " @ " - << best->offset(); - - auto block_num = best->block_num(); - auto match_off = best->offset(); - auto num_to_write = best->data() - (p + written); - - // best->block can be invalidated by this call to add_data()! - add_data(ino, mm, written, num_to_write); - written += num_to_write; - finish_chunk(ino); - - ino.add_chunk(block_num, match_off, match_len); - written += match_len; - - prog_.saved_by_segmentation += match_len; - - offset = written; - - if (size - written < window_size_) { - break; - } - - hasher.clear(); - - for (; offset < written + window_size_; ++offset) { - hasher.update(p[offset]); - } - - next_hash_offset = - written + lookback_size + blocks_.back().next_hash_distance(); - } - - matches.clear(); - - if (match_len > 0) { - continue; - } - } - - // no matches found, see if we can append data - // we need to keep at least lookback_size bytes unwritten - - if (offset == next_hash_offset) { - auto num_to_write = offset - lookback_size - written; - add_data(ino, mm, written, num_to_write); - written += num_to_write; - next_hash_offset += window_step_; - } - - hasher.update(p[offset - window_size_], p[offset]); - ++offset; - } - - add_data(ino, mm, written, size - written); - finish_chunk(ino); -} - -block_manager::block_manager(logger& lgr, progress& prog, const config& cfg, - std::shared_ptr os, - filesystem_writer& fsw) - : impl_(make_unique_logging_object( - lgr, prog, cfg, os, fsw)) {} - -} // namespace dwarfs