Replace block manager implementation

This commit is contained in:
Marcus Holland-Moritz 2021-03-03 12:41:19 +01:00
parent 0da750c143
commit 09d0ff50bd
3 changed files with 410 additions and 929 deletions

View File

@ -192,7 +192,7 @@ list(
LIBDWARFS_SRC LIBDWARFS_SRC
src/dwarfs/block_cache.cpp src/dwarfs/block_cache.cpp
src/dwarfs/block_compressor.cpp src/dwarfs/block_compressor.cpp
src/dwarfs/block_manager_v2.cpp src/dwarfs/block_manager.cpp
src/dwarfs/checksum.cpp src/dwarfs/checksum.cpp
src/dwarfs/console_writer.cpp src/dwarfs/console_writer.cpp
src/dwarfs/entry.cpp src/dwarfs/entry.cpp

View File

@ -22,9 +22,10 @@
#include <algorithm> #include <algorithm>
#include <cstdint> #include <cstdint>
#include <cstring> #include <cstring>
#include <map> #include <deque>
#include <string> #include <string>
#include <type_traits> #include <type_traits>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -32,13 +33,16 @@
#include <sparsehash/dense_hash_map> #include <sparsehash/dense_hash_map>
#include <folly/stats/Histogram.h>
#include "dwarfs/block_data.h" #include "dwarfs/block_data.h"
#include "dwarfs/block_manager.h" #include "dwarfs/block_manager.h"
#include "dwarfs/compiler.h"
#include "dwarfs/cyclic_hash.h"
#include "dwarfs/entry.h" #include "dwarfs/entry.h"
#include "dwarfs/error.h" #include "dwarfs/error.h"
#include "dwarfs/filesystem_writer.h" #include "dwarfs/filesystem_writer.h"
#include "dwarfs/inode.h" #include "dwarfs/inode.h"
#include "dwarfs/inode_hasher.h"
#include "dwarfs/logger.h" #include "dwarfs/logger.h"
#include "dwarfs/mmif.h" #include "dwarfs/mmif.h"
#include "dwarfs/os_access.h" #include "dwarfs/os_access.h"
@ -47,467 +51,481 @@
namespace dwarfs { namespace dwarfs {
namespace { /**
// TODO: for now, this has to work better... * Block Manager Strategy
// we lose this hash value, but that's life... *
const uint32_t blockhash_emtpy = 0; * For each *block*, start new rolling hash. The the hashes are associcated
* with the block, new hash-offset-pairs will only be added as the block
* grows. We only need to store a hash-offset-pair every N bytes, with N
* being configurable (typically half of the window size so we find all
* matches of at least 1.5 times the window size, but could also be smaller).
*
* For each *file*, start new rolling hash. Hash values *expire* immediately,
* no history of hash values is kept. Up to n blocks (typically only one)
* will be scanned for matches. Old file data beyond the moving window will
* be added to the current *block*, causing the rolling *block* hash to also
* advance. Data needs to be added to the block only in increments at which
* a new hash valus is computed.
*
* This strategy ensures that we're using a deterministic amount of memory
* (proportional to block size and history block count).
*
* A single window size is sufficient. That window size should still be
* configurable.
*/
struct bm_stats { struct bm_stats {
bm_stats()
: l2_collision_vec_size(1, 0, 128) {}
size_t total_hashes{0}; size_t total_hashes{0};
size_t collisions{0}; size_t l2_collisions{0};
size_t real_matches{0}; size_t total_matches{0};
size_t good_matches{0};
size_t bad_matches{0}; size_t bad_matches{0};
size_t saved_bytes{0}; folly::Histogram<size_t> l2_collision_vec_size;
size_t largest_block{0};
};
} // namespace
template <typename Key, typename T>
struct block_hashes {
static constexpr size_t max_coll_inline = 2;
using blockhash_t = google::dense_hash_map<Key, T>;
block_hashes(size_t size)
: size(size) {
values.set_empty_key(blockhash_emtpy);
}
const size_t size;
blockhash_t values;
}; };
template <typename LoggerPolicy> template <typename KeyT, typename ValT, KeyT EmptyKey = KeyT{},
class block_manager_ : public block_manager::impl { size_t MaxCollInline = 2>
class fast_multimap {
private: private:
using cyclic_hash_t = uint32_t; using collision_vector = folly::small_vector<ValT, MaxCollInline>;
using offset_t = uint32_t; using blockhash_t = google::dense_hash_map<KeyT, ValT>;
using block_hashes_t = block_hashes<cyclic_hash_t, offset_t>; using collision_t = std::unordered_map<KeyT, collision_vector>;
using bhv_ri = std::vector<block_hashes_t>::reverse_iterator;
using hasher_type = inode_hasher<LoggerPolicy, cyclic_hash_t>;
using hash_map_type = typename hasher_type::result_type;
struct match_window {
match_window()
: first(0)
, last(0) {}
match_window(size_t first, size_t last)
: first(first)
, last(last) {}
size_t size() const { return last - first; }
size_t first;
size_t last;
};
template <typename T>
static std::vector<T> sorted(const std::vector<T>& vec) {
std::vector<T> tmp(vec);
std::sort(tmp.begin(), tmp.end());
return tmp;
}
public: public:
block_manager_(logger& lgr, progress& prog, const block_manager::config& cfg, fast_multimap() { values_.set_empty_key(EmptyKey); }
std::shared_ptr<os_access> os, filesystem_writer& fsw)
: cfg_(cfg)
, block_size_(static_cast<size_t>(1) << cfg.block_size_bits)
, blockhash_window_size_(sorted(cfg.blockhash_window_size))
, fsw_(fsw)
, os_(os)
, current_block_(0)
, total_blocks_size_(0)
, block_{std::make_shared<block_data>()}
, hasher_(lgr, blockhash_window_size_)
, log_(lgr)
, prog_(prog) {
block_->vec().reserve(block_size_);
for (auto size : blockhash_window_size_) { void insert(KeyT const& key, ValT const& val) {
block_hashes_.emplace_back(size); if (key == EmptyKey or !values_.insert(std::make_pair(key, val)).second) {
collisions_[key].emplace_back(val);
} }
} }
void add_inode(std::shared_ptr<inode> ino) override; template <typename F>
void finish_blocks() override; void for_each_value(KeyT const& key, F&& func) const {
if (auto it = values_.find(key); it != values_.end()) {
size_t total_size() const override { return total_blocks_size_; } func(it->second);
if (auto it2 = collisions_.find(key); it2 != collisions_.end()) {
size_t total_blocks() const override { for (auto const& val : it2->second) {
return total_blocks_size_ > 0 ? current_block_ + 1 : 0; func(val);
}
private:
size_t cur_offset() const { return block_->size(); }
void block_ready();
void update_hashes(const hash_map_type& hm, size_t offset, size_t size);
void add_chunk(const std::shared_ptr<inode>& ino, const uint8_t* p,
size_t offset, size_t size, const hash_map_type* hm);
void add_data(const std::shared_ptr<inode>& ino, const uint8_t* p,
size_t size, const hash_map_type* hm = nullptr);
void segment_and_add_data(const hash_map_type& hm,
const std::shared_ptr<inode>& ino,
const uint8_t* data, size_t size);
void
segment_and_add_data(const std::string& indent, const hash_map_type& hm,
const std::shared_ptr<inode>& ino, const uint8_t* data,
match_window mw, bhv_ri bhcur, bhv_ri bhend);
bool get_match_window(const std::string& indent, match_window& win,
size_t& block_offset, const uint8_t* data,
const match_window& search_win) const;
void
validate_match(const std::string& indent, const uint8_t* data,
match_window& mw, const std::vector<cyclic_hash_t>& hashvec,
bm_stats& stats, size_t block_size, size_t block_offset,
size_t off, match_window& best, size_t& best_offset);
const block_manager::config& cfg_;
const size_t block_size_;
std::vector<size_t> blockhash_window_size_;
filesystem_writer& fsw_;
std::shared_ptr<os_access> os_;
size_t current_block_;
size_t total_blocks_size_;
std::shared_ptr<block_data> block_;
std::vector<block_hashes_t> block_hashes_;
hasher_type hasher_;
hash_map_type hm_;
log_proxy<LoggerPolicy> log_;
progress& prog_;
std::map<size_t, bm_stats> stats_;
};
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::finish_blocks() {
if (!block_->empty()) {
block_ready();
}
for (const auto& sti : stats_) {
static char const* const percent = "{:.2}%";
const auto& st = sti.second;
LOG_DEBUG << "blockhash window <" << sti.first << ">: " << st.collisions
<< " collisions ("
<< fmt::format(percent, float(st.collisions) / st.total_hashes)
<< "), " << st.real_matches << " real matches, " << st.bad_matches
<< " bad matches, ("
<< fmt::format(percent, float(st.bad_matches) /
(st.real_matches + st.bad_matches))
<< "), " << size_with_unit(st.saved_bytes)
<< " saved (largest=" << size_with_unit(st.largest_block) << ")";
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::block_ready() {
auto tmp = std::make_shared<block_data>();
block_.swap(tmp);
fsw_.write_block(std::move(tmp));
block_->vec().reserve(block_size_);
for (auto& bh : block_hashes_) {
bh.values.clear();
}
++current_block_;
++prog_.block_count;
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::update_hashes(const hash_map_type& hm,
size_t offset, size_t size) {
size_t block_offset = cur_offset();
for (auto bhi = block_hashes_.begin(); bhi != block_hashes_.end(); ++bhi) {
if (bhi->size <= size) {
auto hmi = hm.find(bhi->size);
auto& stats = stats_[bhi->size];
if (hmi == hm.end()) {
DWARFS_THROW(runtime_error,
"internal error: no hash found for window size");
}
const auto& hashvec = hmi->second;
size_t incr = bhi->size >> cfg_.window_increment_shift;
size_t last = (size - bhi->size) + 1;
if (hashvec.size() < offset + last) {
LOG_ERROR << "bhi=" << bhi->size
<< ", hashvec.size()=" << hashvec.size()
<< ", offset=" << offset << ", last=" << last;
DWARFS_THROW(runtime_error, "internal error: hashvec too small");
}
for (size_t off = 0; off < last; off += incr) {
cyclic_hash_t hval = hashvec[offset + off];
++stats.total_hashes;
if (hval != blockhash_emtpy) {
if constexpr (std::is_same_v<LoggerPolicy, prod_logger_policy>) {
bhi->values[hval] = block_offset + off;
} else {
auto new_offest = block_offset + off;
auto [it, success] =
bhi->values.insert(std::make_pair(hval, new_offest));
if (!success) {
LOG_TRACE << "collision for hash=" << hval
<< " (size=" << bhi->size << "): " << it->second
<< " <-> " << new_offest;
++stats.collisions;
it->second = new_offest;
}
}
} }
} }
} }
} }
}
template <typename LoggerPolicy> void clear() {
void block_manager_<LoggerPolicy>::add_chunk(const std::shared_ptr<inode>& ino, values_.clear();
const uint8_t* p, size_t offset, collisions_.clear();
size_t size,
const hash_map_type* hm) {
LOG_TRACE << "block " << current_block_ << " size: " << block_->size()
<< " of " << block_size_;
if (hm) {
update_hashes(*hm, offset, size);
} }
size_t block_offset = cur_offset(); blockhash_t const& values() const { return values_; };
collision_t const& collisions() const { return collisions_; };
LOG_TRACE << "adding chunk for inode " << ino->num() << " [" private:
<< ino->any()->name() << "] - block: " << current_block_ blockhash_t values_;
<< " offset: " << block_offset << ", size: " << size; collision_t collisions_;
};
block_->vec().resize(block_offset + size); class active_block {
private:
using offset_t = uint32_t;
using hash_t = uint32_t;
::memcpy(block_->data() + block_offset, p + offset, size); public:
active_block(size_t num, size_t size, size_t window_size, size_t window_step)
ino->add_chunk(current_block_, block_offset, size); : num_(num)
prog_.chunk_count++; , capacity_(size)
prog_.filesystem_size += size; , window_size_(window_size)
, window_step_mask_(window_step - 1)
if (block_->size() == block_size_) { , data_{std::make_shared<block_data>()} {
block_ready(); DWARFS_CHECK((window_step & window_step_mask_) == 0,
"window step size not a power of two");
data_->vec().reserve(capacity_);
} }
}
template <typename LoggerPolicy> size_t num() const { return num_; }
void block_manager_<LoggerPolicy>::add_data(const std::shared_ptr<inode>& ino, size_t size() const { return data_->size(); }
const uint8_t* p, size_t size,
const hash_map_type* hm) {
size_t offset = 0;
while (size > 0) { bool full() const { return size() == capacity_; }
size_t block_offset = cur_offset(); std::shared_ptr<block_data> data() const { return data_; }
if (block_offset == block_size_) { void append(uint8_t const* p, size_t size);
// apparently we've filled a block, so a new one will be created once
// we're adding another chunk size_t next_hash_distance() const {
block_offset = 0; return window_step_mask_ + 1 - (data_->vec().size() & window_step_mask_);
}
template <typename F>
void for_each_offset(hash_t key, F&& func) const {
offsets_.for_each_value(key, std::forward<F>(func));
}
void finalize(bm_stats& stats) {
stats.total_hashes += offsets_.values().size();
for (auto& c : offsets_.collisions()) {
stats.total_hashes += c.second.size();
stats.l2_collisions += c.second.size() - 1;
stats.l2_collision_vec_size.addValue(c.second.size());
} }
}
if (block_offset + size <= block_size_) { private:
add_chunk(ino, p, offset, size, hm); static constexpr size_t num_inline_offsets = 4;
break;
size_t num_, capacity_, window_size_, window_step_mask_;
rsync_hash hasher_;
fast_multimap<hash_t, offset_t, num_inline_offsets> offsets_;
std::shared_ptr<block_data> data_;
};
template <typename LoggerPolicy>
class block_manager_ : public block_manager::impl {
public:
block_manager_(logger& lgr, progress& prog, const block_manager::config& cfg,
std::shared_ptr<os_access> os, filesystem_writer& fsw)
: log_{lgr}
, prog_{prog}
, cfg_{cfg}
, os_{std::move(os)}
, fsw_{fsw}
, window_size_{cfg.blockhash_window_size > 0
? static_cast<size_t>(1) << cfg.blockhash_window_size
: 0}
, window_step_{window_size_ >> cfg.window_increment_shift}
, block_size_{static_cast<size_t>(1) << cfg.block_size_bits} {}
void add_inode(std::shared_ptr<inode> ino) override;
void finish_blocks() override;
private:
struct chunk_state {
size_t offset{0};
size_t size{0};
};
void block_ready();
void finish_chunk(inode& ino);
void append_to_block(inode& ino, mmif& mm, size_t offset, size_t size);
void add_data(inode& ino, mmif& mm, size_t offset, size_t size);
void segment_and_add_data(inode& ino, mmif& mm, size_t size);
log_proxy<LoggerPolicy> log_;
progress& prog_;
const block_manager::config& cfg_;
std::shared_ptr<os_access> os_;
filesystem_writer& fsw_;
size_t const window_size_;
size_t const window_step_;
size_t const block_size_;
size_t block_count_{0};
chunk_state chunk_;
bm_stats stats_;
// Active blocks are blocks that can still be referenced from new chunks.
// Up to N blocks (configurable) can be active and are kept in this queue.
// All active blocks except for the last one are immutable and potentially
// already being compressed.
std::deque<active_block> blocks_;
};
class segment_match {
public:
segment_match(active_block const* blk, uint32_t off) noexcept
: block_{blk}
, offset_{off} {}
void verify_and_extend(uint8_t const* pos, size_t len, uint8_t const* begin,
uint8_t const* end);
bool operator<(segment_match const& rhs) const {
return size_ < rhs.size_ ||
(size_ == rhs.size_ &&
(block_->num() < rhs.block_->num() ||
(block_->num() == rhs.block_->num() && offset_ < rhs.offset_)));
}
uint8_t const* data() const { return data_; }
uint32_t size() const { return size_; }
uint32_t offset() const { return offset_; }
size_t block_num() const { return block_->num(); }
private:
active_block const* block_;
uint32_t offset_;
uint32_t size_{0};
uint8_t const* data_;
};
void active_block::append(uint8_t const* p, size_t size) {
auto& v = data_->vec();
auto offset = v.size();
DWARFS_CHECK(offset + size <= capacity_,
fmt::format("block capacity exceeded: {} + {} > {}", offset,
size, capacity_));
v.resize(offset + size);
::memcpy(v.data() + offset, p, size);
while (offset < v.size()) {
if (offset < window_size_) {
hasher_.update(v[offset++]);
} else { } else {
size_t chunksize = block_size_ - block_offset; hasher_.update(v[offset - window_size_], v[offset]);
add_chunk(ino, p, offset, chunksize, hm); if ((++offset & window_step_mask_) == 0) {
offset += chunksize; offsets_.insert(hasher_(), offset - window_size_);
size -= chunksize; }
} }
} }
} }
void segment_match::verify_and_extend(uint8_t const* pos, size_t len,
uint8_t const* begin,
uint8_t const* end) {
auto const& v = block_->data()->vec();
if (::memcmp(v.data() + offset_, pos, len) == 0) {
// scan backward
auto tmp = offset_;
while (tmp > 0 && pos > begin && v[tmp - 1] == pos[-1]) {
--tmp;
--pos;
}
len += offset_ - tmp;
offset_ = tmp;
data_ = pos;
// scan forward
pos += len;
tmp = offset_ + len;
while (tmp < v.size() && pos < end && v[tmp] == *pos) {
++tmp;
++pos;
}
size_ = tmp - offset_;
}
}
template <typename LoggerPolicy> template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::add_inode(std::shared_ptr<inode> ino) { void block_manager_<LoggerPolicy>::add_inode(std::shared_ptr<inode> ino) {
auto e = ino->any(); auto e = ino->any();
size_t size = e->size();
if (size > 0) { if (size_t size = e->size(); size > 0) {
auto mm = os_->map_file(e->path(), size); auto mm = os_->map_file(e->path(), size);
LOG_TRACE << "adding inode " << ino->num() << " [" << ino->any()->name() LOG_TRACE << "adding inode " << ino->num() << " [" << ino->any()->name()
<< "] - size: " << size; << "] - size: " << size;
if (blockhash_window_size_.empty() or if (window_size_ == 0 or size < window_size_) {
size < blockhash_window_size_.front()) { // no point dealing with hashing, just write it out
// no point dealing with hashes, just write it out add_data(*ino, *mm, 0, size);
// XXX: might be worth checking if the whole file has a match? finish_chunk(*ino);
add_data(ino, mm->as<uint8_t>(), size);
} else { } else {
const uint8_t* data = mm->as<uint8_t>(); segment_and_add_data(*ino, *mm, size);
// XXX: hm_ is reused to avoid reallocations
hasher_(hm_, data, size);
segment_and_add_data(hm_, ino, data, size);
} }
} }
} }
template <typename LoggerPolicy> template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::segment_and_add_data( void block_manager_<LoggerPolicy>::finish_blocks() {
const hash_map_type& hm, const std::shared_ptr<inode>& ino, if (!blocks_.empty()) {
const uint8_t* data, size_t size) { block_ready();
; }
segment_and_add_data("", hm, ino, data, match_window(0, size),
block_hashes_.rbegin(), block_hashes_.rend());
}
template <typename LoggerPolicy> auto l1_collisions = stats_.l2_collision_vec_size.computeTotalCount();
void block_manager_<LoggerPolicy>::validate_match(
const std::string& indent, const uint8_t* data, match_window& mw,
const std::vector<cyclic_hash_t>& hashvec, bm_stats& stats,
size_t block_size, size_t block_offset, size_t off, match_window& best,
size_t& best_offset) {
LOG_TRACE << indent << "potentially matched " << block_size
<< " bytes at offset " << off << ", hash=" << hashvec[off];
match_window win(off, off + block_size); LOG_INFO << "segmentation matches: good=" << stats_.good_matches
<< ", bad=" << stats_.bad_matches
<< ", total=" << stats_.total_matches;
LOG_INFO << "segmentation collisions: L1="
<< fmt::format("{:.3f}%",
100.0 * (l1_collisions + stats_.l2_collisions) /
stats_.total_hashes)
<< ", L2="
<< fmt::format("{:.3f}%",
100.0 * stats_.l2_collisions / stats_.total_hashes)
<< " [" << stats_.total_hashes << " hashes]";
if (get_match_window(indent, win, block_offset, data, mw)) { if (l1_collisions > 0) {
LOG_TRACE << indent << "definitely matched " << win.size() auto pct = [&](double p) {
<< " bytes at offset " << win.first; return stats_.l2_collision_vec_size.getPercentileEstimate(p);
++stats.real_matches; };
LOG_DEBUG << "collision vector size p50: " << pct(0.5)
// fuck yeah, we've got a block... << ", p75: " << pct(0.75) << ", p90: " << pct(0.9)
<< ", p95: " << pct(0.95) << ", p99: " << pct(0.99);
if (win.size() > best.size()) {
best = win;
best_offset = block_offset;
}
} else {
LOG_TRACE << indent << "bad match: " << block_size << " bytes at offset "
<< off << ", hash=" << hashvec[off];
++stats.bad_matches;
} }
} }
template <typename LoggerPolicy> template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::segment_and_add_data( void block_manager_<LoggerPolicy>::block_ready() {
const std::string& indent, const hash_map_type& hm, auto& block = blocks_.back();
const std::shared_ptr<inode>& ino, const uint8_t* data, match_window mw, block.finalize(stats_);
bhv_ri bhcur, bhv_ri bhend) { fsw_.write_block(block.data());
LOG_TRACE << indent << "segment_and_add_data([" << mw.first << ", " << mw.last ++prog_.block_count;
<< "], " << (bhcur != bhend ? bhcur->size : 0) << ")"; }
while (bhcur != bhend) { template <typename LoggerPolicy>
LOG_TRACE << indent << " wsize=" << bhcur->size; void block_manager_<LoggerPolicy>::append_to_block(inode& ino, mmif& mm,
size_t offset, size_t size) {
if (DWARFS_UNLIKELY(blocks_.empty() or blocks_.back().full())) {
if (blocks_.size() >= cfg_.max_active_blocks) {
blocks_.pop_front();
}
if (bhcur->size <= mw.size()) { blocks_.emplace_back(block_count_++, block_size_, window_size_,
auto hmi = hm.find(bhcur->size); window_step_);
auto& stats = stats_[bhcur->size]; }
if (hmi == hm.end()) { auto& block = blocks_.back();
DWARFS_THROW(runtime_error,
"internal error: no hash found for window size"); block.append(mm.as<uint8_t>(offset), size);
chunk_.size += size;
prog_.filesystem_size += size;
if (block.full()) {
mm.release_until(offset + size);
finish_chunk(ino);
block_ready();
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::add_data(inode& ino, mmif& mm, size_t offset,
size_t size) {
while (size > 0) {
size_t block_offset = 0;
if (!blocks_.empty()) {
block_offset = blocks_.back().size();
}
size_t chunk_size = std::min(size, block_size_ - block_offset);
append_to_block(ino, mm, offset, chunk_size);
offset += chunk_size;
size -= chunk_size;
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::finish_chunk(inode& ino) {
if (chunk_.size > 0) {
auto& block = blocks_.back();
ino.add_chunk(block.num(), chunk_.offset, chunk_.size);
chunk_.offset = block.full() ? 0 : block.size();
chunk_.size = 0;
prog_.chunk_count++;
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::segment_and_add_data(inode& ino, mmif& mm,
size_t size) {
rsync_hash hasher;
size_t offset = 0;
size_t written = 0;
size_t lookback_size = window_size_ + window_step_;
size_t next_hash_offset =
lookback_size +
(blocks_.empty() ? window_step_ : blocks_.back().next_hash_distance());
auto p = mm.as<uint8_t>();
DWARFS_CHECK(size >= window_size_, "unexpected call to segment_and_add_data");
for (; offset < window_size_; ++offset) {
hasher.update(p[offset]);
}
std::vector<segment_match> matches;
while (offset < size) {
for (auto const& block : blocks_) {
block.for_each_offset(hasher(), [&](uint32_t offset) {
matches.emplace_back(&block, offset);
});
}
if (!matches.empty()) {
LOG_TRACE << "found " << matches.size() << " matches (hash=" << hasher()
<< ", window size=" << window_size_ << ")";
for (auto& m : matches) {
LOG_TRACE << " block " << m.block_num() << " @ " << m.offset();
m.verify_and_extend(p + offset - window_size_, window_size_,
p + written, p + size);
} }
const auto& hashvec = hmi->second; stats_.total_matches += matches.size();
stats_.bad_matches +=
std::count_if(matches.begin(), matches.end(),
[](auto const& m) { return m.size() == 0; });
for (size_t off = mw.first; off <= mw.last - bhcur->size;) { auto best = std::max_element(matches.begin(), matches.end());
match_window best; auto match_len = best->size();
size_t best_offset = 0;
if (auto bhi = bhcur->values.find(hashvec[off]); if (match_len > 0) {
bhi != bhcur->values.end()) { ++stats_.good_matches;
validate_match(indent, data, mw, hashvec, stats, bhcur->size, LOG_TRACE << "successful match of length " << match_len << " @ "
bhi->second, off, best, best_offset); << best->offset();
auto block_num = best->block_num();
auto match_off = best->offset();
auto num_to_write = best->data() - (p + written);
// best->block can be invalidated by this call to add_data()!
add_data(ino, mm, written, num_to_write);
written += num_to_write;
finish_chunk(ino);
ino.add_chunk(block_num, match_off, match_len);
written += match_len;
prog_.saved_by_segmentation += match_len;
offset = written;
if (size - written < window_size_) {
break;
} }
if (best.size() > 0) { hasher.clear();
LOG_TRACE << indent << "mw=[" << mw.first << ", " << mw.last
<< "], best=[" << best.first << ", " << best.last << "]";
// 1) search for smaller blocks on the left recursively for (; offset < written + window_size_; ++offset) {
match_window left(mw.first, best.first); hasher.update(p[offset]);
LOG_TRACE << indent << "left=[" << left.first << ", " << left.last
<< "]";
// 2) save the block number before recursing, as current_block_
// may change before we add the chunk
size_t block_no = current_block_;
// 3) divide and conquer!
segment_and_add_data(indent + " ", hm, ino, data, left, bhcur + 1,
bhend);
// 4) add the block we found
LOG_TRACE << "adding (existing) chunk for inode " << ino->num()
<< " [" << ino->any()->name()
<< "] - offset: " << best_offset
<< ", size: " << best.size();
ino->add_chunk(block_no, best_offset, best.size());
prog_.chunk_count++;
prog_.saved_by_segmentation += best.size();
stats.saved_bytes += best.size();
if (best.size() > stats.largest_block) {
stats.largest_block = best.size();
}
// 5) continue to look for more blocks on the right and
// make sure that we never go back to the left again
mw.first = best.last;
off = mw.first;
LOG_TRACE << indent << "updated mw=[" << mw.first << ", " << mw.last
<< "]";
} else {
++off;
} }
next_hash_offset =
written + lookback_size + blocks_.back().next_hash_distance();
}
matches.clear();
if (match_len > 0) {
continue;
} }
} }
++bhcur; // no matches found, see if we can append data
// we need to keep at least lookback_size bytes unwritten
if (offset == next_hash_offset) {
auto num_to_write = offset - lookback_size - written;
add_data(ino, mm, written, num_to_write);
written += num_to_write;
next_hash_offset += window_step_;
}
hasher.update(p[offset - window_size_], p[offset]);
++offset;
} }
add_data(ino, data + mw.first, mw.size(), &hm); add_data(ino, mm, written, size - written);
} finish_chunk(ino);
template <typename LoggerPolicy>
bool block_manager_<LoggerPolicy>::get_match_window(
const std::string& indent, match_window& win, size_t& block_offset,
const uint8_t* data, const match_window& search_win) const {
const uint8_t* blockdata = block_->data();
LOG_TRACE << indent << "match(block_offset=" << block_offset << ", window=["
<< win.first << ", " << win.last << "], search_win=["
<< search_win.first << ", " << search_win.last << "])";
if (block_offset + win.size() > block_size_) {
LOG_TRACE << indent << "bad match size";
return false;
}
if (::memcmp(blockdata + block_offset, data + win.first, win.size()) != 0) {
LOG_TRACE << indent << "block data mismatch";
return false;
}
// Looking good! Let's see how much we can get out of it...
while (block_offset + win.size() < block_size_ and
win.last < search_win.last and
block_offset + win.size() < block_->size() and
blockdata[block_offset + win.size()] == data[win.last]) {
++win.last;
}
while (win.first > search_win.first and block_offset > 0 and
blockdata[block_offset - 1] == data[win.first - 1]) {
--block_offset;
--win.first;
}
return true;
} }
block_manager::block_manager(logger& lgr, progress& prog, const config& cfg, block_manager::block_manager(logger& lgr, progress& prog, const config& cfg,

View File

@ -1,537 +0,0 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <deque>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include <fmt/format.h>
#include <sparsehash/dense_hash_map>
#include <folly/stats/Histogram.h>
#include "dwarfs/block_data.h"
#include "dwarfs/block_manager.h"
#include "dwarfs/compiler.h"
#include "dwarfs/cyclic_hash.h"
#include "dwarfs/entry.h"
#include "dwarfs/error.h"
#include "dwarfs/filesystem_writer.h"
#include "dwarfs/inode.h"
#include "dwarfs/logger.h"
#include "dwarfs/mmif.h"
#include "dwarfs/os_access.h"
#include "dwarfs/progress.h"
#include "dwarfs/util.h"
namespace dwarfs {
/**
* Block Manager Strategy
*
* For each *block*, start new rolling hash. The the hashes are associcated
* with the block, new hash-offset-pairs will only be added as the block
* grows. We only need to store a hash-offset-pair every N bytes, with N
* being configurable (typically half of the window size so we find all
* matches of at least 1.5 times the window size, but could also be smaller).
*
* For each *file*, start new rolling hash. Hash values *expire* immediately,
* no history of hash values is kept. Up to n blocks (typically only one)
* will be scanned for matches. Old file data beyond the moving window will
* be added to the current *block*, causing the rolling *block* hash to also
* advance. Data needs to be added to the block only in increments at which
* a new hash valus is computed.
*
* This strategy ensures that we're using a deterministic amount of memory
* (proportional to block size and history block count).
*
* A single window size is sufficient. That window size should still be
* configurable.
*/
struct bm_stats {
bm_stats()
: l2_collision_vec_size(1, 0, 128) {}
size_t total_hashes{0};
size_t l2_collisions{0};
size_t total_matches{0};
size_t good_matches{0};
size_t bad_matches{0};
folly::Histogram<size_t> l2_collision_vec_size;
};
template <typename KeyT, typename ValT, KeyT EmptyKey = KeyT{},
size_t MaxCollInline = 2>
class fast_multimap {
private:
using collision_vector = folly::small_vector<ValT, MaxCollInline>;
using blockhash_t = google::dense_hash_map<KeyT, ValT>;
using collision_t = std::unordered_map<KeyT, collision_vector>;
public:
fast_multimap() { values_.set_empty_key(EmptyKey); }
void insert(KeyT const& key, ValT const& val) {
if (key == EmptyKey or !values_.insert(std::make_pair(key, val)).second) {
collisions_[key].emplace_back(val);
}
}
template <typename F>
void for_each_value(KeyT const& key, F&& func) const {
if (auto it = values_.find(key); it != values_.end()) {
func(it->second);
if (auto it2 = collisions_.find(key); it2 != collisions_.end()) {
for (auto const& val : it2->second) {
func(val);
}
}
}
}
void clear() {
values_.clear();
collisions_.clear();
}
blockhash_t const& values() const { return values_; };
collision_t const& collisions() const { return collisions_; };
private:
blockhash_t values_;
collision_t collisions_;
};
class active_block {
private:
using offset_t = uint32_t;
using hash_t = uint32_t;
public:
active_block(size_t num, size_t size, size_t window_size, size_t window_step)
: num_(num)
, capacity_(size)
, window_size_(window_size)
, window_step_mask_(window_step - 1)
, data_{std::make_shared<block_data>()} {
DWARFS_CHECK((window_step & window_step_mask_) == 0,
"window step size not a power of two");
data_->vec().reserve(capacity_);
}
size_t num() const { return num_; }
size_t size() const { return data_->size(); }
bool full() const { return size() == capacity_; }
std::shared_ptr<block_data> data() const { return data_; }
void append(uint8_t const* p, size_t size);
size_t next_hash_distance() const {
return window_step_mask_ + 1 - (data_->vec().size() & window_step_mask_);
}
template <typename F>
void for_each_offset(hash_t key, F&& func) const {
offsets_.for_each_value(key, std::forward<F>(func));
}
void finalize(bm_stats& stats) {
stats.total_hashes += offsets_.values().size();
for (auto& c : offsets_.collisions()) {
stats.total_hashes += c.second.size();
stats.l2_collisions += c.second.size() - 1;
stats.l2_collision_vec_size.addValue(c.second.size());
}
}
private:
static constexpr size_t num_inline_offsets = 4;
size_t num_, capacity_, window_size_, window_step_mask_;
rsync_hash hasher_;
fast_multimap<hash_t, offset_t, num_inline_offsets> offsets_;
std::shared_ptr<block_data> data_;
};
template <typename LoggerPolicy>
class block_manager_ : public block_manager::impl {
public:
block_manager_(logger& lgr, progress& prog, const block_manager::config& cfg,
std::shared_ptr<os_access> os, filesystem_writer& fsw)
: log_{lgr}
, prog_{prog}
, cfg_{cfg}
, os_{std::move(os)}
, fsw_{fsw}
, window_size_{cfg.blockhash_window_size > 0
? static_cast<size_t>(1) << cfg.blockhash_window_size
: 0}
, window_step_{window_size_ >> cfg.window_increment_shift}
, block_size_{static_cast<size_t>(1) << cfg.block_size_bits} {}
void add_inode(std::shared_ptr<inode> ino) override;
void finish_blocks() override;
private:
struct chunk_state {
size_t offset{0};
size_t size{0};
};
void block_ready();
void finish_chunk(inode& ino);
void append_to_block(inode& ino, mmif& mm, size_t offset, size_t size);
void add_data(inode& ino, mmif& mm, size_t offset, size_t size);
void segment_and_add_data(inode& ino, mmif& mm, size_t size);
log_proxy<LoggerPolicy> log_;
progress& prog_;
const block_manager::config& cfg_;
std::shared_ptr<os_access> os_;
filesystem_writer& fsw_;
size_t const window_size_;
size_t const window_step_;
size_t const block_size_;
size_t block_count_{0};
chunk_state chunk_;
bm_stats stats_;
// Active blocks are blocks that can still be referenced from new chunks.
// Up to N blocks (configurable) can be active and are kept in this queue.
// All active blocks except for the last one are immutable and potentially
// already being compressed.
std::deque<active_block> blocks_;
};
class segment_match {
public:
segment_match(active_block const* blk, uint32_t off) noexcept
: block_{blk}
, offset_{off} {}
void verify_and_extend(uint8_t const* pos, size_t len, uint8_t const* begin,
uint8_t const* end);
bool operator<(segment_match const& rhs) const {
return size_ < rhs.size_ ||
(size_ == rhs.size_ &&
(block_->num() < rhs.block_->num() ||
(block_->num() == rhs.block_->num() && offset_ < rhs.offset_)));
}
uint8_t const* data() const { return data_; }
uint32_t size() const { return size_; }
uint32_t offset() const { return offset_; }
size_t block_num() const { return block_->num(); }
private:
active_block const* block_;
uint32_t offset_;
uint32_t size_{0};
uint8_t const* data_;
};
void active_block::append(uint8_t const* p, size_t size) {
auto& v = data_->vec();
auto offset = v.size();
DWARFS_CHECK(offset + size <= capacity_,
fmt::format("block capacity exceeded: {} + {} > {}", offset,
size, capacity_));
v.resize(offset + size);
::memcpy(v.data() + offset, p, size);
while (offset < v.size()) {
if (offset < window_size_) {
hasher_.update(v[offset++]);
} else {
hasher_.update(v[offset - window_size_], v[offset]);
if ((++offset & window_step_mask_) == 0) {
offsets_.insert(hasher_(), offset - window_size_);
}
}
}
}
void segment_match::verify_and_extend(uint8_t const* pos, size_t len,
uint8_t const* begin,
uint8_t const* end) {
auto const& v = block_->data()->vec();
if (::memcmp(v.data() + offset_, pos, len) == 0) {
// scan backward
auto tmp = offset_;
while (tmp > 0 && pos > begin && v[tmp - 1] == pos[-1]) {
--tmp;
--pos;
}
len += offset_ - tmp;
offset_ = tmp;
data_ = pos;
// scan forward
pos += len;
tmp = offset_ + len;
while (tmp < v.size() && pos < end && v[tmp] == *pos) {
++tmp;
++pos;
}
size_ = tmp - offset_;
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::add_inode(std::shared_ptr<inode> ino) {
auto e = ino->any();
if (size_t size = e->size(); size > 0) {
auto mm = os_->map_file(e->path(), size);
LOG_TRACE << "adding inode " << ino->num() << " [" << ino->any()->name()
<< "] - size: " << size;
if (window_size_ == 0 or size < window_size_) {
// no point dealing with hashing, just write it out
add_data(*ino, *mm, 0, size);
finish_chunk(*ino);
} else {
segment_and_add_data(*ino, *mm, size);
}
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::finish_blocks() {
if (!blocks_.empty()) {
block_ready();
}
auto l1_collisions = stats_.l2_collision_vec_size.computeTotalCount();
LOG_INFO << "segmentation matches: good=" << stats_.good_matches
<< ", bad=" << stats_.bad_matches
<< ", total=" << stats_.total_matches;
LOG_INFO << "segmentation collisions: L1="
<< fmt::format("{:.3f}%",
100.0 * (l1_collisions + stats_.l2_collisions) /
stats_.total_hashes)
<< ", L2="
<< fmt::format("{:.3f}%",
100.0 * stats_.l2_collisions / stats_.total_hashes)
<< " [" << stats_.total_hashes << " hashes]";
if (l1_collisions > 0) {
auto pct = [&](double p) {
return stats_.l2_collision_vec_size.getPercentileEstimate(p);
};
LOG_DEBUG << "collision vector size p50: " << pct(0.5)
<< ", p75: " << pct(0.75) << ", p90: " << pct(0.9)
<< ", p95: " << pct(0.95) << ", p99: " << pct(0.99);
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::block_ready() {
auto& block = blocks_.back();
block.finalize(stats_);
fsw_.write_block(block.data());
++prog_.block_count;
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::append_to_block(inode& ino, mmif& mm,
size_t offset, size_t size) {
if (DWARFS_UNLIKELY(blocks_.empty() or blocks_.back().full())) {
if (blocks_.size() >= cfg_.max_active_blocks) {
blocks_.pop_front();
}
blocks_.emplace_back(block_count_++, block_size_, window_size_,
window_step_);
}
auto& block = blocks_.back();
block.append(mm.as<uint8_t>(offset), size);
chunk_.size += size;
prog_.filesystem_size += size;
if (block.full()) {
mm.release_until(offset + size);
finish_chunk(ino);
block_ready();
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::add_data(inode& ino, mmif& mm, size_t offset,
size_t size) {
while (size > 0) {
size_t block_offset = 0;
if (!blocks_.empty()) {
block_offset = blocks_.back().size();
}
size_t chunk_size = std::min(size, block_size_ - block_offset);
append_to_block(ino, mm, offset, chunk_size);
offset += chunk_size;
size -= chunk_size;
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::finish_chunk(inode& ino) {
if (chunk_.size > 0) {
auto& block = blocks_.back();
ino.add_chunk(block.num(), chunk_.offset, chunk_.size);
chunk_.offset = block.full() ? 0 : block.size();
chunk_.size = 0;
prog_.chunk_count++;
}
}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::segment_and_add_data(inode& ino, mmif& mm,
size_t size) {
rsync_hash hasher;
size_t offset = 0;
size_t written = 0;
size_t lookback_size = window_size_ + window_step_;
size_t next_hash_offset =
lookback_size +
(blocks_.empty() ? window_step_ : blocks_.back().next_hash_distance());
auto p = mm.as<uint8_t>();
DWARFS_CHECK(size >= window_size_, "unexpected call to segment_and_add_data");
for (; offset < window_size_; ++offset) {
hasher.update(p[offset]);
}
std::vector<segment_match> matches;
while (offset < size) {
for (auto const& block : blocks_) {
block.for_each_offset(hasher(), [&](uint32_t offset) {
matches.emplace_back(&block, offset);
});
}
if (!matches.empty()) {
LOG_TRACE << "found " << matches.size() << " matches (hash=" << hasher()
<< ", window size=" << window_size_ << ")";
for (auto& m : matches) {
LOG_TRACE << " block " << m.block_num() << " @ " << m.offset();
m.verify_and_extend(p + offset - window_size_, window_size_,
p + written, p + size);
}
stats_.total_matches += matches.size();
stats_.bad_matches +=
std::count_if(matches.begin(), matches.end(),
[](auto const& m) { return m.size() == 0; });
auto best = std::max_element(matches.begin(), matches.end());
auto match_len = best->size();
if (match_len > 0) {
++stats_.good_matches;
LOG_TRACE << "successful match of length " << match_len << " @ "
<< best->offset();
auto block_num = best->block_num();
auto match_off = best->offset();
auto num_to_write = best->data() - (p + written);
// best->block can be invalidated by this call to add_data()!
add_data(ino, mm, written, num_to_write);
written += num_to_write;
finish_chunk(ino);
ino.add_chunk(block_num, match_off, match_len);
written += match_len;
prog_.saved_by_segmentation += match_len;
offset = written;
if (size - written < window_size_) {
break;
}
hasher.clear();
for (; offset < written + window_size_; ++offset) {
hasher.update(p[offset]);
}
next_hash_offset =
written + lookback_size + blocks_.back().next_hash_distance();
}
matches.clear();
if (match_len > 0) {
continue;
}
}
// no matches found, see if we can append data
// we need to keep at least lookback_size bytes unwritten
if (offset == next_hash_offset) {
auto num_to_write = offset - lookback_size - written;
add_data(ino, mm, written, num_to_write);
written += num_to_write;
next_hash_offset += window_step_;
}
hasher.update(p[offset - window_size_], p[offset]);
++offset;
}
add_data(ino, mm, written, size - written);
finish_chunk(ino);
}
block_manager::block_manager(logger& lgr, progress& prog, const config& cfg,
std::shared_ptr<os_access> os,
filesystem_writer& fsw)
: impl_(make_unique_logging_object<impl, block_manager_, logger_policies>(
lgr, prog, cfg, os, fsw)) {}
} // namespace dwarfs