add block_manager

This commit is contained in:
Marcus Holland-Moritz 2023-08-13 20:43:46 +02:00
parent 7b591c741f
commit 8fa157bf92
21 changed files with 460 additions and 278 deletions

View File

@ -356,6 +356,7 @@ list(
src/dwarfs/block_cache.cpp
src/dwarfs/block_compressor.cpp
src/dwarfs/block_compressor_parser.cpp
src/dwarfs/block_manager.cpp
src/dwarfs/block_range.cpp
src/dwarfs/builtin_script.cpp
src/dwarfs/cached_block.cpp

View File

@ -0,0 +1,45 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <mutex>
#include <vector>
#include "dwarfs/gen-cpp2/metadata_types.h"
namespace dwarfs {
class block_manager {
public:
using chunk_type = thrift::metadata::chunk;
size_t get_logical_block() const;
void set_written_block(size_t logical_block, size_t written_block);
void map_logical_blocks(std::vector<chunk_type>& vec);
private:
std::mutex mutable mx_;
size_t mutable num_blocks_{0};
std::vector<size_t> block_map_;
};
} // namespace dwarfs

View File

@ -28,6 +28,7 @@
#include <span>
#include <utility>
#include "dwarfs/fragment_category.h"
#include "dwarfs/fstypes.h"
#include "dwarfs/options.h"
#include "dwarfs/worker_group.h"
@ -44,23 +45,27 @@ class filesystem_writer {
public:
filesystem_writer(
std::ostream& os, logger& lgr, worker_group& wg, progress& prog,
const block_compressor& bc,
const block_compressor& schema_bc, const block_compressor& metadata_bc,
filesystem_writer_options const& options = filesystem_writer_options(),
std::istream* header = nullptr);
filesystem_writer(std::ostream& os, logger& lgr, worker_group& wg,
progress& prog, const block_compressor& bc,
const block_compressor& schema_bc,
const block_compressor& metadata_bc,
filesystem_writer_options const& options,
std::istream* header = nullptr);
void add_default_compressor(block_compressor bc) {
impl_->add_default_compressor(std::move(bc));
}
void add_category_compressor(fragment_category::value_type cat,
block_compressor bc) {
impl_->add_category_compressor(cat, std::move(bc));
}
void copy_header(std::span<uint8_t const> header) {
impl_->copy_header(header);
}
void write_block(std::shared_ptr<block_data>&& data) {
impl_->write_block(std::move(data));
uint32_t write_block(fragment_category::value_type cat,
std::shared_ptr<block_data>&& data,
std::optional<std::string> meta = std::nullopt) {
return impl_->write_block(cat, std::move(data), std::move(meta));
}
void write_metadata_v2_schema(std::shared_ptr<block_data>&& data) {
@ -80,14 +85,17 @@ class filesystem_writer {
size_t size() const { return impl_->size(); }
int queue_fill() const { return impl_->queue_fill(); }
class impl {
public:
virtual ~impl() = default;
virtual void add_default_compressor(block_compressor bc) = 0;
virtual void add_category_compressor(fragment_category::value_type cat,
block_compressor bc) = 0;
virtual void copy_header(std::span<uint8_t const> header) = 0;
virtual void write_block(std::shared_ptr<block_data>&& data) = 0;
virtual uint32_t write_block(fragment_category::value_type cat,
std::shared_ptr<block_data>&& data,
std::optional<std::string> meta) = 0;
virtual void
write_metadata_v2_schema(std::shared_ptr<block_data>&& data) = 0;
virtual void write_metadata_v2(std::shared_ptr<block_data>&& data) = 0;
@ -96,7 +104,6 @@ class filesystem_writer {
std::span<uint8_t const> data) = 0;
virtual void flush() = 0;
virtual size_t size() const = 0;
virtual int queue_fill() const = 0;
};
private:

View File

@ -36,7 +36,7 @@ class fragment_chunkable : public chunkable {
public:
fragment_chunkable(inode const& ino, single_inode_fragment& frag,
file_off_t offset, mmif& mm,
categorizer_manager const& catmgr);
categorizer_manager const* catmgr);
~fragment_chunkable();
size_t size() const override;
@ -50,7 +50,7 @@ class fragment_chunkable : public chunkable {
single_inode_fragment& frag_;
file_off_t offset_;
mmif& mm_;
categorizer_manager const& catmgr_;
categorizer_manager const* catmgr_;
};
} // namespace dwarfs

View File

@ -48,6 +48,7 @@ class inode : public object {
using files_vector = folly::small_vector<file*, 1>;
virtual void set_files(files_vector&& fv) = 0;
virtual void populate(size_t size) = 0;
virtual void scan(mmif* mm, inode_options const& options) = 0;
virtual void set_num(uint32_t num) = 0;
virtual uint32_t num() const = 0;
@ -63,7 +64,7 @@ class inode : public object {
virtual void add_chunk(size_t block, size_t offset, size_t size) = 0;
virtual void
append_chunks_to(std::vector<thrift::metadata::chunk>& vec) const = 0;
virtual inode_fragments const& fragments() const = 0;
virtual inode_fragments& fragments() = 0;
virtual void dump(std::ostream& os, inode_options const& options) const = 0;
};

View File

@ -54,10 +54,6 @@ class inode_manager {
size_t count() const { return impl_->count(); }
void order_inodes(worker_group& wg, inode_cb const& fn) {
impl_->order_inodes(wg, fn);
}
void for_each_inode_in_order(inode_cb const& fn) const {
impl_->for_each_inode_in_order(fn);
}
@ -80,13 +76,17 @@ class inode_manager {
sortable_inode_span sortable_span() const { return impl_->sortable_span(); }
sortable_inode_span
ordered_span(fragment_category cat, worker_group& wg) const {
return impl_->ordered_span(cat, wg);
}
class impl {
public:
virtual ~impl() = default;
virtual std::shared_ptr<inode> create_inode() = 0;
virtual size_t count() const = 0;
virtual void order_inodes(worker_group& wg, inode_cb const& fn) = 0;
virtual void for_each_inode_in_order(
std::function<void(std::shared_ptr<inode> const&)> const& fn) const = 0;
virtual std::vector<std::pair<fragment_category::value_type, size_t>>
@ -97,6 +97,8 @@ class inode_manager {
file const* p) const = 0;
virtual void dump(std::ostream& os) const = 0;
virtual sortable_inode_span sortable_span() const = 0;
virtual sortable_inode_span
ordered_span(fragment_category cat, worker_group& wg) const = 0;
};
private:

View File

@ -71,6 +71,8 @@ class progress {
std::atomic<size_t> chunk_count{0};
std::atomic<size_t> inodes_scanned{0};
std::atomic<size_t> inodes_written{0};
std::atomic<size_t> fragments_found{0};
std::atomic<size_t> fragments_written{0};
std::atomic<size_t> blocks_written{0};
std::atomic<size_t> errors{0};
std::atomic<size_t> nilsimsa_depth{0};

View File

@ -25,10 +25,13 @@
#include <memory>
#include <vector>
#include <folly/Function.h>
namespace dwarfs {
class block_data;
class block_manager;
class chunkable;
class filesystem_writer;
class logger;
class progress;
@ -43,8 +46,10 @@ class segmenter {
unsigned bloom_filter_size{4};
};
segmenter(logger& lgr, progress& prog, const config& cfg,
filesystem_writer& fsw);
using block_ready_cb = folly::Function<size_t(std::shared_ptr<block_data>)>;
segmenter(logger& lgr, progress& prog, std::shared_ptr<block_manager> blkmgr,
const config& cfg, block_ready_cb block_ready);
void add_chunkable(chunkable& chkable) { impl_->add_chunkable(chkable); }

View File

@ -0,0 +1,56 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#include <cassert>
#include "dwarfs/block_manager.h"
namespace dwarfs {
size_t block_manager::get_logical_block() const {
size_t block_no;
{
std::lock_guard lock{mx_};
block_no = num_blocks_++;
}
return block_no;
}
void block_manager::set_written_block(size_t logical_block,
size_t written_block) {
std::lock_guard lock{mx_};
assert(logical_block < num_blocks_);
if (block_map_.size() < num_blocks_) {
block_map_.resize(num_blocks_);
}
block_map_[logical_block] = written_block;
}
void block_manager::map_logical_blocks(std::vector<chunk_type>& vec) {
std::lock_guard lock{mx_};
for (auto& c : vec) {
size_t block = c.get_block();
assert(block < num_blocks_);
c.block() = block_map_[block];
}
}
} // namespace dwarfs

View File

@ -177,9 +177,16 @@ void console_writer::update(const progress& p, bool last) {
<< size_with_unit(p.saved_by_segmentation) << newline
<< "filesystem: " << size_with_unit(p.filesystem_size) << " in "
<< p.block_count << " blocks (" << p.chunk_count << " chunks, "
<< (p.inodes_written > 0 ? p.inodes_written : p.inodes_scanned) << "/"
<< p.files_found - p.duplicate_files - p.hardlinks << " inodes)"
<< p.block_count << " blocks (" << p.chunk_count << " chunks, ";
if (p.fragments_written > 0) {
oss << p.fragments_written << "/" << p.fragments_found
<< " fragments, ";
} else {
oss << p.fragments_found << " fragments, " << p.inodes_scanned << "/";
}
oss << p.files_found - p.duplicate_files - p.hardlinks << " inodes)"
<< newline
<< "compressed filesystem: " << p.blocks_written << " blocks/"

View File

@ -33,6 +33,7 @@
#include "dwarfs/block_cache.h"
#include "dwarfs/block_compressor.h"
#include "dwarfs/block_data.h"
#include "dwarfs/categorizer.h"
#include "dwarfs/error.h"
#include "dwarfs/filesystem_v2.h"
#include "dwarfs/filesystem_writer.h"
@ -701,7 +702,9 @@ void filesystem_v2::rewrite(logger& lgr, progress& prog,
auto block =
std::make_shared<block_data>(block_decompressor::decompress(
s->compression(), mm->as<uint8_t>(s->start()), s->length()));
writer.write_block(std::move(block));
// TODO: re-write with different categories
writer.write_block(categorizer_manager::default_category().value(),
std::move(block));
} else {
writer.write_compressed_section(s->type(), s->compression(),
s->data(*mm));

View File

@ -26,7 +26,9 @@
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <unordered_map>
#include <folly/system/ThreadName.h>
@ -52,10 +54,14 @@ class fsblock {
fsblock(section_type type, compression_type compression,
std::span<uint8_t const> data, uint32_t number);
void compress(worker_group& wg) { impl_->compress(wg); }
void
compress(worker_group& wg, std::optional<std::string> meta = std::nullopt) {
impl_->compress(wg, std::move(meta));
}
void wait_until_compressed() { impl_->wait_until_compressed(); }
section_type type() const { return impl_->type(); }
compression_type compression() const { return impl_->compression(); }
std::string description() const { return impl_->description(); }
std::span<uint8_t const> data() const { return impl_->data(); }
size_t uncompressed_size() const { return impl_->uncompressed_size(); }
size_t size() const { return impl_->size(); }
@ -66,10 +72,12 @@ class fsblock {
public:
virtual ~impl() = default;
virtual void compress(worker_group& wg) = 0;
virtual void
compress(worker_group& wg, std::optional<std::string> meta) = 0;
virtual void wait_until_compressed() = 0;
virtual section_type type() const = 0;
virtual compression_type compression() const = 0;
virtual std::string description() const = 0;
virtual std::span<uint8_t const> data() const = 0;
virtual size_t uncompressed_size() const = 0;
virtual size_t size() const = 0;
@ -95,14 +103,20 @@ class raw_fsblock : public fsblock::impl {
, number_{number}
, comp_type_{bc_.type()} {}
void compress(worker_group& wg) override {
void compress(worker_group& wg, std::optional<std::string> meta) override {
std::promise<void> prom;
future_ = prom.get_future();
wg.add_job([this, prom = std::move(prom)]() mutable {
wg.add_job([this, prom = std::move(prom),
meta = std::move(meta)]() mutable {
try {
// TODO: metadata
auto tmp = std::make_shared<block_data>(bc_.compress(data_->vec()));
std::shared_ptr<block_data> tmp;
if (meta) {
tmp = std::make_shared<block_data>(bc_.compress(data_->vec(), *meta));
} else {
tmp = std::make_shared<block_data>(bc_.compress(data_->vec()));
}
{
std::lock_guard lock(mx_);
@ -124,6 +138,8 @@ class raw_fsblock : public fsblock::impl {
compression_type compression() const override { return comp_type_; }
std::string description() const override { return bc_.describe(); }
std::span<uint8_t const> data() const override { return data_->vec(); }
size_t uncompressed_size() const override { return uncompressed_size_; }
@ -158,7 +174,8 @@ class compressed_fsblock : public fsblock::impl {
, range_{range}
, number_{number} {}
void compress(worker_group& wg) override {
void
compress(worker_group& wg, std::optional<std::string> /* meta */) override {
std::promise<void> prom;
future_ = prom.get_future();
@ -173,6 +190,9 @@ class compressed_fsblock : public fsblock::impl {
section_type type() const override { return type_; }
compression_type compression() const override { return compression_; }
// TODO
std::string description() const override { return "<compressed>"; }
std::span<uint8_t const> data() const override { return range_; }
size_t uncompressed_size() const override { return range_.size(); }
@ -229,26 +249,32 @@ template <typename LoggerPolicy>
class filesystem_writer_ final : public filesystem_writer::impl {
public:
filesystem_writer_(logger& lgr, std::ostream& os, worker_group& wg,
progress& prog, const block_compressor& bc,
const block_compressor& schema_bc,
progress& prog, const block_compressor& schema_bc,
const block_compressor& metadata_bc,
filesystem_writer_options const& options,
std::istream* header);
~filesystem_writer_() noexcept override;
void add_default_compressor(block_compressor bc) override;
void add_category_compressor(fragment_category::value_type cat,
block_compressor bc) override;
void copy_header(std::span<uint8_t const> header) override;
void write_block(std::shared_ptr<block_data>&& data) override;
uint32_t write_block(fragment_category::value_type cat,
std::shared_ptr<block_data>&& data,
std::optional<std::string> meta) override;
void write_metadata_v2_schema(std::shared_ptr<block_data>&& data) override;
void write_metadata_v2(std::shared_ptr<block_data>&& data) override;
void write_compressed_section(section_type type, compression_type compression,
std::span<uint8_t const> data) override;
void flush() override;
size_t size() const override { return os_.tellp(); }
int queue_fill() const override { return static_cast<int>(wg_.queue_size()); }
private:
void write_section(section_type type, std::shared_ptr<block_data>&& data,
block_compressor const& bc);
block_compressor const&
compressor_for_category(fragment_category::value_type cat) const;
uint32_t write_section(section_type type, std::shared_ptr<block_data>&& data,
block_compressor const& bc,
std::optional<std::string> meta = std::nullopt);
void write(fsblock const& fsb);
void write(const char* data, size_t size);
template <typename T>
@ -263,7 +289,8 @@ class filesystem_writer_ final : public filesystem_writer::impl {
std::istream* header_;
worker_group& wg_;
progress& prog_;
const block_compressor& bc_;
std::optional<block_compressor> default_bc_;
std::unordered_map<fragment_category::value_type, block_compressor> bc_;
const block_compressor& schema_bc_;
const block_compressor& metadata_bc_;
const filesystem_writer_options options_;
@ -281,14 +308,12 @@ class filesystem_writer_ final : public filesystem_writer::impl {
template <typename LoggerPolicy>
filesystem_writer_<LoggerPolicy>::filesystem_writer_(
logger& lgr, std::ostream& os, worker_group& wg, progress& prog,
const block_compressor& bc, const block_compressor& schema_bc,
const block_compressor& metadata_bc,
const block_compressor& schema_bc, const block_compressor& metadata_bc,
filesystem_writer_options const& options, std::istream* header)
: os_(os)
, header_(header)
, wg_(wg)
, prog_(prog)
, bc_(bc)
, schema_bc_(schema_bc)
, metadata_bc_(metadata_bc)
, options_(options)
@ -344,9 +369,11 @@ void filesystem_writer_<LoggerPolicy>::writer_thread() {
fsb->wait_until_compressed();
LOG_DEBUG << get_section_name(fsb->type()) << " compressed from "
LOG_DEBUG << get_section_name(fsb->type()) << " [" << fsb->number()
<< "] compressed from "
<< size_with_unit(fsb->uncompressed_size()) << " to "
<< size_with_unit(fsb->size());
<< size_with_unit(fsb->size()) << " [" << fsb->description()
<< "]";
write(*fsb);
}
@ -396,9 +423,25 @@ void filesystem_writer_<LoggerPolicy>::write(fsblock const& fsb) {
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_section(
block_compressor const&
filesystem_writer_<LoggerPolicy>::compressor_for_category(
fragment_category::value_type cat) const {
if (auto it = bc_.find(cat); it != bc_.end()) {
LOG_DEBUG << "using compressor (" << it->second.describe()
<< ") for category " << cat;
return it->second;
}
LOG_DEBUG << "using default compressor (" << default_bc_.value().describe()
<< ") for category " << cat;
return default_bc_.value();
}
template <typename LoggerPolicy>
uint32_t filesystem_writer_<LoggerPolicy>::write_section(
section_type type, std::shared_ptr<block_data>&& data,
block_compressor const& bc) {
block_compressor const& bc, std::optional<std::string> meta) {
uint32_t block_no;
{
std::unique_lock lock(mx_);
@ -406,15 +449,17 @@ void filesystem_writer_<LoggerPolicy>::write_section(
cond_.wait(lock);
}
auto fsb =
std::make_unique<fsblock>(type, bc, std::move(data), section_number_++);
block_no = section_number_++;
auto fsb = std::make_unique<fsblock>(type, bc, std::move(data), block_no);
fsb->compress(wg_);
fsb->compress(wg_, meta);
queue_.push_back(std::move(fsb));
}
cond_.notify_one();
return block_no;
}
template <typename LoggerPolicy>
@ -435,6 +480,26 @@ void filesystem_writer_<LoggerPolicy>::write_compressed_section(
cond_.notify_one();
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::add_default_compressor(
block_compressor bc) {
if (default_bc_) {
DWARFS_THROW(runtime_error, "default compressor registered more than once");
}
default_bc_ = std::move(bc);
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::add_category_compressor(
fragment_category::value_type cat, block_compressor bc) {
LOG_DEBUG << "adding compressor (" << bc.describe() << ") for category "
<< cat;
if (!bc_.emplace(cat, std::move(bc)).second) {
DWARFS_THROW(runtime_error,
"category compressor registered more than once");
}
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::copy_header(
std::span<uint8_t const> header) {
@ -449,9 +514,11 @@ void filesystem_writer_<LoggerPolicy>::copy_header(
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_block(
std::shared_ptr<block_data>&& data) {
write_section(section_type::BLOCK, std::move(data), bc_);
uint32_t filesystem_writer_<LoggerPolicy>::write_block(
fragment_category::value_type cat, std::shared_ptr<block_data>&& data,
std::optional<std::string> meta) {
return write_section(section_type::BLOCK, std::move(data),
compressor_for_category(cat), std::move(meta));
}
template <typename LoggerPolicy>
@ -512,21 +579,12 @@ void filesystem_writer_<LoggerPolicy>::write_section_index() {
filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr,
worker_group& wg, progress& prog,
const block_compressor& bc,
filesystem_writer_options const& options,
std::istream* header)
: filesystem_writer(os, lgr, wg, prog, bc, bc, bc, options, header) {}
filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr,
worker_group& wg, progress& prog,
const block_compressor& bc,
const block_compressor& schema_bc,
const block_compressor& metadata_bc,
filesystem_writer_options const& options,
std::istream* header)
: impl_(
make_unique_logging_object<impl, filesystem_writer_, logger_policies>(
lgr, os, wg, prog, bc, schema_bc, metadata_bc, options, header)) {
}
lgr, os, wg, prog, schema_bc, metadata_bc, options, header)) {}
} // namespace dwarfs

View File

@ -33,7 +33,7 @@ namespace dwarfs {
fragment_chunkable::fragment_chunkable(inode const& ino,
single_inode_fragment& frag,
file_off_t offset, mmif& mm,
categorizer_manager const& catmgr)
categorizer_manager const* catmgr)
: ino_{ino}
, frag_{frag}
, offset_{offset}
@ -45,9 +45,13 @@ fragment_chunkable::~fragment_chunkable() = default;
size_t fragment_chunkable::size() const { return frag_.size(); }
std::string fragment_chunkable::description() const {
return fmt::format("{} fragment at offset {} of inode {} [{}] - size: {}",
catmgr_.category_name(frag_.category().value()), offset_,
ino_.num(), ino_.any()->name(), size());
std::string prefix;
if (catmgr_) {
prefix =
fmt::format("{} ", catmgr_->category_name(frag_.category().value()));
}
return fmt::format("{}fragment at offset {} of inode {} [{}] - size: {}",
prefix, offset_, ino_.num(), ino_.any()->name(), size());
}
std::span<uint8_t const> fragment_chunkable::span() const {

View File

@ -93,7 +93,7 @@ std::string inode_element_view::description(size_t i) const {
}
nilsimsa::hash_type const& inode_element_view::get_bits(size_t i) const {
return inodes_[i]->nilsimsa_similarity_hash();
return *hash_cache_[i];
}
} // namespace dwarfs

View File

@ -29,6 +29,7 @@
#include <limits>
#include <numeric>
#include <ostream>
#include <ranges>
#include <string>
#include <unordered_map>
#include <variant>
@ -69,10 +70,7 @@ class inode_ : public inode {
public:
using chunk_type = thrift::metadata::chunk;
inode_() {
std::fill(nilsimsa_similarity_hash_.begin(),
nilsimsa_similarity_hash_.end(), 0);
}
inode_() = default;
void set_num(uint32_t num) override {
DWARFS_CHECK((flags_ & kNumIsValid) == 0,
@ -89,27 +87,22 @@ class inode_ : public inode {
bool has_category(fragment_category cat) const override {
DWARFS_CHECK(!fragments_.empty(),
"has_category() called with no fragments");
if (fragments_.size() == 1) {
return fragments_.get_single_category() == cat;
}
auto& m = std::get<similarity_map_type>(similarity_);
return m.find(cat) != m.end();
return std::ranges::any_of(
fragments_, [cat](auto const& f) { return f.category() == cat; });
}
uint32_t similarity_hash() const override {
if (files_.empty()) {
DWARFS_THROW(runtime_error, "inode has no file (similarity)");
}
// TODO
return similarity_hash_;
return std::get<uint32_t>(similarity_);
}
nilsimsa::hash_type const& nilsimsa_similarity_hash() const override {
if (files_.empty()) {
DWARFS_THROW(runtime_error, "inode has no file (nilsimsa)");
}
// TODO
return nilsimsa_similarity_hash_;
return std::get<nilsimsa::hash_type>(similarity_);
}
uint32_t similarity_hash(fragment_category cat) const override {
@ -129,6 +122,11 @@ class inode_ : public inode {
files_ = std::move(fv);
}
void populate(size_t size) override {
assert(fragments_.empty());
fragments_.emplace_back(categorizer_manager::default_category(), size);
}
void scan(mmif* mm, inode_options const& opts) override {
categorizer_job catjob;
@ -189,15 +187,14 @@ class inode_ : public inode {
if (fragments_.empty()) {
fragments_.emplace_back(categorizer_manager::default_category(),
mm ? mm->size() : 0);
scan_full(mm, opts);
}
}
void add_chunk(size_t block, size_t offset, size_t size) override {
chunk_type c;
c.block() = block;
c.offset() = offset;
c.size() = size;
chunks_.push_back(c);
DWARFS_CHECK(fragments_.size() == 1,
"exactly one fragment must be used in legacy add_chunk()");
fragments_.back().add_chunk(block, offset, size);
}
size_t size() const override { return any()->size(); }
@ -212,10 +209,13 @@ class inode_ : public inode {
}
void append_chunks_to(std::vector<chunk_type>& vec) const override {
vec.insert(vec.end(), chunks_.begin(), chunks_.end());
for (auto const& frag : fragments_) {
auto chks = frag.chunks();
vec.insert(vec.end(), chks.begin(), chks.end());
}
}
inode_fragments const& fragments() const override { return fragments_; }
inode_fragments& fragments() override { return fragments_; }
void dump(std::ostream& os, inode_options const& options) const override {
auto dump_category = [&os, &options](fragment_category const& cat) {
@ -379,7 +379,6 @@ class inode_ : public inode {
}
void scan_full(mmif* mm, inode_options const& opts) {
assert(mm);
assert(fragments_.size() <= 1);
auto order_mode =
@ -394,18 +393,20 @@ class inode_ : public inode {
case file_order_mode::SIMILARITY: {
similarity sc;
scan_range(mm, sc);
similarity_hash_ = sc.finalize(); // TODO
if (mm) {
scan_range(mm, sc);
}
similarity_.emplace<uint32_t>(sc.finalize());
} break;
case file_order_mode::NILSIMSA: {
nilsimsa nc;
scan_range(mm, nc);
if (mm) {
scan_range(mm, nc);
}
// TODO: can we finalize in-place?
nilsimsa::hash_type hash;
nc.finalize(hash);
nilsimsa_similarity_hash_ = hash; // TODO
similarity_.emplace<nilsimsa::hash_type>(hash);
} break;
}
@ -434,12 +435,6 @@ class inode_ : public inode {
similarity_map_type // 24 bytes
>
similarity_;
// OLDE:
uint32_t similarity_hash_{0}; // TODO: remove (move to similarity_)
std::vector<chunk_type> chunks_; // TODO: remove (part of fragments_ now)
nilsimsa::hash_type
nilsimsa_similarity_hash_; // TODO: remove (move to similarity_)
};
} // namespace
@ -461,9 +456,6 @@ class inode_manager_ final : public inode_manager::impl {
size_t count() const override { return inodes_.size(); }
void
order_inodes(worker_group& wg, inode_manager::inode_cb const& fn) override;
void for_each_inode_in_order(
std::function<void(std::shared_ptr<inode> const&)> const& fn)
const override {
@ -527,6 +519,9 @@ class inode_manager_ final : public inode_manager::impl {
return sortable_inode_span(inodes_);
}
sortable_inode_span
ordered_span(fragment_category cat, worker_group& wg) const override;
private:
static bool inodes_need_scanning(inode_options const& opts) {
if (opts.categorizer_mgr) {
@ -539,29 +534,6 @@ class inode_manager_ final : public inode_manager::impl {
});
}
void order_inodes_by_path() {
auto span = sortable_span();
span.all();
inode_ordering(LOG_GET_LOGGER, prog_).by_path(span);
std::vector<std::shared_ptr<inode>> tmp(span.begin(), span.end());
inodes_.swap(tmp);
}
void order_inodes_by_similarity() {
auto span = sortable_span();
span.all();
inode_ordering(LOG_GET_LOGGER, prog_).by_similarity(span);
std::vector<std::shared_ptr<inode>> tmp(span.begin(), span.end());
inodes_.swap(tmp);
}
void presort_index(std::vector<std::shared_ptr<inode>>& inodes,
std::vector<uint32_t>& index);
void order_inodes_by_nilsimsa(worker_group& wg);
LOG_PROXY_DECL(LoggerPolicy);
std::vector<std::shared_ptr<inode>> inodes_;
progress& prog_;
@ -587,123 +559,82 @@ void inode_manager_<LoggerPolicy>::scan_background(worker_group& wg,
mm = os.map_file(p->fs_path(), size);
}
ino->scan(mm.get(), opts_);
prog_.fragments_found += ino->fragments().size();
++prog_.similarity_scans; // TODO: we probably don't want this here
prog_.similarity_bytes += size;
++prog_.inodes_scanned;
++prog_.files_scanned;
});
} else {
ino->populate(p->size());
prog_.fragments_found += ino->fragments().size();
++prog_.inodes_scanned;
++prog_.files_scanned;
}
}
template <typename LoggerPolicy>
void inode_manager_<LoggerPolicy>::order_inodes(
worker_group& wg, inode_manager::inode_cb const& fn) {
// TODO: only use an index, never actually reorder inodes
// TODO:
switch (opts_.fragment_order.get().mode) {
case file_order_mode::NONE:
LOG_INFO << "keeping inode order";
break;
case file_order_mode::PATH: {
LOG_INFO << "ordering " << count() << " inodes by path name...";
auto ti = LOG_CPU_TIMED_INFO;
order_inodes_by_path();
ti << count() << " inodes ordered";
break;
}
case file_order_mode::SIMILARITY: {
LOG_INFO << "ordering " << count() << " inodes by similarity...";
auto ti = LOG_CPU_TIMED_INFO;
order_inodes_by_similarity();
ti << count() << " inodes ordered";
break;
}
case file_order_mode::NILSIMSA: {
LOG_INFO << "ordering " << count()
<< " inodes using new nilsimsa similarity...";
auto ti = LOG_CPU_TIMED_INFO;
order_inodes_by_nilsimsa(wg);
ti << count() << " inodes ordered";
break;
}
}
LOG_INFO << "assigning file inodes...";
for (const auto& ino : inodes_) {
fn(ino);
}
}
template <typename LoggerPolicy>
void inode_manager_<LoggerPolicy>::presort_index(
std::vector<std::shared_ptr<inode>>& inodes, std::vector<uint32_t>& index) {
auto ti = LOG_TIMED_INFO;
size_t num_name = 0;
size_t num_path = 0;
std::sort(index.begin(), index.end(), [&](auto a, auto b) {
auto const& ia = *inodes[a];
auto const& ib = *inodes[b];
auto sa = ia.size();
auto sb = ib.size();
if (sa < sb) {
return true;
} else if (sa > sb) {
return false;
}
++num_name;
auto fa = ia.any();
auto fb = ib.any();
auto& na = fa->name();
auto& nb = fb->name();
if (na > nb) {
return true;
} else if (na < nb) {
return false;
}
++num_path;
return !fa->less_revpath(*fb);
});
ti << "pre-sorted index (" << num_name << " name, " << num_path
<< " path lookups)";
}
template <typename LoggerPolicy>
void inode_manager_<LoggerPolicy>::order_inodes_by_nilsimsa(worker_group& wg) {
auto const& file_order = opts_.fragment_order.get(); // TODO
similarity_ordering_options opts;
opts.max_children = file_order.nilsimsa_max_children;
opts.max_cluster_size = file_order.nilsimsa_max_cluster_size;
auto span = sortable_span();
span.all();
inode_ordering(LOG_GET_LOGGER, prog_).by_nilsimsa(wg, opts, span);
std::vector<std::shared_ptr<inode>> tmp(span.begin(), span.end());
inodes_.swap(tmp);
}
template <typename LoggerPolicy>
void inode_manager_<LoggerPolicy>::dump(std::ostream& os) const {
for_each_inode_in_order(
[this, &os](auto const& ino) { ino->dump(os, opts_); });
}
template <typename LoggerPolicy>
auto inode_manager_<LoggerPolicy>::ordered_span(fragment_category cat,
worker_group& wg) const
-> sortable_inode_span {
std::string prefix;
if (opts_.categorizer_mgr) {
prefix =
fmt::format("[{}] ", opts_.categorizer_mgr->category_name(cat.value()));
}
auto opts = opts_.fragment_order.get(cat);
auto span = sortable_span();
span.select([cat](auto const& v) { return v->has_category(cat); });
inode_ordering order(LOG_GET_LOGGER, prog_);
switch (opts.mode) {
case file_order_mode::NONE:
LOG_INFO << prefix << "keeping inode order";
break;
case file_order_mode::PATH: {
LOG_INFO << prefix << "ordering " << span.size()
<< " inodes by path name...";
auto ti = LOG_CPU_TIMED_INFO;
order.by_path(span);
ti << prefix << span.size() << " inodes ordered";
break;
}
case file_order_mode::SIMILARITY: {
LOG_INFO << prefix << "ordering " << span.size()
<< " inodes by similarity...";
auto ti = LOG_CPU_TIMED_INFO;
order.by_similarity(span, cat);
ti << prefix << span.size() << " inodes ordered";
break;
}
case file_order_mode::NILSIMSA: {
LOG_INFO << prefix << "ordering " << span.size()
<< " inodes using nilsimsa similarity...";
similarity_ordering_options soo;
soo.max_children = opts.nilsimsa_max_children;
soo.max_cluster_size = opts.nilsimsa_max_cluster_size;
auto ti = LOG_TIMED_INFO;
order.by_nilsimsa(wg, soo, span, cat);
ti << prefix << span.size() << " inodes ordered";
break;
}
}
return span;
}
inode_manager::inode_manager(logger& lgr, progress& prog,
inode_options const& opts)
: impl_(make_unique_logging_object<impl, inode_manager_, logger_policies>(

View File

@ -38,17 +38,20 @@
#include <fmt/format.h>
#include "dwarfs/block_data.h"
#include "dwarfs/block_manager.h"
#include "dwarfs/categorizer.h"
#include "dwarfs/entry.h"
#include "dwarfs/error.h"
#include "dwarfs/file_scanner.h"
#include "dwarfs/filesystem_writer.h"
#include "dwarfs/fragment_chunkable.h"
#include "dwarfs/global_entry_data.h"
#include "dwarfs/inode.h"
#include "dwarfs/inode_chunkable.h"
#include "dwarfs/inode_manager.h"
#include "dwarfs/inode_ordering.h"
#include "dwarfs/logger.h"
#include "dwarfs/metadata_v2.h"
#include "dwarfs/mmif.h"
#include "dwarfs/options.h"
#include "dwarfs/os_access.h"
#include "dwarfs/progress.h"
@ -663,7 +666,6 @@ void scanner_<LoggerPolicy>::scan(
}
LOG_INFO << "building blocks...";
segmenter seg(LOG_GET_LOGGER, prog, cfg_, fsw);
// TODO:
// - get rid of multiple worker groups
@ -671,37 +673,77 @@ void scanner_<LoggerPolicy>::scan(
// which gets run on a worker groups; each batch keeps track of
// its CPU time and affects thread naming
// segmenter seg(LOG_GET_LOGGER, prog, cfg_, fsw);
auto blockmgr = std::make_shared<block_manager>();
{
worker_group blockify("blockify", 1, 1 << 20);
size_t const num_threads = std::max(folly::hardware_concurrency(), 1u);
worker_group wg_ordering("ordering", num_threads);
worker_group wg_blockify("blockify", num_threads);
{
// TODO
size_t const num_threads = std::max(folly::hardware_concurrency(), 1u);
worker_group wg_order("ordering", num_threads);
for (auto category : im.inode_categories()) {
auto catmgr = options_.inode.categorizer_mgr.get();
std::string meta;
// ordering.add_job([&] {
im.order_inodes(wg_order, [&](std::shared_ptr<inode> const& ino) {
blockify.add_job([&, this] {
prog.current.store(ino.get());
inode_chunkable ic(*ino, *os_);
seg.add_chunkable(ic);
prog.inodes_written++;
});
if (catmgr) {
meta = catmgr->category_metadata(category);
}
wg_ordering.add_job([this, catmgr, blockmgr, category, meta, &prog, &fsw,
&im, &wg_ordering, &wg_blockify] {
wg_blockify.add_job(
[this, catmgr, blockmgr, category, meta, &prog, &fsw,
span = im.ordered_span(category, wg_ordering)]() mutable {
// TODO: segmenter config per-category
segmenter seg(LOG_GET_LOGGER, prog, blockmgr, cfg_,
[category, meta, &fsw](auto block) {
return fsw.write_block(category.value(),
std::move(block), meta);
});
for (auto ino : span) {
prog.current.store(ino.get());
// TODO: factor this code out
auto f = ino->any();
if (auto size = f->size(); size > 0) {
auto mm = os_->map_file(f->fs_path(), size);
file_off_t offset{0};
for (auto& frag : ino->fragments()) {
if (frag.category() == category) {
fragment_chunkable fc(*ino, frag, offset, *mm, catmgr);
seg.add_chunkable(fc);
prog.fragments_written++;
}
offset += frag.size();
}
}
prog.inodes_written++; // TODO: remove?
}
seg.finish();
});
});
// });
// wg_order.wait();
}
LOG_INFO << "waiting for segmenting/blockifying to finish...";
blockify.wait();
wg_blockify.wait();
wg_ordering.wait();
LOG_INFO << "segmenting/blockifying CPU time: "
<< time_with_unit(blockify.get_cpu_time());
LOG_INFO << "ordering CPU time: "
<< time_with_unit(wg_ordering.get_cpu_time());
LOG_INFO << "segmenting CPU time: "
<< time_with_unit(wg_blockify.get_cpu_time());
}
seg.finish();
// seg.finish();
wg_.wait();
prog.set_status_function([](progress const&, size_t) {
@ -722,6 +764,8 @@ void scanner_<LoggerPolicy>::scan(
ino->append_chunks_to(mv2.chunks().value());
});
blockmgr->map_logical_blocks(mv2.chunks().value());
// insert dummy inode to help determine number of chunks per inode
DWARFS_NOTHROW(mv2.chunk_table()->at(im.count())) = mv2.chunks()->size();

View File

@ -39,12 +39,12 @@
#include <folly/stats/Histogram.h>
#include "dwarfs/block_data.h"
#include "dwarfs/block_manager.h"
#include "dwarfs/chunkable.h"
#include "dwarfs/compiler.h"
#include "dwarfs/cyclic_hash.h"
#include "dwarfs/entry.h"
#include "dwarfs/error.h"
#include "dwarfs/filesystem_writer.h"
#include "dwarfs/logger.h"
#include "dwarfs/progress.h"
#include "dwarfs/segmenter.h"
@ -288,12 +288,14 @@ class active_block {
template <typename LoggerPolicy>
class segmenter_ final : public segmenter::impl {
public:
segmenter_(logger& lgr, progress& prog, const segmenter::config& cfg,
filesystem_writer& fsw)
segmenter_(logger& lgr, progress& prog, std::shared_ptr<block_manager> blkmgr,
const segmenter::config& cfg,
segmenter::block_ready_cb block_ready)
: LOG_PROXY_INIT(lgr)
, prog_{prog}
, blkmgr_{std::move(blkmgr)}
, cfg_{cfg}
, fsw_{fsw}
, block_ready_{std::move(block_ready)}
, window_size_{window_size(cfg)}
, window_step_{window_step(cfg)}
, block_size_{block_size(cfg)}
@ -346,13 +348,13 @@ class segmenter_ final : public segmenter::impl {
LOG_PROXY_DECL(LoggerPolicy);
progress& prog_;
std::shared_ptr<block_manager> blkmgr_;
const segmenter::config& cfg_;
filesystem_writer& fsw_;
segmenter::block_ready_cb block_ready_;
size_t const window_size_;
size_t const window_step_;
size_t const block_size_;
size_t block_count_{0};
chunk_state chunk_;
@ -512,7 +514,8 @@ template <typename LoggerPolicy>
void segmenter_<LoggerPolicy>::block_ready() {
auto& block = blocks_.back();
block.finalize(stats_);
fsw_.write_block(block.data());
auto written_block_num = block_ready_(block.data());
blkmgr_->set_written_block(block.num(), written_block_num);
++prog_.block_count;
}
@ -529,7 +532,7 @@ void segmenter_<LoggerPolicy>::append_to_block(chunkable& chkable,
filter_.merge(b.filter());
}
blocks_.emplace_back(block_count_++, block_size_,
blocks_.emplace_back(blkmgr_->get_logical_block(), block_size_,
cfg_.max_active_blocks > 0 ? window_size_ : 0,
window_step_, filter_.size());
}
@ -711,9 +714,10 @@ void segmenter_<LoggerPolicy>::segment_and_add_data(chunkable& chkable,
finish_chunk(chkable);
}
segmenter::segmenter(logger& lgr, progress& prog, const config& cfg,
filesystem_writer& fsw)
segmenter::segmenter(logger& lgr, progress& prog,
std::shared_ptr<block_manager> blkmgr, const config& cfg,
block_ready_cb block_ready)
: impl_(make_unique_logging_object<impl, segmenter_, logger_policies>(
lgr, prog, cfg, fsw)) {}
lgr, prog, std::move(blkmgr), cfg, std::move(block_ready))) {}
} // namespace dwarfs

View File

@ -372,7 +372,7 @@ int mkdwarfs_main(int argc, sys_char** argv) {
"recompress an existing filesystem (none, block, metadata, all)")
("categorize",
po::value<std::string>(&categorizer_list_str)
->default_value("pcmaudio,incompressible"),
->implicit_value("pcmaudio,incompressible"),
categorize_desc.c_str())
("order",
po::value<std::vector<std::string>>(&order)->multitoken(),
@ -923,10 +923,6 @@ int mkdwarfs_main(int argc, sys_char** argv) {
progress prog(std::move(updater), interval_ms);
block_compressor bc(compression.front()); // TODO
block_compressor schema_bc(schema_compression);
block_compressor metadata_bc(metadata_compression);
auto min_memory_req = num_workers * (UINT64_C(1) << cfg.block_size_bits);
// TODO:
@ -1023,6 +1019,12 @@ int mkdwarfs_main(int argc, sys_char** argv) {
return 1;
}
block_compressor schema_bc(schema_compression);
block_compressor metadata_bc(metadata_compression);
filesystem_writer fsw(*os, lgr, wg_compress, prog, schema_bc, metadata_bc,
fswopts, header_ifs.get());
try {
categorized_option<block_compressor> compression_opt;
contextual_option_parser cop("--compression", compression_opt, cp,
@ -1030,10 +1032,14 @@ int mkdwarfs_main(int argc, sys_char** argv) {
cop.parse(compression);
LOG_DEBUG << cop.as_string();
compression_opt.visit_contextual([catmgr = options.inode.categorizer_mgr](
auto cat, block_compressor const& bc) {
fsw.add_default_compressor(compression_opt.get());
compression_opt.visit_contextual([catmgr = options.inode.categorizer_mgr,
&fsw](auto cat,
block_compressor const& bc) {
try {
catmgr->set_metadata_requirements(cat, bc.metadata_requirements());
fsw.add_category_compressor(cat, bc);
} catch (std::exception const& e) {
throw std::runtime_error(
fmt::format("compression '{}' cannot be used for category '{}': "
@ -1046,9 +1052,6 @@ int mkdwarfs_main(int argc, sys_char** argv) {
return 1;
}
filesystem_writer fsw(*os, lgr, wg_compress, prog, bc, schema_bc, metadata_bc,
fswopts, header_ifs.get());
auto ti = LOG_TIMED_INFO;
try {

View File

@ -81,7 +81,8 @@ build_dwarfs(logger& lgr, std::shared_ptr<test::os_access_mock> input,
}
block_compressor bc(compression);
filesystem_writer fsw(oss, lgr, wg, *prog, bc);
filesystem_writer fsw(oss, lgr, wg, *prog, bc, bc);
fsw.add_default_compressor(bc);
s.scan(fsw, std::filesystem::path("/"), *prog, input_list);

View File

@ -125,7 +125,8 @@ std::string make_filesystem(::benchmark::State const& state) {
progress prog([](const progress&, bool) {}, 1000);
block_compressor bc("null");
filesystem_writer fsw(oss, lgr, wg, prog, bc);
filesystem_writer fsw(oss, lgr, wg, prog, bc, bc);
fsw.add_default_compressor(bc);
s.scan(fsw, "", prog);

View File

@ -1109,7 +1109,8 @@ TEST_P(rewrite, filesystem_rewrite) {
std::ostringstream rewritten, idss;
{
filesystem_writer fsw(rewritten, lgr, wg, prog, bc);
filesystem_writer fsw(rewritten, lgr, wg, prog, bc, bc);
fsw.add_default_compressor(bc);
auto mm = std::make_shared<mmap>(filename);
EXPECT_NO_THROW(filesystem_v2::identify(lgr, mm, idss));
EXPECT_FALSE(filesystem_v2::header(mm));
@ -1130,7 +1131,8 @@ TEST_P(rewrite, filesystem_rewrite) {
{
std::istringstream hdr_iss(format_sh);
filesystem_writer_options fsw_opts;
filesystem_writer fsw(rewritten, lgr, wg, prog, bc, fsw_opts, &hdr_iss);
filesystem_writer fsw(rewritten, lgr, wg, prog, bc, bc, fsw_opts, &hdr_iss);
fsw.add_default_compressor(bc);
filesystem_v2::rewrite(lgr, prog, std::make_shared<mmap>(filename), fsw,
opts);
}
@ -1155,7 +1157,9 @@ TEST_P(rewrite, filesystem_rewrite) {
{
std::istringstream hdr_iss("D");
filesystem_writer_options fsw_opts;
filesystem_writer fsw(rewritten2, lgr, wg, prog, bc, fsw_opts, &hdr_iss);
filesystem_writer fsw(rewritten2, lgr, wg, prog, bc, bc, fsw_opts,
&hdr_iss);
fsw.add_default_compressor(bc);
filesystem_v2::rewrite(lgr, prog,
std::make_shared<test::mmap_mock>(rewritten.str()),
fsw, opts);
@ -1173,7 +1177,8 @@ TEST_P(rewrite, filesystem_rewrite) {
std::ostringstream rewritten3;
{
filesystem_writer fsw(rewritten3, lgr, wg, prog, bc);
filesystem_writer fsw(rewritten3, lgr, wg, prog, bc, bc);
fsw.add_default_compressor(bc);
filesystem_v2::rewrite(lgr, prog,
std::make_shared<test::mmap_mock>(rewritten2.str()),
fsw, opts);
@ -1193,7 +1198,8 @@ TEST_P(rewrite, filesystem_rewrite) {
{
filesystem_writer_options fsw_opts;
fsw_opts.remove_header = true;
filesystem_writer fsw(rewritten4, lgr, wg, prog, bc, fsw_opts);
filesystem_writer fsw(rewritten4, lgr, wg, prog, bc, bc, fsw_opts);
fsw.add_default_compressor(bc);
filesystem_v2::rewrite(lgr, prog,
std::make_shared<test::mmap_mock>(rewritten3.str()),
fsw, opts);
@ -1213,7 +1219,8 @@ TEST_P(rewrite, filesystem_rewrite) {
{
filesystem_writer_options fsw_opts;
fsw_opts.no_section_index = true;
filesystem_writer fsw(rewritten5, lgr, wg, prog, bc, fsw_opts);
filesystem_writer fsw(rewritten5, lgr, wg, prog, bc, bc, fsw_opts);
fsw.add_default_compressor(bc);
filesystem_v2::rewrite(lgr, prog,
std::make_shared<test::mmap_mock>(rewritten4.str()),
fsw, opts);