New block manager implementation

This commit is contained in:
Marcus Holland-Moritz 2021-03-02 21:05:41 +01:00
parent 17d7b8d96c
commit d29faef726
12 changed files with 554 additions and 13 deletions

View File

@ -192,7 +192,7 @@ list(
LIBDWARFS_SRC
src/dwarfs/block_cache.cpp
src/dwarfs/block_compressor.cpp
src/dwarfs/block_manager.cpp
src/dwarfs/block_manager_v2.cpp
src/dwarfs/checksum.cpp
src/dwarfs/console_writer.cpp
src/dwarfs/entry.cpp

View File

@ -36,12 +36,12 @@ class progress;
class block_manager {
public:
struct config {
config();
// TODO: remove vector and use single window size
std::vector<size_t> blockhash_window_size;
unsigned window_increment_shift;
size_t memory_limit;
unsigned block_size_bits;
unsigned window_increment_shift{1};
size_t max_active_blocks{1};
size_t memory_limit{256 << 20};
unsigned block_size_bits{22};
};
block_manager(logger& lgr, progress& prog, const config& cfg,

30
include/dwarfs/compiler.h Normal file
View File

@ -0,0 +1,30 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once

#if defined(__GNUC__)
// Branch-prediction hints. The condition is normalized to 0/1 with `!!`
// because __builtin_expect() compares the *value* of its first argument
// against the expected value; without `!!`, a pointer or non-boolean
// integer expression would almost never compare equal to 1, silently
// defeating the hint.
#define DWARFS_LIKELY(x) __builtin_expect(!!(x), 1)
#define DWARFS_UNLIKELY(x) __builtin_expect(!!(x), 0)
#else
// Fallback for compilers without __builtin_expect: plain pass-through.
#define DWARFS_LIKELY(x) (x)
#define DWARFS_UNLIKELY(x) (x)
#endif

View File

@ -44,6 +44,12 @@ class rsync_hash {
b_ += a_;
}
// Reset the rolling hash to its initial (empty window) state so the
// hasher can be reused for a new block or file.
void clear() {
// a_/b_ are the two rolling accumulators; len_ tracks how many bytes
// are currently in the window.
a_ = 0;
b_ = 0;
len_ = 0;
}
private:
uint16_t a_{0};
uint16_t b_{0};

View File

@ -40,6 +40,7 @@ class mmap : public mmif {
boost::system::error_code lock(off_t offset, size_t size) override;
boost::system::error_code release(off_t offset, size_t size) override;
boost::system::error_code release_until(off_t offset) override;
private:
int fd_;

View File

@ -49,5 +49,6 @@ class mmif : public boost::noncopyable {
virtual boost::system::error_code lock(off_t offset, size_t size) = 0;
virtual boost::system::error_code release(off_t offset, size_t size) = 0;
virtual boost::system::error_code release_until(off_t offset) = 0;
};
} // namespace dwarfs

View File

@ -27,4 +27,5 @@
namespace dwarfs {
uint32_t get_similarity_hash(const uint8_t* data, size_t size);
}

View File

@ -180,11 +180,6 @@ class block_manager_ : public block_manager::impl {
std::map<size_t, bm_stats> stats_;
};
block_manager::config::config()
: window_increment_shift(1)
, memory_limit(256 << 20)
, block_size_bits(22) {}
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::finish_blocks() {
if (!block_->empty()) {

View File

@ -0,0 +1,481 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <deque>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include <fmt/format.h>
#include <sparsehash/dense_hash_map>
#include "dwarfs/block_data.h"
#include "dwarfs/block_manager.h"
#include "dwarfs/compiler.h"
#include "dwarfs/cyclic_hash.h"
#include "dwarfs/entry.h"
#include "dwarfs/error.h"
#include "dwarfs/filesystem_writer.h"
#include "dwarfs/inode.h"
#include "dwarfs/logger.h"
#include "dwarfs/mmif.h"
#include "dwarfs/os_access.h"
#include "dwarfs/progress.h"
#include "dwarfs/util.h"
namespace dwarfs {
/**
* Block Manager Strategy
*
* For each *block*, start a new rolling hash. The hashes are associated
* with the block, new hash-offset-pairs will only be added as the block
* grows. We only need to store a hash-offset-pair every N bytes, with N
* being configurable (typically half of the window size so we find all
* matches of at least 1.5 times the window size, but could also be smaller).
*
* For each *file*, start new rolling hash. Hash values *expire* immediately,
* no history of hash values is kept. Up to n blocks (typically only one)
* will be scanned for matches. Old file data beyond the moving window will
* be added to the current *block*, causing the rolling *block* hash to also
* advance. Data needs to be added to the block only in increments at which
* a new hash value is computed.
*
* This strategy ensures that we're using a deterministic amount of memory
* (proportional to block size and history block count).
*
* A single window size is sufficient. That window size should still be
* configurable.
*/
/**
 * A hash multimap optimized for mostly-unique keys.
 *
 * The first value seen for a key goes into a google::dense_hash_map
 * (the fast path); any further values for the same key -- and all
 * values whose key equals the dense map's reserved EmptyKey, which
 * dense_hash_map cannot store -- spill into a std::unordered_map of
 * folly::small_vector (up to MaxCollInline values inline).
 *
 * NOTE(review): folly/small_vector.h is not #included by this file;
 * presumably it arrives via another header -- confirm.
 */
template <typename KeyT, typename ValT, KeyT EmptyKey = KeyT{},
          size_t MaxCollInline = 2>
class fast_multimap {
 private:
  using collision_vector = folly::small_vector<ValT, MaxCollInline>;
  using blockhash_t = google::dense_hash_map<KeyT, ValT>;
  using collision_t = std::unordered_map<KeyT, collision_vector>;

 public:
  fast_multimap() { values_.set_empty_key(EmptyKey); }

  // Insert a key/value pair. Duplicate keys and the reserved EmptyKey
  // are routed to the collision map.
  void insert(KeyT const& key, ValT const& val) {
    if (key == EmptyKey or !values_.insert(std::make_pair(key, val)).second) {
      collisions_[key].emplace_back(val);
    }
  }

  // Invoke func for every value stored under key.
  template <typename F>
  void for_each_value(KeyT const& key, F&& func) const {
    if (key == EmptyKey) {
      // Bug fix: values inserted under EmptyKey live exclusively in the
      // collision map (dense_hash_map reserves that key), so they must
      // be looked up there directly. The previous code only consulted
      // the collision map after a dense-map hit, making these values
      // unreachable.
      visit_collisions(key, func);
    } else if (auto it = values_.find(key); it != values_.end()) {
      func(it->second);
      visit_collisions(key, func);
    }
  }

  void clear() {
    values_.clear();
    collisions_.clear();
  }

 private:
  // Call func for each value stored in the collision map under key.
  template <typename F>
  void visit_collisions(KeyT const& key, F& func) const {
    if (auto it = collisions_.find(key); it != collisions_.end()) {
      for (auto const& val : it->second) {
        func(val);
      }
    }
  }

  blockhash_t values_;
  collision_t collisions_;
};
// A filesystem block that is currently being built and/or can still be
// referenced from new chunks. Maintains a rolling (rsync-style) hash
// over the appended data and records hash -> offset pairs every
// `window_step` bytes so later data can be matched against it.
class active_block {
private:
using offset_t = uint32_t;
using hash_t = uint32_t;
public:
// num:         sequential block number
// size:        target block capacity in bytes
// window_size: rolling hash window size in bytes
// window_step: distance between recorded hash/offset pairs; must be a
//              power of two (checked below)
active_block(size_t num, size_t size, size_t window_size, size_t window_step)
: num_(num)
, capacity_(size)
, window_size_(window_size)
, window_step_mask_(window_step - 1)
, data_{std::make_shared<block_data>()} {
// power-of-two check: step & (step - 1) == 0
DWARFS_CHECK((window_step & window_step_mask_) == 0,
"window step size not a power of two");
data_->vec().reserve(capacity_);
}
size_t num() const { return num_; }
size_t size() const { return data_->size(); }
bool full() const { return size() == capacity_; }
std::shared_ptr<block_data> data() const { return data_; }
// Append raw bytes and advance the rolling hash; defined below.
void append(uint8_t const* p, size_t size);
// Bytes remaining until the next hash/offset pair would be recorded.
size_t next_hash_distance() const {
return window_step_mask_ + 1 - (data_->vec().size() & window_step_mask_);
}
// Invoke func(offset) for every recorded offset whose hash equals key.
template <typename F>
void for_each_offset(hash_t key, F&& func) const {
offsets_.for_each_value(key, std::forward<F>(func));
}
private:
size_t num_, capacity_, window_size_, window_step_mask_;
rsync_hash hasher_;
fast_multimap<hash_t, offset_t> offsets_;
std::shared_ptr<block_data> data_;
};
// Segmenting block manager: splits incoming inode data into fixed-size
// blocks while deduplicating repeated ranges against the currently
// active blocks via a rolling hash (see strategy comment above).
template <typename LoggerPolicy>
class block_manager_ : public block_manager::impl {
public:
block_manager_(logger& lgr, progress& prog, const block_manager::config& cfg,
std::shared_ptr<os_access> os, filesystem_writer& fsw)
: log_{lgr}
, prog_{prog}
// NOTE(review): stores a reference to the caller-owned config; the
// caller must keep it alive for the manager's lifetime -- confirm.
, cfg_{cfg}
, os_{std::move(os)}
, fsw_{fsw}
// config still carries a vector of window sizes (see TODO in the
// header); only the first entry is used here
, window_size_{cfg.blockhash_window_size.empty()
? 0
: cfg.blockhash_window_size.front()}
// distance between recorded hash values, derived from window size
, window_step_{window_size_ >> cfg.window_increment_shift}
, block_size_{static_cast<size_t>(1) << cfg.block_size_bits} {}
void add_inode(std::shared_ptr<inode> ino) override;
void finish_blocks() override;
size_t total_size() const override { return 0; } // TODO
size_t total_blocks() const override { return 0; } // TODO
private:
// Chunk currently being assembled for the active inode: offset into
// the current block and number of bytes accumulated so far.
struct chunk_state {
size_t offset{0};
size_t size{0};
};
void block_ready();
void finish_chunk(inode& ino);
void append_to_block(inode& ino, mmif& mm, size_t offset, size_t size);
void add_data(inode& ino, mmif& mm, size_t offset, size_t size);
void segment_and_add_data(inode& ino, mmif& mm, size_t size);
log_proxy<LoggerPolicy> log_;
progress& prog_;
const block_manager::config& cfg_;
std::shared_ptr<os_access> os_;
filesystem_writer& fsw_;
size_t const window_size_;
size_t const window_step_;
size_t const block_size_;
size_t block_count_{0};
chunk_state chunk_;
// Active blocks are blocks that can still be referenced from new chunks.
// Up to N blocks (configurable) can be active and are kept in this queue.
// All active blocks except for the last one are immutable and potentially
// already being compressed.
std::deque<active_block> blocks_;
};
// A candidate match of the current file window against data already
// stored in an active block. Constructed from a raw hash hit, then
// verified byte-wise and extended via verify_and_extend().
class segment_match {
 public:
  segment_match(active_block const* blk, uint32_t off) noexcept
      : block_{blk}
      , offset_{off} {}

  // Verify the hash hit byte-wise and greedily extend it in both
  // directions; defined below. On hash collision, size() stays 0.
  void verify_and_extend(uint8_t const* pos, size_t len, uint8_t const* begin,
                         uint8_t const* end);

  // Order by match length, then block number, then offset, so that
  // std::max_element deterministically picks the longest match.
  bool operator<(segment_match const& rhs) const {
    return size_ < rhs.size_ ||
           (size_ == rhs.size_ &&
            (block_->num() < rhs.block_->num() ||
             (block_->num() == rhs.block_->num() && offset_ < rhs.offset_)));
  }

  // Only meaningful after verify_and_extend() succeeded (size() > 0).
  uint8_t const* data() const { return data_; }
  uint32_t size() const { return size_; }
  uint32_t offset() const { return offset_; }
  size_t block_num() const { return block_->num(); }

 private:
  active_block const* block_;
  uint32_t offset_;
  uint32_t size_{0};
  // Bug fix: brace-initialize so data() never returns an indeterminate
  // pointer when verification failed.
  uint8_t const* data_{nullptr};
};
// Append `size` bytes at `p` to the block and advance the rolling hash
// over the newly added data, recording a hash/offset pair every
// `window_step` bytes (window_step == window_step_mask_ + 1).
void active_block::append(uint8_t const* p, size_t size) {
auto& v = data_->vec();
auto offset = v.size();
DWARFS_CHECK(offset + size <= capacity_,
fmt::format("block capacity exceeded: {} + {} > {}", offset,
size, capacity_));
v.resize(offset + size);
::memcpy(v.data() + offset, p, size);
while (offset < v.size()) {
if (offset < window_size_) {
// still filling the very first window of the block
hasher_.update(v[offset++]);
} else {
// slide the window one byte: drop the byte leaving the window,
// add the byte entering it
hasher_.update(v[offset - window_size_], v[offset]);
// record hash -> window-start-offset every window_step bytes;
// note the increment happens before the mask test
if ((++offset & window_step_mask_) == 0) {
offsets_.insert(hasher_(), offset - window_size_);
}
}
}
}
// Byte-wise verification of a rolling-hash hit, followed by greedy
// extension of the match in both directions.
//
// pos:   start of the file-data window that produced the hash hit
// len:   length of that window
// begin: lowest file position the match may extend back into (first
//        byte not yet written out)
// end:   one past the last valid file byte
//
// On success, offset_/size_ describe the matched range within the block
// and data_ points at the matching file data; on a hash collision
// (memcmp mismatch) the match is left with size_ == 0.
void segment_match::verify_and_extend(uint8_t const* pos, size_t len,
uint8_t const* begin,
uint8_t const* end) {
auto const& v = block_->data()->vec();
if (::memcmp(v.data() + offset_, pos, len) == 0) {
// scan backward
auto tmp = offset_;
while (tmp > 0 && pos > begin && v[tmp - 1] == pos[-1]) {
--tmp;
--pos;
}
// account for the bytes gained by the backward scan
len += offset_ - tmp;
offset_ = tmp;
data_ = pos;
// scan forward
pos += len;
tmp = offset_ + len;
while (tmp < v.size() && pos < end && v[tmp] == *pos) {
++tmp;
++pos;
}
size_ = tmp - offset_;
}
}
// Feed a single inode's file contents into the block manager. Files
// smaller than the hash window (or a zero window size) bypass
// segmentation entirely; empty files contribute nothing.
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::add_inode(std::shared_ptr<inode> ino) {
  auto e = ino->any();
  size_t const size = e->size();

  if (size == 0) {
    return;
  }

  auto mm = os_->map_file(e->path(), size);

  LOG_TRACE << "adding inode " << ino->num() << " [" << ino->any()->name()
            << "] - size: " << size;

  if (window_size_ > 0 and size >= window_size_) {
    segment_and_add_data(*ino, *mm, size);
  } else {
    // no point dealing with hashes, just write it out
    add_data(*ino, *mm, 0, size);
    finish_chunk(*ino);
  }
}
// Flush the final, partially filled block at the end of the run.
//
// Bug fix: a block that filled up exactly was already written out by
// append_to_block() (which calls block_ready() on a full block but
// keeps the block in blocks_ so it remains referencable). Writing
// blocks_.back() again here would emit a duplicate block whenever the
// data ends precisely on a block boundary, so only flush a block that
// is not yet full.
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::finish_blocks() {
  if (!blocks_.empty() && !blocks_.back().full()) {
    block_ready();
  }
}
// Hand the most recent block over to the filesystem writer and count it.
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::block_ready() {
  auto& blk = blocks_.back();
  fsw_.write_block(blk.data());
  ++prog_.block_count;
}
// Append `size` bytes of file data (at file offset `offset`) to the
// current block, opening a fresh block if necessary, and grow the
// pending chunk. Ships the block as soon as it fills up.
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::append_to_block(inode& ino, mmif& mm,
size_t offset, size_t size) {
if (DWARFS_UNLIKELY(blocks_.empty() or blocks_.back().full())) {
// retire the oldest active block before opening a new one so at most
// cfg_.max_active_blocks blocks remain referencable; any outstanding
// pointers/iterators into the popped block become invalid here
if (blocks_.size() >= cfg_.max_active_blocks) {
blocks_.pop_front();
}
blocks_.emplace_back(block_count_++, block_size_, window_size_,
window_step_);
}
auto& block = blocks_.back();
block.append(mm.as<uint8_t>(offset), size);
chunk_.size += size;
prog_.filesystem_size += size;
if (block.full()) {
// tell the OS we no longer need the already-consumed file data
mm.release_until(offset + size);
// the chunk must end at the block boundary; then ship the block
finish_chunk(ino);
block_ready();
}
}
// Write `size` bytes of file data starting at `offset` into blocks,
// splitting at block boundaries as needed.
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::add_data(inode& ino, mmif& mm, size_t offset,
                                            size_t size) {
  while (size > 0) {
    // how much of the current block is already in use
    size_t const used = blocks_.empty() ? 0 : blocks_.back().size();
    // write at most up to the end of the current block
    size_t const piece = std::min(size, block_size_ - used);
    append_to_block(ino, mm, offset, piece);
    offset += piece;
    size -= piece;
  }
}
// Close the chunk currently being accumulated for `ino`: record
// (block number, offset, size) on the inode and reset the chunk state
// so the next chunk starts where this one ended. No-op if nothing was
// accumulated.
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::finish_chunk(inode& ino) {
if (chunk_.size > 0) {
auto& block = blocks_.back();
ino.add_chunk(block.num(), chunk_.offset, chunk_.size);
// if the block just filled up, the next chunk starts at offset 0 of a
// yet-to-be-created block; otherwise it continues in this block
chunk_.offset = block.full() ? 0 : block.size();
chunk_.size = 0;
prog_.chunk_count++;
}
}
// Core segmentation loop: slide a rolling hash window over the file
// data, look the hash up in all active blocks, and replace repeated
// ranges with chunk references to existing block data. Bytes that
// cannot be matched are appended as new block data.
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::segment_and_add_data(inode& ino, mmif& mm,
size_t size) {
rsync_hash hasher;
size_t offset = 0;   // position of the byte *after* the current window
size_t written = 0;  // file bytes already consumed (stored or matched)
// bytes that must stay un-written behind the window so a later match
// can still extend backwards into them
size_t lookback_size = window_size_ + window_step_;
// file offset at which the block's rolling hash would next record a
// hash/offset pair; data is flushed in window_step increments so block
// and file hashes stay aligned
size_t next_hash_offset =
lookback_size +
(blocks_.empty() ? window_step_ : blocks_.back().next_hash_distance());
auto p = mm.as<uint8_t>();
DWARFS_CHECK(size >= window_size_, "unexpected call to segment_and_add_data");
// prime the rolling hash with the first full window
for (; offset < window_size_; ++offset) {
hasher.update(p[offset]);
}
std::vector<segment_match> matches;
while (offset < size) {
// collect all raw hash hits across the active blocks (the lambda
// parameter shadows the outer `offset` -- it is the *block* offset)
for (auto const& block : blocks_) {
block.for_each_offset(hasher(), [&](uint32_t offset) {
matches.emplace_back(&block, offset);
});
}
if (!matches.empty()) {
LOG_TRACE << "found " << matches.size() << " matches (hash=" << hasher()
<< ", window size=" << window_size_ << ")";
// verify each hit byte-wise and greedily extend it both ways
for (auto& m : matches) {
LOG_TRACE << " @" << m.offset();
m.verify_and_extend(p + offset - window_size_, window_size_,
p + written, p + size);
}
// operator< orders by length (then block/offset), so max_element
// yields the longest match
auto best = std::max_element(matches.begin(), matches.end());
auto match_len = best->size();
if (match_len > 0) {
LOG_DEBUG << "successful match of length " << match_len << " @ "
<< best->offset();
auto block_num = best->block_num();
auto match_off = best->offset();
// unmatched bytes preceding the match still need to be stored
auto num_to_write = best->data() - (p + written);
// best->block can be invalidated by this call to add_data()!
add_data(ino, mm, written, num_to_write);
written += num_to_write;
finish_chunk(ino);
// reference the matched range instead of storing it again
ino.add_chunk(block_num, match_off, match_len);
written += match_len;
prog_.saved_by_segmentation += match_len;
// resume scanning right after the matched range
offset = written;
if (size - written < window_size_) {
// not enough data left for another full window
break;
}
// restart the rolling hash from scratch behind the match
hasher.clear();
for (; offset < written + window_size_; ++offset) {
hasher.update(p[offset]);
}
next_hash_offset =
written + lookback_size + blocks_.back().next_hash_distance();
}
matches.clear();
if (match_len > 0) {
continue;
}
}
// no matches found, see if we can append data
// we need to keep at least lookback_size bytes unwritten
if (offset == next_hash_offset) {
auto num_to_write = offset - lookback_size - written;
add_data(ino, mm, written, num_to_write);
written += num_to_write;
next_hash_offset += window_step_;
}
// slide the window by one byte
hasher.update(p[offset - window_size_], p[offset]);
++offset;
}
// flush whatever is left after the last match / window position
add_data(ino, mm, written, size - written);
finish_chunk(ino);
}
// Instantiate the logger-policy-specific implementation; all work is
// delegated to block_manager_<LoggerPolicy> via the impl pointer.
block_manager::block_manager(logger& lgr, progress& prog, const config& cfg,
std::shared_ptr<os_access> os,
filesystem_writer& fsw)
: impl_(make_unique_logging_object<impl, block_manager_, logger_policies>(
lgr, prog, cfg, os, fsw)) {}
} // namespace dwarfs

View File

@ -166,10 +166,22 @@ void file::scan(os_access& os, progress& prog) {
static_assert(checksum::digest_size(alg) == sizeof(data::hash_type));
if (size_t s = size(); s > 0) {
constexpr size_t chunk_size = 16 << 20;
prog.original_size += s;
auto mm = os.map_file(path(), s);
DWARFS_CHECK(checksum::compute(alg, mm->as<void>(), s, &data_->hash[0]),
"checksum computation failed");
checksum cs(alg);
size_t offset = 0;
while (s >= chunk_size) {
cs.update(mm->as<void>(offset), chunk_size);
mm->release_until(offset);
offset += chunk_size;
s -= chunk_size;
}
cs.update(mm->as<void>(offset), s);
DWARFS_CHECK(cs.finalize(&data_->hash[0]), "checksum computation failed");
} else {
DWARFS_CHECK(checksum::compute(alg, nullptr, 0, &data_->hash[0]),
"checksum computation failed");

View File

@ -90,6 +90,17 @@ boost::system::error_code mmap::release(off_t offset, size_t size) {
return ec;
}
// Advise the kernel that the mapped file data in [0, offset) is no
// longer needed, allowing the pages to be reclaimed.
boost::system::error_code mmap::release_until(off_t offset) {
boost::system::error_code ec;
// madvise() operates on whole pages; round the length down so we never
// release a page that still contains data beyond `offset`
offset -= offset % page_size_;
if (::madvise(addr_, offset, MADV_DONTNEED) != 0) {
ec.assign(errno, boost::system::generic_category());
}
return ec;
}
// Raw accessors for the mapped region (start address and length).
void const* mmap::addr() const { return addr_; }
size_t mmap::size() const { return size_; }

View File

@ -39,6 +39,9 @@ class mmap_mock : public mmif {
boost::system::error_code release(off_t, size_t) override {
return boost::system::error_code();
}
// Mock: releasing pages is a no-op for the in-memory test buffer;
// always reports success.
boost::system::error_code release_until(off_t) override {
return boost::system::error_code();
}
private:
const std::string m_data;