diff --git a/CMakeLists.txt b/CMakeLists.txt index 9395d054..49c4381d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -356,6 +356,7 @@ list( src/dwarfs/block_cache.cpp src/dwarfs/block_compressor.cpp src/dwarfs/block_compressor_parser.cpp + src/dwarfs/block_manager.cpp src/dwarfs/block_range.cpp src/dwarfs/builtin_script.cpp src/dwarfs/cached_block.cpp diff --git a/include/dwarfs/block_manager.h b/include/dwarfs/block_manager.h new file mode 100644 index 00000000..4e51eea5 --- /dev/null +++ b/include/dwarfs/block_manager.h @@ -0,0 +1,45 @@ +/* vim:set ts=2 sw=2 sts=2 et: */ +/** + * \author Marcus Holland-Moritz (github@mhxnet.de) + * \copyright Copyright (c) Marcus Holland-Moritz + * + * This file is part of dwarfs. + * + * dwarfs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dwarfs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dwarfs. If not, see . + */ + +#pragma once + +#include +#include + +#include "dwarfs/gen-cpp2/metadata_types.h" + +namespace dwarfs { + +class block_manager { + public: + using chunk_type = thrift::metadata::chunk; + + size_t get_logical_block() const; + void set_written_block(size_t logical_block, size_t written_block); + void map_logical_blocks(std::vector& vec); + + private: + std::mutex mutable mx_; + size_t mutable num_blocks_{0}; + std::vector block_map_; +}; + +} // namespace dwarfs diff --git a/include/dwarfs/filesystem_writer.h b/include/dwarfs/filesystem_writer.h index 2880605f..fdfaa8a7 100644 --- a/include/dwarfs/filesystem_writer.h +++ b/include/dwarfs/filesystem_writer.h @@ -28,6 +28,7 @@ #include #include +#include "dwarfs/fragment_category.h" #include "dwarfs/fstypes.h" #include "dwarfs/options.h" #include "dwarfs/worker_group.h" @@ -44,23 +45,27 @@ class filesystem_writer { public: filesystem_writer( std::ostream& os, logger& lgr, worker_group& wg, progress& prog, - const block_compressor& bc, + const block_compressor& schema_bc, const block_compressor& metadata_bc, filesystem_writer_options const& options = filesystem_writer_options(), std::istream* header = nullptr); - filesystem_writer(std::ostream& os, logger& lgr, worker_group& wg, - progress& prog, const block_compressor& bc, - const block_compressor& schema_bc, - const block_compressor& metadata_bc, - filesystem_writer_options const& options, - std::istream* header = nullptr); + void add_default_compressor(block_compressor bc) { + impl_->add_default_compressor(std::move(bc)); + } + + void add_category_compressor(fragment_category::value_type cat, + block_compressor bc) { + impl_->add_category_compressor(cat, std::move(bc)); + } void copy_header(std::span header) { impl_->copy_header(header); } - void write_block(std::shared_ptr&& data) { - impl_->write_block(std::move(data)); + uint32_t write_block(fragment_category::value_type cat, + std::shared_ptr&& data, + std::optional meta = std::nullopt) { + return impl_->write_block(cat, std::move(data), std::move(meta)); } void write_metadata_v2_schema(std::shared_ptr&& data) { @@ -80,14 +85,17 @@ class filesystem_writer { size_t size() const { return impl_->size(); } - int queue_fill() const { return impl_->queue_fill(); } - class impl { public: virtual ~impl() = default; + virtual void add_default_compressor(block_compressor bc) = 0; + virtual void add_category_compressor(fragment_category::value_type cat, + block_compressor bc) = 0; virtual void copy_header(std::span header) = 0; - virtual void write_block(std::shared_ptr&& data) = 0; + virtual uint32_t write_block(fragment_category::value_type cat, + std::shared_ptr&& data, + std::optional meta) = 0; virtual void write_metadata_v2_schema(std::shared_ptr&& data) = 0; virtual void write_metadata_v2(std::shared_ptr&& data) = 0; @@ -96,7 +104,6 @@ class filesystem_writer { std::span data) = 0; virtual void flush() = 0; virtual size_t size() const = 0; - virtual int queue_fill() const = 0; }; private: diff --git a/include/dwarfs/fragment_chunkable.h b/include/dwarfs/fragment_chunkable.h index 0c28984e..bda55599 100644 --- a/include/dwarfs/fragment_chunkable.h +++ b/include/dwarfs/fragment_chunkable.h @@ -36,7 +36,7 @@ class fragment_chunkable : public chunkable { public: fragment_chunkable(inode const& ino, single_inode_fragment& frag, file_off_t offset, mmif& mm, - categorizer_manager const& catmgr); + categorizer_manager const* catmgr); ~fragment_chunkable(); size_t size() const override; @@ -50,7 +50,7 @@ class fragment_chunkable : public chunkable { single_inode_fragment& frag_; file_off_t offset_; mmif& mm_; - categorizer_manager const& catmgr_; + categorizer_manager const* catmgr_; }; } // namespace dwarfs diff --git a/include/dwarfs/inode.h b/include/dwarfs/inode.h index 6bb89886..3cb9877c 100644 --- a/include/dwarfs/inode.h +++ b/include/dwarfs/inode.h @@ -48,6 +48,7 @@ class inode : public object { using files_vector = folly::small_vector; virtual void set_files(files_vector&& fv) = 0; + virtual void populate(size_t size) = 0; virtual void scan(mmif* mm, inode_options const& options) = 0; virtual void set_num(uint32_t num) = 0; virtual uint32_t num() const = 0; @@ -63,7 +64,7 @@ class inode : public object { virtual void add_chunk(size_t block, size_t offset, size_t size) = 0; virtual void append_chunks_to(std::vector& vec) const = 0; - virtual inode_fragments const& fragments() const = 0; + virtual inode_fragments& fragments() = 0; virtual void dump(std::ostream& os, inode_options const& options) const = 0; }; diff --git a/include/dwarfs/inode_manager.h b/include/dwarfs/inode_manager.h index fdeac178..2a2eac5d 100644 --- a/include/dwarfs/inode_manager.h +++ b/include/dwarfs/inode_manager.h @@ -54,10 +54,6 @@ class inode_manager { size_t count() const { return impl_->count(); } - void order_inodes(worker_group& wg, inode_cb const& fn) { - impl_->order_inodes(wg, fn); - } - void for_each_inode_in_order(inode_cb const& fn) const { impl_->for_each_inode_in_order(fn); } @@ -80,13 +76,17 @@ class inode_manager { sortable_inode_span sortable_span() const { return impl_->sortable_span(); } + sortable_inode_span + ordered_span(fragment_category cat, worker_group& wg) const { + return impl_->ordered_span(cat, wg); + } + class impl { public: virtual ~impl() = default; virtual std::shared_ptr create_inode() = 0; virtual size_t count() const = 0; - virtual void order_inodes(worker_group& wg, inode_cb const& fn) = 0; virtual void for_each_inode_in_order( std::function const&)> const& fn) const = 0; virtual std::vector> @@ -97,6 +97,8 @@ class inode_manager { file const* p) const = 0; virtual void dump(std::ostream& os) const = 0; virtual sortable_inode_span sortable_span() const = 0; + virtual sortable_inode_span + ordered_span(fragment_category cat, worker_group& wg) const = 0; }; private: diff --git a/include/dwarfs/progress.h b/include/dwarfs/progress.h index d607a311..a47d2e6e 100644 --- a/include/dwarfs/progress.h +++ b/include/dwarfs/progress.h @@ -71,6 +71,8 @@ class progress { std::atomic chunk_count{0}; std::atomic inodes_scanned{0}; std::atomic inodes_written{0}; + std::atomic fragments_found{0}; + std::atomic fragments_written{0}; std::atomic blocks_written{0}; std::atomic errors{0}; std::atomic nilsimsa_depth{0}; diff --git a/include/dwarfs/segmenter.h b/include/dwarfs/segmenter.h index be7cfe06..41cbabdf 100644 --- a/include/dwarfs/segmenter.h +++ b/include/dwarfs/segmenter.h @@ -25,10 +25,13 @@ #include #include +#include + namespace dwarfs { +class block_data; +class block_manager; class chunkable; -class filesystem_writer; class logger; class progress; @@ -43,8 +46,10 @@ class segmenter { unsigned bloom_filter_size{4}; }; - segmenter(logger& lgr, progress& prog, const config& cfg, - filesystem_writer& fsw); + using block_ready_cb = folly::Function)>; + + segmenter(logger& lgr, progress& prog, std::shared_ptr blkmgr, + const config& cfg, block_ready_cb block_ready); void add_chunkable(chunkable& chkable) { impl_->add_chunkable(chkable); } diff --git a/src/dwarfs/block_manager.cpp b/src/dwarfs/block_manager.cpp new file mode 100644 index 00000000..11f3049a --- /dev/null +++ b/src/dwarfs/block_manager.cpp @@ -0,0 +1,56 @@ +/* vim:set ts=2 sw=2 sts=2 et: */ +/** + * \author Marcus Holland-Moritz (github@mhxnet.de) + * \copyright Copyright (c) Marcus Holland-Moritz + * + * This file is part of dwarfs. + * + * dwarfs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dwarfs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dwarfs. If not, see . + */ + +#include + +#include "dwarfs/block_manager.h" + +namespace dwarfs { + +size_t block_manager::get_logical_block() const { + size_t block_no; + { + std::lock_guard lock{mx_}; + block_no = num_blocks_++; + } + return block_no; +} + +void block_manager::set_written_block(size_t logical_block, + size_t written_block) { + std::lock_guard lock{mx_}; + assert(logical_block < num_blocks_); + if (block_map_.size() < num_blocks_) { + block_map_.resize(num_blocks_); + } + block_map_[logical_block] = written_block; +} + +void block_manager::map_logical_blocks(std::vector& vec) { + std::lock_guard lock{mx_}; + for (auto& c : vec) { + size_t block = c.get_block(); + assert(block < num_blocks_); + c.block() = block_map_[block]; + } +} + +} // namespace dwarfs diff --git a/src/dwarfs/console_writer.cpp b/src/dwarfs/console_writer.cpp index 59410139..54f0a158 100644 --- a/src/dwarfs/console_writer.cpp +++ b/src/dwarfs/console_writer.cpp @@ -177,9 +177,16 @@ void console_writer::update(const progress& p, bool last) { << size_with_unit(p.saved_by_segmentation) << newline << "filesystem: " << size_with_unit(p.filesystem_size) << " in " - << p.block_count << " blocks (" << p.chunk_count << " chunks, " - << (p.inodes_written > 0 ? p.inodes_written : p.inodes_scanned) << "/" - << p.files_found - p.duplicate_files - p.hardlinks << " inodes)" + << p.block_count << " blocks (" << p.chunk_count << " chunks, "; + + if (p.fragments_written > 0) { + oss << p.fragments_written << "/" << p.fragments_found + << " fragments, "; + } else { + oss << p.fragments_found << " fragments, " << p.inodes_scanned << "/"; + } + + oss << p.files_found - p.duplicate_files - p.hardlinks << " inodes)" << newline << "compressed filesystem: " << p.blocks_written << " blocks/" diff --git a/src/dwarfs/filesystem_v2.cpp b/src/dwarfs/filesystem_v2.cpp index e5bc88b0..1116e551 100644 --- a/src/dwarfs/filesystem_v2.cpp +++ b/src/dwarfs/filesystem_v2.cpp @@ -33,6 +33,7 @@ #include "dwarfs/block_cache.h" #include "dwarfs/block_compressor.h" #include "dwarfs/block_data.h" +#include "dwarfs/categorizer.h" #include "dwarfs/error.h" #include "dwarfs/filesystem_v2.h" #include "dwarfs/filesystem_writer.h" @@ -701,7 +702,9 @@ void filesystem_v2::rewrite(logger& lgr, progress& prog, auto block = std::make_shared(block_decompressor::decompress( s->compression(), mm->as(s->start()), s->length())); - writer.write_block(std::move(block)); + // TODO: re-write with different categories + writer.write_block(categorizer_manager::default_category().value(), + std::move(block)); } else { writer.write_compressed_section(s->type(), s->compression(), s->data(*mm)); diff --git a/src/dwarfs/filesystem_writer.cpp b/src/dwarfs/filesystem_writer.cpp index 423d89ee..a2f717d2 100644 --- a/src/dwarfs/filesystem_writer.cpp +++ b/src/dwarfs/filesystem_writer.cpp @@ -26,7 +26,9 @@ #include #include #include +#include #include +#include #include @@ -52,10 +54,14 @@ class fsblock { fsblock(section_type type, compression_type compression, std::span data, uint32_t number); - void compress(worker_group& wg) { impl_->compress(wg); } + void + compress(worker_group& wg, std::optional meta = std::nullopt) { + impl_->compress(wg, std::move(meta)); + } void wait_until_compressed() { impl_->wait_until_compressed(); } section_type type() const { return impl_->type(); } compression_type compression() const { return impl_->compression(); } + std::string description() const { return impl_->description(); } std::span data() const { return impl_->data(); } size_t uncompressed_size() const { return impl_->uncompressed_size(); } size_t size() const { return impl_->size(); } @@ -66,10 +72,12 @@ class fsblock { public: virtual ~impl() = default; - virtual void compress(worker_group& wg) = 0; + virtual void + compress(worker_group& wg, std::optional meta) = 0; virtual void wait_until_compressed() = 0; virtual section_type type() const = 0; virtual compression_type compression() const = 0; + virtual std::string description() const = 0; virtual std::span data() const = 0; virtual size_t uncompressed_size() const = 0; virtual size_t size() const = 0; @@ -95,14 +103,20 @@ class raw_fsblock : public fsblock::impl { , number_{number} , comp_type_{bc_.type()} {} - void compress(worker_group& wg) override { + void compress(worker_group& wg, std::optional meta) override { std::promise prom; future_ = prom.get_future(); - wg.add_job([this, prom = std::move(prom)]() mutable { + wg.add_job([this, prom = std::move(prom), + meta = std::move(meta)]() mutable { try { - // TODO: metadata - auto tmp = std::make_shared(bc_.compress(data_->vec())); + std::shared_ptr tmp; + + if (meta) { + tmp = std::make_shared(bc_.compress(data_->vec(), *meta)); + } else { + tmp = std::make_shared(bc_.compress(data_->vec())); + } { std::lock_guard lock(mx_); @@ -124,6 +138,8 @@ class raw_fsblock : public fsblock::impl { compression_type compression() const override { return comp_type_; } + std::string description() const override { return bc_.describe(); } + std::span data() const override { return data_->vec(); } size_t uncompressed_size() const override { return uncompressed_size_; } @@ -158,7 +174,8 @@ class compressed_fsblock : public fsblock::impl { , range_{range} , number_{number} {} - void compress(worker_group& wg) override { + void + compress(worker_group& wg, std::optional /* meta */) override { std::promise prom; future_ = prom.get_future(); @@ -173,6 +190,9 @@ class compressed_fsblock : public fsblock::impl { section_type type() const override { return type_; } compression_type compression() const override { return compression_; } + // TODO + std::string description() const override { return ""; } + std::span data() const override { return range_; } size_t uncompressed_size() const override { return range_.size(); } @@ -229,26 +249,32 @@ template class filesystem_writer_ final : public filesystem_writer::impl { public: filesystem_writer_(logger& lgr, std::ostream& os, worker_group& wg, - progress& prog, const block_compressor& bc, - const block_compressor& schema_bc, + progress& prog, const block_compressor& schema_bc, const block_compressor& metadata_bc, filesystem_writer_options const& options, std::istream* header); ~filesystem_writer_() noexcept override; + void add_default_compressor(block_compressor bc) override; + void add_category_compressor(fragment_category::value_type cat, + block_compressor bc) override; void copy_header(std::span header) override; - void write_block(std::shared_ptr&& data) override; + uint32_t write_block(fragment_category::value_type cat, + std::shared_ptr&& data, + std::optional meta) override; void write_metadata_v2_schema(std::shared_ptr&& data) override; void write_metadata_v2(std::shared_ptr&& data) override; void write_compressed_section(section_type type, compression_type compression, std::span data) override; void flush() override; size_t size() const override { return os_.tellp(); } - int queue_fill() const override { return static_cast(wg_.queue_size()); } private: - void write_section(section_type type, std::shared_ptr&& data, - block_compressor const& bc); + block_compressor const& + compressor_for_category(fragment_category::value_type cat) const; + uint32_t write_section(section_type type, std::shared_ptr&& data, + block_compressor const& bc, + std::optional meta = std::nullopt); void write(fsblock const& fsb); void write(const char* data, size_t size); template @@ -263,7 +289,8 @@ class filesystem_writer_ final : public filesystem_writer::impl { std::istream* header_; worker_group& wg_; progress& prog_; - const block_compressor& bc_; + std::optional default_bc_; + std::unordered_map bc_; const block_compressor& schema_bc_; const block_compressor& metadata_bc_; const filesystem_writer_options options_; @@ -281,14 +308,12 @@ class filesystem_writer_ final : public filesystem_writer::impl { template filesystem_writer_::filesystem_writer_( logger& lgr, std::ostream& os, worker_group& wg, progress& prog, - const block_compressor& bc, const block_compressor& schema_bc, - const block_compressor& metadata_bc, + const block_compressor& schema_bc, const block_compressor& metadata_bc, filesystem_writer_options const& options, std::istream* header) : os_(os) , header_(header) , wg_(wg) , prog_(prog) - , bc_(bc) , schema_bc_(schema_bc) , metadata_bc_(metadata_bc) , options_(options) @@ -344,9 +369,11 @@ void filesystem_writer_::writer_thread() { fsb->wait_until_compressed(); - LOG_DEBUG << get_section_name(fsb->type()) << " compressed from " + LOG_DEBUG << get_section_name(fsb->type()) << " [" << fsb->number() + << "] compressed from " << size_with_unit(fsb->uncompressed_size()) << " to " - << size_with_unit(fsb->size()); + << size_with_unit(fsb->size()) << " [" << fsb->description() + << "]"; write(*fsb); } @@ -396,9 +423,25 @@ void filesystem_writer_::write(fsblock const& fsb) { } template -void filesystem_writer_::write_section( +block_compressor const& +filesystem_writer_::compressor_for_category( + fragment_category::value_type cat) const { + if (auto it = bc_.find(cat); it != bc_.end()) { + LOG_DEBUG << "using compressor (" << it->second.describe() + << ") for category " << cat; + return it->second; + } + LOG_DEBUG << "using default compressor (" << default_bc_.value().describe() + << ") for category " << cat; + return default_bc_.value(); +} + +template +uint32_t filesystem_writer_::write_section( section_type type, std::shared_ptr&& data, - block_compressor const& bc) { + block_compressor const& bc, std::optional meta) { + uint32_t block_no; + { std::unique_lock lock(mx_); @@ -406,15 +449,17 @@ void filesystem_writer_::write_section( cond_.wait(lock); } - auto fsb = - std::make_unique(type, bc, std::move(data), section_number_++); + block_no = section_number_++; + auto fsb = std::make_unique(type, bc, std::move(data), block_no); - fsb->compress(wg_); + fsb->compress(wg_, meta); queue_.push_back(std::move(fsb)); } cond_.notify_one(); + + return block_no; } template @@ -435,6 +480,26 @@ void filesystem_writer_::write_compressed_section( cond_.notify_one(); } +template +void filesystem_writer_::add_default_compressor( + block_compressor bc) { + if (default_bc_) { + DWARFS_THROW(runtime_error, "default compressor registered more than once"); + } + default_bc_ = std::move(bc); +} + +template +void filesystem_writer_::add_category_compressor( + fragment_category::value_type cat, block_compressor bc) { + LOG_DEBUG << "adding compressor (" << bc.describe() << ") for category " + << cat; + if (!bc_.emplace(cat, std::move(bc)).second) { + DWARFS_THROW(runtime_error, + "category compressor registered more than once"); + } +} + template void filesystem_writer_::copy_header( std::span header) { @@ -449,9 +514,11 @@ void filesystem_writer_::copy_header( } template -void filesystem_writer_::write_block( - std::shared_ptr&& data) { - write_section(section_type::BLOCK, std::move(data), bc_); +uint32_t filesystem_writer_::write_block( + fragment_category::value_type cat, std::shared_ptr&& data, + std::optional meta) { + return write_section(section_type::BLOCK, std::move(data), + compressor_for_category(cat), std::move(meta)); } template @@ -512,21 +579,12 @@ void filesystem_writer_::write_section_index() { filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr, worker_group& wg, progress& prog, - const block_compressor& bc, - filesystem_writer_options const& options, - std::istream* header) - : filesystem_writer(os, lgr, wg, prog, bc, bc, bc, options, header) {} - -filesystem_writer::filesystem_writer(std::ostream& os, logger& lgr, - worker_group& wg, progress& prog, - const block_compressor& bc, const block_compressor& schema_bc, const block_compressor& metadata_bc, filesystem_writer_options const& options, std::istream* header) : impl_( make_unique_logging_object( - lgr, os, wg, prog, bc, schema_bc, metadata_bc, options, header)) { -} + lgr, os, wg, prog, schema_bc, metadata_bc, options, header)) {} } // namespace dwarfs diff --git a/src/dwarfs/fragment_chunkable.cpp b/src/dwarfs/fragment_chunkable.cpp index 5735cbf0..e273b4f4 100644 --- a/src/dwarfs/fragment_chunkable.cpp +++ b/src/dwarfs/fragment_chunkable.cpp @@ -33,7 +33,7 @@ namespace dwarfs { fragment_chunkable::fragment_chunkable(inode const& ino, single_inode_fragment& frag, file_off_t offset, mmif& mm, - categorizer_manager const& catmgr) + categorizer_manager const* catmgr) : ino_{ino} , frag_{frag} , offset_{offset} @@ -45,9 +45,13 @@ fragment_chunkable::~fragment_chunkable() = default; size_t fragment_chunkable::size() const { return frag_.size(); } std::string fragment_chunkable::description() const { - return fmt::format("{} fragment at offset {} of inode {} [{}] - size: {}", - catmgr_.category_name(frag_.category().value()), offset_, - ino_.num(), ino_.any()->name(), size()); + std::string prefix; + if (catmgr_) { + prefix = + fmt::format("{} ", catmgr_->category_name(frag_.category().value())); + } + return fmt::format("{}fragment at offset {} of inode {} [{}] - size: {}", + prefix, offset_, ino_.num(), ino_.any()->name(), size()); } std::span fragment_chunkable::span() const { diff --git a/src/dwarfs/inode_element_view.cpp b/src/dwarfs/inode_element_view.cpp index 6e0bde5f..f705d7f9 100644 --- a/src/dwarfs/inode_element_view.cpp +++ b/src/dwarfs/inode_element_view.cpp @@ -93,7 +93,7 @@ std::string inode_element_view::description(size_t i) const { } nilsimsa::hash_type const& inode_element_view::get_bits(size_t i) const { - return inodes_[i]->nilsimsa_similarity_hash(); + return *hash_cache_[i]; } } // namespace dwarfs diff --git a/src/dwarfs/inode_manager.cpp b/src/dwarfs/inode_manager.cpp index 7a0190ae..ca52e215 100644 --- a/src/dwarfs/inode_manager.cpp +++ b/src/dwarfs/inode_manager.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -69,10 +70,7 @@ class inode_ : public inode { public: using chunk_type = thrift::metadata::chunk; - inode_() { - std::fill(nilsimsa_similarity_hash_.begin(), - nilsimsa_similarity_hash_.end(), 0); - } + inode_() = default; void set_num(uint32_t num) override { DWARFS_CHECK((flags_ & kNumIsValid) == 0, @@ -89,27 +87,22 @@ class inode_ : public inode { bool has_category(fragment_category cat) const override { DWARFS_CHECK(!fragments_.empty(), "has_category() called with no fragments"); - if (fragments_.size() == 1) { - return fragments_.get_single_category() == cat; - } - auto& m = std::get(similarity_); - return m.find(cat) != m.end(); + return std::ranges::any_of( + fragments_, [cat](auto const& f) { return f.category() == cat; }); } uint32_t similarity_hash() const override { if (files_.empty()) { DWARFS_THROW(runtime_error, "inode has no file (similarity)"); } - // TODO - return similarity_hash_; + return std::get(similarity_); } nilsimsa::hash_type const& nilsimsa_similarity_hash() const override { if (files_.empty()) { DWARFS_THROW(runtime_error, "inode has no file (nilsimsa)"); } - // TODO - return nilsimsa_similarity_hash_; + return std::get(similarity_); } uint32_t similarity_hash(fragment_category cat) const override { @@ -129,6 +122,11 @@ class inode_ : public inode { files_ = std::move(fv); } + void populate(size_t size) override { + assert(fragments_.empty()); + fragments_.emplace_back(categorizer_manager::default_category(), size); + } + void scan(mmif* mm, inode_options const& opts) override { categorizer_job catjob; @@ -189,15 +187,14 @@ class inode_ : public inode { if (fragments_.empty()) { fragments_.emplace_back(categorizer_manager::default_category(), mm ? mm->size() : 0); + scan_full(mm, opts); } } void add_chunk(size_t block, size_t offset, size_t size) override { - chunk_type c; - c.block() = block; - c.offset() = offset; - c.size() = size; - chunks_.push_back(c); + DWARFS_CHECK(fragments_.size() == 1, + "exactly one fragment must be used in legacy add_chunk()"); + fragments_.back().add_chunk(block, offset, size); } size_t size() const override { return any()->size(); } @@ -212,10 +209,13 @@ class inode_ : public inode { } void append_chunks_to(std::vector& vec) const override { - vec.insert(vec.end(), chunks_.begin(), chunks_.end()); + for (auto const& frag : fragments_) { + auto chks = frag.chunks(); + vec.insert(vec.end(), chks.begin(), chks.end()); + } } - inode_fragments const& fragments() const override { return fragments_; } + inode_fragments& fragments() override { return fragments_; } void dump(std::ostream& os, inode_options const& options) const override { auto dump_category = [&os, &options](fragment_category const& cat) { @@ -379,7 +379,6 @@ class inode_ : public inode { } void scan_full(mmif* mm, inode_options const& opts) { - assert(mm); assert(fragments_.size() <= 1); auto order_mode = @@ -394,18 +393,20 @@ class inode_ : public inode { case file_order_mode::SIMILARITY: { similarity sc; - scan_range(mm, sc); - similarity_hash_ = sc.finalize(); // TODO + if (mm) { + scan_range(mm, sc); + } similarity_.emplace(sc.finalize()); } break; case file_order_mode::NILSIMSA: { nilsimsa nc; - scan_range(mm, nc); + if (mm) { + scan_range(mm, nc); + } // TODO: can we finalize in-place? nilsimsa::hash_type hash; nc.finalize(hash); - nilsimsa_similarity_hash_ = hash; // TODO similarity_.emplace(hash); } break; } @@ -434,12 +435,6 @@ class inode_ : public inode { similarity_map_type // 24 bytes > similarity_; - - // OLDE: - uint32_t similarity_hash_{0}; // TODO: remove (move to similarity_) - std::vector chunks_; // TODO: remove (part of fragments_ now) - nilsimsa::hash_type - nilsimsa_similarity_hash_; // TODO: remove (move to similarity_) }; } // namespace @@ -461,9 +456,6 @@ class inode_manager_ final : public inode_manager::impl { size_t count() const override { return inodes_.size(); } - void - order_inodes(worker_group& wg, inode_manager::inode_cb const& fn) override; - void for_each_inode_in_order( std::function const&)> const& fn) const override { @@ -527,6 +519,9 @@ class inode_manager_ final : public inode_manager::impl { return sortable_inode_span(inodes_); } + sortable_inode_span + ordered_span(fragment_category cat, worker_group& wg) const override; + private: static bool inodes_need_scanning(inode_options const& opts) { if (opts.categorizer_mgr) { @@ -539,29 +534,6 @@ class inode_manager_ final : public inode_manager::impl { }); } - void order_inodes_by_path() { - auto span = sortable_span(); - span.all(); - inode_ordering(LOG_GET_LOGGER, prog_).by_path(span); - - std::vector> tmp(span.begin(), span.end()); - inodes_.swap(tmp); - } - - void order_inodes_by_similarity() { - auto span = sortable_span(); - span.all(); - inode_ordering(LOG_GET_LOGGER, prog_).by_similarity(span); - - std::vector> tmp(span.begin(), span.end()); - inodes_.swap(tmp); - } - - void presort_index(std::vector>& inodes, - std::vector& index); - - void order_inodes_by_nilsimsa(worker_group& wg); - LOG_PROXY_DECL(LoggerPolicy); std::vector> inodes_; progress& prog_; @@ -587,123 +559,82 @@ void inode_manager_::scan_background(worker_group& wg, mm = os.map_file(p->fs_path(), size); } ino->scan(mm.get(), opts_); + prog_.fragments_found += ino->fragments().size(); ++prog_.similarity_scans; // TODO: we probably don't want this here prog_.similarity_bytes += size; ++prog_.inodes_scanned; ++prog_.files_scanned; }); } else { + ino->populate(p->size()); + prog_.fragments_found += ino->fragments().size(); ++prog_.inodes_scanned; ++prog_.files_scanned; } } -template -void inode_manager_::order_inodes( - worker_group& wg, inode_manager::inode_cb const& fn) { - // TODO: only use an index, never actually reorder inodes - - // TODO: - switch (opts_.fragment_order.get().mode) { - case file_order_mode::NONE: - LOG_INFO << "keeping inode order"; - break; - - case file_order_mode::PATH: { - LOG_INFO << "ordering " << count() << " inodes by path name..."; - auto ti = LOG_CPU_TIMED_INFO; - order_inodes_by_path(); - ti << count() << " inodes ordered"; - break; - } - - case file_order_mode::SIMILARITY: { - LOG_INFO << "ordering " << count() << " inodes by similarity..."; - auto ti = LOG_CPU_TIMED_INFO; - order_inodes_by_similarity(); - ti << count() << " inodes ordered"; - break; - } - - case file_order_mode::NILSIMSA: { - LOG_INFO << "ordering " << count() - << " inodes using new nilsimsa similarity..."; - auto ti = LOG_CPU_TIMED_INFO; - order_inodes_by_nilsimsa(wg); - ti << count() << " inodes ordered"; - break; - } - } - - LOG_INFO << "assigning file inodes..."; - for (const auto& ino : inodes_) { - fn(ino); - } -} - -template -void inode_manager_::presort_index( - std::vector>& inodes, std::vector& index) { - auto ti = LOG_TIMED_INFO; - size_t num_name = 0; - size_t num_path = 0; - - std::sort(index.begin(), index.end(), [&](auto a, auto b) { - auto const& ia = *inodes[a]; - auto const& ib = *inodes[b]; - auto sa = ia.size(); - auto sb = ib.size(); - - if (sa < sb) { - return true; - } else if (sa > sb) { - return false; - } - - ++num_name; - - auto fa = ia.any(); - auto fb = ib.any(); - auto& na = fa->name(); - auto& nb = fb->name(); - - if (na > nb) { - return true; - } else if (na < nb) { - return false; - } - - ++num_path; - - return !fa->less_revpath(*fb); - }); - - ti << "pre-sorted index (" << num_name << " name, " << num_path - << " path lookups)"; -} - -template -void inode_manager_::order_inodes_by_nilsimsa(worker_group& wg) { - auto const& file_order = opts_.fragment_order.get(); // TODO - similarity_ordering_options opts; - opts.max_children = file_order.nilsimsa_max_children; - opts.max_cluster_size = file_order.nilsimsa_max_cluster_size; - - auto span = sortable_span(); - span.all(); - - inode_ordering(LOG_GET_LOGGER, prog_).by_nilsimsa(wg, opts, span); - - std::vector> tmp(span.begin(), span.end()); - inodes_.swap(tmp); -} - template void inode_manager_::dump(std::ostream& os) const { for_each_inode_in_order( [this, &os](auto const& ino) { ino->dump(os, opts_); }); } +template +auto inode_manager_::ordered_span(fragment_category cat, + worker_group& wg) const + -> sortable_inode_span { + std::string prefix; + if (opts_.categorizer_mgr) { + prefix = + fmt::format("[{}] ", opts_.categorizer_mgr->category_name(cat.value())); + } + + auto opts = opts_.fragment_order.get(cat); + + auto span = sortable_span(); + span.select([cat](auto const& v) { return v->has_category(cat); }); + + inode_ordering order(LOG_GET_LOGGER, prog_); + + switch (opts.mode) { + case file_order_mode::NONE: + LOG_INFO << prefix << "keeping inode order"; + break; + + case file_order_mode::PATH: { + LOG_INFO << prefix << "ordering " << span.size() + << " inodes by path name..."; + auto ti = LOG_CPU_TIMED_INFO; + order.by_path(span); + ti << prefix << span.size() << " inodes ordered"; + break; + } + + case file_order_mode::SIMILARITY: { + LOG_INFO << prefix << "ordering " << span.size() + << " inodes by similarity..."; + auto ti = LOG_CPU_TIMED_INFO; + order.by_similarity(span, cat); + ti << prefix << span.size() << " inodes ordered"; + break; + } + + case file_order_mode::NILSIMSA: { + LOG_INFO << prefix << "ordering " << span.size() + << " inodes using nilsimsa similarity..."; + similarity_ordering_options soo; + soo.max_children = opts.nilsimsa_max_children; + soo.max_cluster_size = opts.nilsimsa_max_cluster_size; + auto ti = LOG_TIMED_INFO; + order.by_nilsimsa(wg, soo, span, cat); + ti << prefix << span.size() << " inodes ordered"; + break; + } + } + + return span; +} + inode_manager::inode_manager(logger& lgr, progress& prog, inode_options const& opts) : impl_(make_unique_logging_object( diff --git a/src/dwarfs/scanner.cpp b/src/dwarfs/scanner.cpp index 5ed0b3ac..ecd1bb59 100644 --- a/src/dwarfs/scanner.cpp +++ b/src/dwarfs/scanner.cpp @@ -38,17 +38,20 @@ #include #include "dwarfs/block_data.h" +#include "dwarfs/block_manager.h" #include "dwarfs/categorizer.h" #include "dwarfs/entry.h" #include "dwarfs/error.h" #include "dwarfs/file_scanner.h" #include "dwarfs/filesystem_writer.h" +#include "dwarfs/fragment_chunkable.h" #include "dwarfs/global_entry_data.h" #include "dwarfs/inode.h" -#include "dwarfs/inode_chunkable.h" #include "dwarfs/inode_manager.h" +#include "dwarfs/inode_ordering.h" #include "dwarfs/logger.h" #include "dwarfs/metadata_v2.h" +#include "dwarfs/mmif.h" #include "dwarfs/options.h" #include "dwarfs/os_access.h" #include "dwarfs/progress.h" @@ -663,7 +666,6 @@ void scanner_::scan( } LOG_INFO << "building blocks..."; - segmenter seg(LOG_GET_LOGGER, prog, cfg_, fsw); // TODO: // - get rid of multiple worker groups @@ -671,37 +673,77 @@ void scanner_::scan( // which gets run on a worker groups; each batch keeps track of // its CPU time and affects thread naming + // segmenter seg(LOG_GET_LOGGER, prog, cfg_, fsw); + + auto blockmgr = std::make_shared(); + { - worker_group blockify("blockify", 1, 1 << 20); + size_t const num_threads = std::max(folly::hardware_concurrency(), 1u); + worker_group wg_ordering("ordering", num_threads); + worker_group wg_blockify("blockify", num_threads); - { - // TODO - size_t const num_threads = std::max(folly::hardware_concurrency(), 1u); - worker_group wg_order("ordering", num_threads); + for (auto category : im.inode_categories()) { + auto catmgr = options_.inode.categorizer_mgr.get(); + std::string meta; - // ordering.add_job([&] { - im.order_inodes(wg_order, [&](std::shared_ptr const& ino) { - blockify.add_job([&, this] { - prog.current.store(ino.get()); - inode_chunkable ic(*ino, *os_); - seg.add_chunkable(ic); - prog.inodes_written++; - }); + if (catmgr) { + meta = catmgr->category_metadata(category); + } + + wg_ordering.add_job([this, catmgr, blockmgr, category, meta, &prog, &fsw, + &im, &wg_ordering, &wg_blockify] { + wg_blockify.add_job( + [this, catmgr, blockmgr, category, meta, &prog, &fsw, + span = im.ordered_span(category, wg_ordering)]() mutable { + // TODO: segmenter config per-category + segmenter seg(LOG_GET_LOGGER, prog, blockmgr, cfg_, + [category, meta, &fsw](auto block) { + return fsw.write_block(category.value(), + std::move(block), meta); + }); + + for (auto ino : span) { + prog.current.store(ino.get()); + + // TODO: factor this code out + auto f = ino->any(); + + if (auto size = f->size(); size > 0) { + auto mm = os_->map_file(f->fs_path(), size); + file_off_t offset{0}; + + for (auto& frag : ino->fragments()) { + if (frag.category() == category) { + fragment_chunkable fc(*ino, frag, offset, *mm, catmgr); + seg.add_chunkable(fc); + prog.fragments_written++; + } + + offset += frag.size(); + } + } + + prog.inodes_written++; // TODO: remove? + } + + seg.finish(); + }); }); - // }); - - // wg_order.wait(); } LOG_INFO << "waiting for segmenting/blockifying to finish..."; - blockify.wait(); + wg_blockify.wait(); + wg_ordering.wait(); - LOG_INFO << "segmenting/blockifying CPU time: " - << time_with_unit(blockify.get_cpu_time()); + LOG_INFO << "ordering CPU time: " + << time_with_unit(wg_ordering.get_cpu_time()); + + LOG_INFO << "segmenting CPU time: " + << time_with_unit(wg_blockify.get_cpu_time()); } - seg.finish(); + // seg.finish(); wg_.wait(); prog.set_status_function([](progress const&, size_t) { @@ -722,6 +764,8 @@ void scanner_::scan( ino->append_chunks_to(mv2.chunks().value()); }); + blockmgr->map_logical_blocks(mv2.chunks().value()); + // insert dummy inode to help determine number of chunks per inode DWARFS_NOTHROW(mv2.chunk_table()->at(im.count())) = mv2.chunks()->size(); diff --git a/src/dwarfs/segmenter.cpp b/src/dwarfs/segmenter.cpp index 1c3f298b..64b409fb 100644 --- a/src/dwarfs/segmenter.cpp +++ b/src/dwarfs/segmenter.cpp @@ -39,12 +39,12 @@ #include #include "dwarfs/block_data.h" +#include "dwarfs/block_manager.h" #include "dwarfs/chunkable.h" #include "dwarfs/compiler.h" #include "dwarfs/cyclic_hash.h" #include "dwarfs/entry.h" #include "dwarfs/error.h" -#include "dwarfs/filesystem_writer.h" #include "dwarfs/logger.h" #include "dwarfs/progress.h" #include "dwarfs/segmenter.h" @@ -288,12 +288,14 @@ class active_block { template class segmenter_ final : public segmenter::impl { public: - segmenter_(logger& lgr, progress& prog, const segmenter::config& cfg, - filesystem_writer& fsw) + segmenter_(logger& lgr, progress& prog, std::shared_ptr blkmgr, + const segmenter::config& cfg, + segmenter::block_ready_cb block_ready) : LOG_PROXY_INIT(lgr) , prog_{prog} + , blkmgr_{std::move(blkmgr)} , cfg_{cfg} - , fsw_{fsw} + , block_ready_{std::move(block_ready)} , window_size_{window_size(cfg)} , window_step_{window_step(cfg)} , block_size_{block_size(cfg)} @@ -346,13 +348,13 @@ class segmenter_ final : public segmenter::impl { LOG_PROXY_DECL(LoggerPolicy); progress& prog_; + std::shared_ptr blkmgr_; const segmenter::config& cfg_; - filesystem_writer& fsw_; + segmenter::block_ready_cb block_ready_; size_t const window_size_; size_t const window_step_; size_t const block_size_; - size_t block_count_{0}; chunk_state chunk_; @@ -512,7 +514,8 @@ template void segmenter_::block_ready() { auto& block = blocks_.back(); block.finalize(stats_); - fsw_.write_block(block.data()); + auto written_block_num = block_ready_(block.data()); + blkmgr_->set_written_block(block.num(), written_block_num); ++prog_.block_count; } @@ -529,7 +532,7 @@ void segmenter_::append_to_block(chunkable& chkable, filter_.merge(b.filter()); } - blocks_.emplace_back(block_count_++, block_size_, + blocks_.emplace_back(blkmgr_->get_logical_block(), block_size_, cfg_.max_active_blocks > 0 ? window_size_ : 0, window_step_, filter_.size()); } @@ -711,9 +714,10 @@ void segmenter_::segment_and_add_data(chunkable& chkable, finish_chunk(chkable); } -segmenter::segmenter(logger& lgr, progress& prog, const config& cfg, - filesystem_writer& fsw) +segmenter::segmenter(logger& lgr, progress& prog, + std::shared_ptr blkmgr, const config& cfg, + block_ready_cb block_ready) : impl_(make_unique_logging_object( - lgr, prog, cfg, fsw)) {} + lgr, prog, std::move(blkmgr), cfg, std::move(block_ready))) {} } // namespace dwarfs diff --git a/src/mkdwarfs_main.cpp b/src/mkdwarfs_main.cpp index 7c33a6f8..760f8c9e 100644 --- a/src/mkdwarfs_main.cpp +++ b/src/mkdwarfs_main.cpp @@ -372,7 +372,7 @@ int mkdwarfs_main(int argc, sys_char** argv) { "recompress an existing filesystem (none, block, metadata, all)") ("categorize", po::value(&categorizer_list_str) - ->default_value("pcmaudio,incompressible"), + ->implicit_value("pcmaudio,incompressible"), categorize_desc.c_str()) ("order", po::value>(&order)->multitoken(), @@ -923,10 +923,6 @@ int mkdwarfs_main(int argc, sys_char** argv) { progress prog(std::move(updater), interval_ms); - block_compressor bc(compression.front()); // TODO - block_compressor schema_bc(schema_compression); - block_compressor metadata_bc(metadata_compression); - auto min_memory_req = num_workers * (UINT64_C(1) << cfg.block_size_bits); // TODO: @@ -1023,6 +1019,12 @@ int mkdwarfs_main(int argc, sys_char** argv) { return 1; } + block_compressor schema_bc(schema_compression); + block_compressor metadata_bc(metadata_compression); + + filesystem_writer fsw(*os, lgr, wg_compress, prog, schema_bc, metadata_bc, + fswopts, header_ifs.get()); + try { categorized_option compression_opt; contextual_option_parser cop("--compression", compression_opt, cp, @@ -1030,10 +1032,14 @@ int mkdwarfs_main(int argc, sys_char** argv) { cop.parse(compression); LOG_DEBUG << cop.as_string(); - compression_opt.visit_contextual([catmgr = options.inode.categorizer_mgr]( - auto cat, block_compressor const& bc) { + fsw.add_default_compressor(compression_opt.get()); + + compression_opt.visit_contextual([catmgr = options.inode.categorizer_mgr, + &fsw](auto cat, + block_compressor const& bc) { try { catmgr->set_metadata_requirements(cat, bc.metadata_requirements()); + fsw.add_category_compressor(cat, bc); } catch (std::exception const& e) { throw std::runtime_error( fmt::format("compression '{}' cannot be used for category '{}': " @@ -1046,9 +1052,6 @@ int mkdwarfs_main(int argc, sys_char** argv) { return 1; } - filesystem_writer fsw(*os, lgr, wg_compress, prog, bc, schema_bc, metadata_bc, - fswopts, header_ifs.get()); - auto ti = LOG_TIMED_INFO; try { diff --git a/test/dwarfs.cpp b/test/dwarfs.cpp index f9d0f904..d32f5b0f 100644 --- a/test/dwarfs.cpp +++ b/test/dwarfs.cpp @@ -81,7 +81,8 @@ build_dwarfs(logger& lgr, std::shared_ptr input, } block_compressor bc(compression); - filesystem_writer fsw(oss, lgr, wg, *prog, bc); + filesystem_writer fsw(oss, lgr, wg, *prog, bc, bc); + fsw.add_default_compressor(bc); s.scan(fsw, std::filesystem::path("/"), *prog, input_list); diff --git a/test/dwarfs_benchmark.cpp b/test/dwarfs_benchmark.cpp index 03198481..14fcbf3b 100644 --- a/test/dwarfs_benchmark.cpp +++ b/test/dwarfs_benchmark.cpp @@ -125,7 +125,8 @@ std::string make_filesystem(::benchmark::State const& state) { progress prog([](const progress&, bool) {}, 1000); block_compressor bc("null"); - filesystem_writer fsw(oss, lgr, wg, prog, bc); + filesystem_writer fsw(oss, lgr, wg, prog, bc, bc); + fsw.add_default_compressor(bc); s.scan(fsw, "", prog); diff --git a/test/dwarfs_compat.cpp b/test/dwarfs_compat.cpp index 1db52499..eb313d05 100644 --- a/test/dwarfs_compat.cpp +++ b/test/dwarfs_compat.cpp @@ -1109,7 +1109,8 @@ TEST_P(rewrite, filesystem_rewrite) { std::ostringstream rewritten, idss; { - filesystem_writer fsw(rewritten, lgr, wg, prog, bc); + filesystem_writer fsw(rewritten, lgr, wg, prog, bc, bc); + fsw.add_default_compressor(bc); auto mm = std::make_shared(filename); EXPECT_NO_THROW(filesystem_v2::identify(lgr, mm, idss)); EXPECT_FALSE(filesystem_v2::header(mm)); @@ -1130,7 +1131,8 @@ TEST_P(rewrite, filesystem_rewrite) { { std::istringstream hdr_iss(format_sh); filesystem_writer_options fsw_opts; - filesystem_writer fsw(rewritten, lgr, wg, prog, bc, fsw_opts, &hdr_iss); + filesystem_writer fsw(rewritten, lgr, wg, prog, bc, bc, fsw_opts, &hdr_iss); + fsw.add_default_compressor(bc); filesystem_v2::rewrite(lgr, prog, std::make_shared(filename), fsw, opts); } @@ -1155,7 +1157,9 @@ TEST_P(rewrite, filesystem_rewrite) { { std::istringstream hdr_iss("D"); filesystem_writer_options fsw_opts; - filesystem_writer fsw(rewritten2, lgr, wg, prog, bc, fsw_opts, &hdr_iss); + filesystem_writer fsw(rewritten2, lgr, wg, prog, bc, bc, fsw_opts, + &hdr_iss); + fsw.add_default_compressor(bc); filesystem_v2::rewrite(lgr, prog, std::make_shared(rewritten.str()), fsw, opts); @@ -1173,7 +1177,8 @@ TEST_P(rewrite, filesystem_rewrite) { std::ostringstream rewritten3; { - filesystem_writer fsw(rewritten3, lgr, wg, prog, bc); + filesystem_writer fsw(rewritten3, lgr, wg, prog, bc, bc); + fsw.add_default_compressor(bc); filesystem_v2::rewrite(lgr, prog, std::make_shared(rewritten2.str()), fsw, opts); @@ -1193,7 +1198,8 @@ TEST_P(rewrite, filesystem_rewrite) { { filesystem_writer_options fsw_opts; fsw_opts.remove_header = true; - filesystem_writer fsw(rewritten4, lgr, wg, prog, bc, fsw_opts); + filesystem_writer fsw(rewritten4, lgr, wg, prog, bc, bc, fsw_opts); + fsw.add_default_compressor(bc); filesystem_v2::rewrite(lgr, prog, std::make_shared(rewritten3.str()), fsw, opts); @@ -1213,7 +1219,8 @@ TEST_P(rewrite, filesystem_rewrite) { { filesystem_writer_options fsw_opts; fsw_opts.no_section_index = true; - filesystem_writer fsw(rewritten5, lgr, wg, prog, bc, fsw_opts); + filesystem_writer fsw(rewritten5, lgr, wg, prog, bc, bc, fsw_opts); + fsw.add_default_compressor(bc); filesystem_v2::rewrite(lgr, prog, std::make_shared(rewritten4.str()), fsw, opts);