refactor: use callback for delayed updating of physical block numbers

This commit is contained in:
Marcus Holland-Moritz 2023-11-20 15:55:59 +01:00
parent a37c2716c8
commit 442e7b8fd5
6 changed files with 57 additions and 20 deletions

View File

@ -28,6 +28,8 @@
#include <span>
#include <utility>
#include <folly/Function.h>
#include "dwarfs/compression_constraints.h"
#include "dwarfs/fragment_category.h"
#include "dwarfs/fstypes.h"
@ -44,6 +46,8 @@ class worker_group;
class filesystem_writer {
public:
using physical_block_cb_type = folly::Function<void(size_t)>;
filesystem_writer(
std::ostream& os, logger& lgr, worker_group& wg, progress& prog,
const block_compressor& schema_bc, const block_compressor& metadata_bc,
@ -69,10 +73,17 @@ class filesystem_writer {
impl_->copy_header(header);
}
uint32_t write_block(fragment_category::value_type cat,
std::shared_ptr<block_data>&& data,
std::optional<std::string> meta = std::nullopt) {
return impl_->write_block(cat, std::move(data), std::move(meta));
void write_block(fragment_category cat, std::shared_ptr<block_data>&& data,
physical_block_cb_type physical_block_cb,
std::optional<std::string> meta = std::nullopt) {
impl_->write_block(cat, std::move(data), std::move(physical_block_cb),
std::move(meta));
}
void write_block(fragment_category::value_type cat,
std::shared_ptr<block_data>&& data,
std::optional<std::string> meta = std::nullopt) {
impl_->write_block(cat, std::move(data), std::move(meta));
}
void write_metadata_v2_schema(std::shared_ptr<block_data>&& data) {
@ -103,9 +114,13 @@ class filesystem_writer {
get_compression_constraints(fragment_category::value_type cat,
std::string const& metadata) const = 0;
virtual void copy_header(std::span<uint8_t const> header) = 0;
virtual uint32_t write_block(fragment_category::value_type cat,
std::shared_ptr<block_data>&& data,
std::optional<std::string> meta) = 0;
virtual void
write_block(fragment_category cat, std::shared_ptr<block_data>&& data,
physical_block_cb_type physical_block_cb,
std::optional<std::string> meta) = 0;
virtual void write_block(fragment_category::value_type cat,
std::shared_ptr<block_data>&& data,
std::optional<std::string> meta) = 0;
virtual void
write_metadata_v2_schema(std::shared_ptr<block_data>&& data) = 0;
virtual void write_metadata_v2(std::shared_ptr<block_data>&& data) = 0;

View File

@ -48,7 +48,9 @@ class segmenter {
unsigned block_size_bits{22};
};
using block_ready_cb = folly::Function<size_t(std::shared_ptr<block_data>)>;
using block_ready_cb =
folly::Function<void(std::shared_ptr<block_data>,
folly::Function<void(size_t)> physical_block_cb)>;
segmenter(logger& lgr, progress& prog, std::shared_ptr<block_manager> blkmgr,
config const& cfg, compression_constraints const& cc,

View File

@ -284,6 +284,8 @@ void fsblock::build_section_header(section_header_v2& sh,
template <typename LoggerPolicy>
class filesystem_writer_ final : public filesystem_writer::impl {
public:
using physical_block_cb_type = filesystem_writer::physical_block_cb_type;
filesystem_writer_(logger& lgr, std::ostream& os, worker_group& wg,
progress& prog, const block_compressor& schema_bc,
const block_compressor& metadata_bc,
@ -298,9 +300,12 @@ class filesystem_writer_ final : public filesystem_writer::impl {
get_compression_constraints(fragment_category::value_type cat,
std::string const& metadata) const override;
void copy_header(std::span<uint8_t const> header) override;
uint32_t write_block(fragment_category::value_type cat,
std::shared_ptr<block_data>&& data,
std::optional<std::string> meta) override;
void write_block(fragment_category cat, std::shared_ptr<block_data>&& data,
physical_block_cb_type physical_block_cb,
std::optional<std::string> meta) override;
void write_block(fragment_category::value_type cat,
std::shared_ptr<block_data>&& data,
std::optional<std::string> meta) override;
void write_metadata_v2_schema(std::shared_ptr<block_data>&& data) override;
void write_metadata_v2(std::shared_ptr<block_data>&& data) override;
void write_compressed_section(section_type type, compression_type compression,
@ -566,11 +571,21 @@ void filesystem_writer_<LoggerPolicy>::copy_header(
}
template <typename LoggerPolicy>
uint32_t filesystem_writer_<LoggerPolicy>::write_block(
void filesystem_writer_<LoggerPolicy>::write_block(
fragment_category cat, std::shared_ptr<block_data>&& data,
physical_block_cb_type physical_block_cb, std::optional<std::string> meta) {
auto physical_block =
write_section(section_type::BLOCK, std::move(data),
compressor_for_category(cat.value()), std::move(meta));
physical_block_cb(physical_block);
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_block(
fragment_category::value_type cat, std::shared_ptr<block_data>&& data,
std::optional<std::string> meta) {
return write_section(section_type::BLOCK, std::move(data),
compressor_for_category(cat), std::move(meta));
write_section(section_type::BLOCK, std::move(data),
compressor_for_category(cat), std::move(meta));
}
template <typename LoggerPolicy>

View File

@ -698,8 +698,9 @@ void scanner_<LoggerPolicy>::scan(
auto seg = segmenter_factory_->create(
category, cat_size, cc, blockmgr,
[category, meta, &fsw](auto block) {
return fsw.write_block(category.value(), std::move(block), meta);
[category, meta, &fsw](auto block, auto physical_block_cb) {
fsw.write_block(category, std::move(block),
std::move(physical_block_cb), meta);
});
for (auto ino : span) {

View File

@ -969,8 +969,11 @@ template <typename LoggerPolicy, typename SegmentingPolicy>
void segmenter_<LoggerPolicy, SegmentingPolicy>::block_ready() {
auto& block = blocks_.back();
block.finalize(stats_);
auto written_block_num = block_ready_(block.data());
blkmgr_->set_written_block(block.num(), written_block_num);
block_ready_(block.data(), [blkmgr = blkmgr_,
logical_block_num =
block.num()](size_t physical_block_num) {
blkmgr->set_written_block(logical_block_num, physical_block_num);
});
++prog_.block_count;
}

View File

@ -145,10 +145,11 @@ void run_segmenter_test(unsigned iters, unsigned granularity,
std::vector<std::shared_ptr<dwarfs::block_data>> written;
dwarfs::segmenter seg(lgr, prog, blkmgr, cfg, cc, total_size,
[&written](std::shared_ptr<dwarfs::block_data> blk) {
[&written](std::shared_ptr<dwarfs::block_data> blk,
auto physical_block_cb) {
size_t num = written.size();
written.push_back(blk);
return num;
physical_block_cb(num);
});
suspender.dismiss();