Introduce block_data

This commit is contained in:
Marcus Holland-Moritz 2021-03-01 18:08:02 +01:00
parent cbca2f4d0c
commit 17d7b8d96c
7 changed files with 103 additions and 45 deletions

View File

@ -48,7 +48,7 @@ class block_compressor {
block_compressor(block_compressor&& bc) = default;
block_compressor& operator=(block_compressor&& rhs) = default;
std::vector<uint8_t> compress(const std::vector<uint8_t>& data) const {
std::vector<uint8_t> compress(std::vector<uint8_t> const& data) const {
return impl_->compress(data);
}

View File

@ -0,0 +1,49 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/**
* \author Marcus Holland-Moritz (github@mhxnet.de)
* \copyright Copyright (c) Marcus Holland-Moritz
*
* This file is part of dwarfs.
*
* dwarfs is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dwarfs is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>
namespace dwarfs {
/**
 * Owning wrapper around a contiguous buffer of bytes.
 *
 * The bytes live in a std::vector<uint8_t>; every accessor below simply
 * forwards to that vector. Pointers obtained from data() and references
 * obtained from vec() follow the usual std::vector invalidation rules,
 * i.e. they may become stale once the vector is resized through the
 * mutable vec() accessor.
 */
class block_data {
 public:
  /// Creates an object holding an empty buffer.
  block_data() = default;

  /**
   * Takes over the contents of an existing byte vector.
   *
   * \param vec  Buffer to adopt; it is moved from, not copied.
   */
  explicit block_data(std::vector<uint8_t>&& vec) noexcept
      : vec_{std::move(vec)} {}

  /// Read-only access to the underlying vector.
  std::vector<uint8_t> const& vec() const noexcept { return vec_; }

  /// Mutable access to the underlying vector (e.g. to reserve or resize).
  std::vector<uint8_t>& vec() noexcept { return vec_; }

  /// Pointer to the first stored byte (read-only).
  uint8_t const* data() const noexcept { return vec_.data(); }

  /// Pointer to the first stored byte (writable).
  uint8_t* data() noexcept { return vec_.data(); }

  /// Number of bytes currently stored.
  std::size_t size() const noexcept { return vec_.size(); }

  /// True when no bytes are stored.
  bool empty() const noexcept { return vec_.empty(); }

 private:
  std::vector<uint8_t> vec_;
};
} // namespace dwarfs

View File

@ -26,7 +26,6 @@
#include <memory>
#include <ostream>
#include <utility>
#include <vector>
#include <folly/Range.h>
@ -36,6 +35,7 @@
namespace dwarfs {
class block_compressor;
class block_data;
class logger;
class progress;
class worker_group;
@ -51,15 +51,15 @@ class filesystem_writer {
const block_compressor& schema_bc,
const block_compressor& metadata_bc, size_t max_queue_size);
void write_block(std::vector<uint8_t>&& data) {
void write_block(std::shared_ptr<block_data>&& data) {
impl_->write_block(std::move(data));
}
void write_metadata_v2_schema(std::vector<uint8_t>&& data) {
void write_metadata_v2_schema(std::shared_ptr<block_data>&& data) {
impl_->write_metadata_v2_schema(std::move(data));
}
void write_metadata_v2(std::vector<uint8_t>&& data) {
void write_metadata_v2(std::shared_ptr<block_data>&& data) {
impl_->write_metadata_v2(std::move(data));
}
@ -78,9 +78,10 @@ class filesystem_writer {
public:
virtual ~impl() = default;
virtual void write_block(std::vector<uint8_t>&& data) = 0;
virtual void write_metadata_v2_schema(std::vector<uint8_t>&& data) = 0;
virtual void write_metadata_v2(std::vector<uint8_t>&& data) = 0;
virtual void write_block(std::shared_ptr<block_data>&& data) = 0;
virtual void
write_metadata_v2_schema(std::shared_ptr<block_data>&& data) = 0;
virtual void write_metadata_v2(std::shared_ptr<block_data>&& data) = 0;
virtual void
write_compressed_section(section_type type, compression_type compression,
folly::ByteRange data) = 0;

View File

@ -32,6 +32,7 @@
#include <sparsehash/dense_hash_map>
#include "dwarfs/block_data.h"
#include "dwarfs/block_manager.h"
#include "dwarfs/entry.h"
#include "dwarfs/error.h"
@ -117,10 +118,11 @@ class block_manager_ : public block_manager::impl {
, os_(os)
, current_block_(0)
, total_blocks_size_(0)
, block_{std::make_shared<block_data>()}
, hasher_(lgr, blockhash_window_size_)
, log_(lgr)
, prog_(prog) {
block_.reserve(block_size_);
block_->vec().reserve(block_size_);
for (auto size : blockhash_window_size_) {
block_hashes_.emplace_back(size);
@ -137,7 +139,7 @@ class block_manager_ : public block_manager::impl {
}
private:
size_t cur_offset() const { return block_.size(); }
size_t cur_offset() const { return block_->size(); }
void block_ready();
void update_hashes(const hash_map_type& hm, size_t offset, size_t size);
@ -169,7 +171,7 @@ class block_manager_ : public block_manager::impl {
std::shared_ptr<os_access> os_;
size_t current_block_;
size_t total_blocks_size_;
std::vector<uint8_t> block_;
std::shared_ptr<block_data> block_;
std::vector<block_hashes_t> block_hashes_;
hasher_type hasher_;
hash_map_type hm_;
@ -185,7 +187,7 @@ block_manager::config::config()
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::finish_blocks() {
if (!block_.empty()) {
if (!block_->empty()) {
block_ready();
}
@ -206,10 +208,10 @@ void block_manager_<LoggerPolicy>::finish_blocks() {
template <typename LoggerPolicy>
void block_manager_<LoggerPolicy>::block_ready() {
std::vector<uint8_t> tmp;
auto tmp = std::make_shared<block_data>();
block_.swap(tmp);
fsw_.write_block(std::move(tmp));
block_.reserve(block_size_);
block_->vec().reserve(block_size_);
for (auto& bh : block_hashes_) {
bh.values.clear();
}
@ -277,7 +279,7 @@ void block_manager_<LoggerPolicy>::add_chunk(const std::shared_ptr<inode>& ino,
const uint8_t* p, size_t offset,
size_t size,
const hash_map_type* hm) {
LOG_TRACE << "block " << current_block_ << " size: " << block_.size()
LOG_TRACE << "block " << current_block_ << " size: " << block_->size()
<< " of " << block_size_;
if (hm) {
@ -290,15 +292,15 @@ void block_manager_<LoggerPolicy>::add_chunk(const std::shared_ptr<inode>& ino,
<< ino->any()->name() << "] - block: " << current_block_
<< " offset: " << block_offset << ", size: " << size;
block_.resize(block_offset + size);
block_->vec().resize(block_offset + size);
::memcpy(&block_[block_offset], p + offset, size);
::memcpy(block_->data() + block_offset, p + offset, size);
ino->add_chunk(current_block_, block_offset, size);
prog_.chunk_count++;
prog_.filesystem_size += size;
if (block_.size() == block_size_) {
if (block_->size() == block_size_) {
block_ready();
}
}
@ -344,6 +346,7 @@ void block_manager_<LoggerPolicy>::add_inode(std::shared_ptr<inode> ino) {
if (blockhash_window_size_.empty() or
size < blockhash_window_size_.front()) {
// no point dealing with hashes, just write it out
// XXX: might be worth checking if the whole file has a match?
add_data(ino, mm->as<uint8_t>(), size);
} else {
const uint8_t* data = mm->as<uint8_t>();
@ -478,7 +481,7 @@ template <typename LoggerPolicy>
bool block_manager_<LoggerPolicy>::get_match_window(
const std::string& indent, match_window& win, size_t& block_offset,
const uint8_t* data, const match_window& search_win) const {
const uint8_t* blockdata = &block_[0];
const uint8_t* blockdata = block_->data();
LOG_TRACE << indent << "match(block_offset=" << block_offset << ", window=["
<< win.first << ", " << win.last << "], search_win=["
@ -498,7 +501,7 @@ bool block_manager_<LoggerPolicy>::get_match_window(
while (block_offset + win.size() < block_size_ and
win.last < search_win.last and
block_offset + win.size() < block_.size() and
block_offset + win.size() < block_->size() and
blockdata[block_offset + win.size()] == data[win.last]) {
++win.last;
}

View File

@ -36,6 +36,7 @@
#include "dwarfs/block_cache.h"
#include "dwarfs/block_compressor.h"
#include "dwarfs/block_data.h"
#include "dwarfs/error.h"
#include "dwarfs/filesystem_v2.h"
#include "dwarfs/filesystem_writer.h"
@ -439,9 +440,10 @@ void filesystem_v2::rewrite(logger& lgr, progress& prog,
// TODO: multi-thread this?
if (s->type() == section_type::BLOCK) {
if (opts.recompress_block) {
auto block = block_decompressor::decompress(
s->compression(), mm->as<uint8_t>(s->start()), s->length());
prog.filesystem_size += block.size();
auto block =
std::make_shared<block_data>(block_decompressor::decompress(
s->compression(), mm->as<uint8_t>(s->start()), s->length()));
prog.filesystem_size += block->size();
writer.write_block(std::move(block));
} else {
writer.write_compressed_section(s->type(), s->compression(),
@ -451,8 +453,9 @@ void filesystem_v2::rewrite(logger& lgr, progress& prog,
}
if (opts.recompress_metadata) {
writer.write_metadata_v2_schema(std::move(schema_raw));
writer.write_metadata_v2(std::move(meta_raw));
writer.write_metadata_v2_schema(
std::make_shared<block_data>(std::move(schema_raw)));
writer.write_metadata_v2(std::make_shared<block_data>(std::move(meta_raw)));
} else {
for (auto type : section_types) {
auto& sec = DWARFS_NOTHROW(sections.at(type));

View File

@ -32,6 +32,7 @@
#include <folly/system/ThreadName.h>
#include "dwarfs/block_compressor.h"
#include "dwarfs/block_data.h"
#include "dwarfs/checksum.h"
#include "dwarfs/filesystem_writer.h"
#include "dwarfs/fstypes.h"
@ -47,7 +48,7 @@ namespace {
class fsblock {
public:
fsblock(logger& lgr, section_type type, const block_compressor& bc,
std::vector<uint8_t>&& data);
std::shared_ptr<block_data>&& data);
fsblock(section_type type, compression_type compression,
folly::ByteRange data);
@ -82,18 +83,18 @@ class raw_fsblock : public fsblock::impl {
private:
class state {
public:
state(std::vector<uint8_t>&& data, logger& lgr)
state(std::shared_ptr<block_data>&& data, logger& lgr)
: compressed_(false)
, data_(std::move(data))
, LOG_PROXY_INIT(lgr) {}
void compress(const block_compressor& bc) {
std::vector<uint8_t> tmp;
std::shared_ptr<block_data> tmp;
{
auto td = LOG_TIMED_TRACE;
tmp = bc.compress(data_);
tmp = std::make_shared<block_data>(bc.compress(data_->vec()));
td << "block compression finished";
}
@ -112,27 +113,27 @@ class raw_fsblock : public fsblock::impl {
cond_.wait(lock, [&]() -> bool { return compressed_; });
}
const std::vector<uint8_t>& data() const { return data_; }
std::vector<uint8_t> const& data() const { return data_->vec(); }
size_t size() const {
std::lock_guard<std::mutex> lock(mx_);
return data_.size();
return data_->size();
}
private:
mutable std::mutex mx_;
std::condition_variable cond_;
std::atomic<bool> compressed_;
std::vector<uint8_t> data_;
std::shared_ptr<block_data> data_;
LOG_PROXY_DECL(LoggerPolicy);
};
public:
raw_fsblock(logger& lgr, section_type type, const block_compressor& bc,
std::vector<uint8_t>&& data)
std::shared_ptr<block_data>&& data)
: type_(type)
, bc_(bc)
, uncompressed_size_(data.size())
, uncompressed_size_(data->size())
, state_(std::make_shared<state>(std::move(data), lgr))
, LOG_PROXY_INIT(lgr) {}
@ -193,7 +194,7 @@ class compressed_fsblock : public fsblock::impl {
};
fsblock::fsblock(logger& lgr, section_type type, const block_compressor& bc,
std::vector<uint8_t>&& data)
std::shared_ptr<block_data>&& data)
: impl_(make_unique_logging_object<impl, raw_fsblock, logger_policies>(
lgr, type, bc, std::move(data))) {}
@ -211,9 +212,9 @@ class filesystem_writer_ : public filesystem_writer::impl {
size_t max_queue_size);
~filesystem_writer_() noexcept;
void write_block(std::vector<uint8_t>&& data) override;
void write_metadata_v2_schema(std::vector<uint8_t>&& data) override;
void write_metadata_v2(std::vector<uint8_t>&& data) override;
void write_block(std::shared_ptr<block_data>&& data) override;
void write_metadata_v2_schema(std::shared_ptr<block_data>&& data) override;
void write_metadata_v2(std::shared_ptr<block_data>&& data) override;
void write_compressed_section(section_type type, compression_type compression,
folly::ByteRange data) override;
void flush() override;
@ -221,7 +222,7 @@ class filesystem_writer_ : public filesystem_writer::impl {
int queue_fill() const override { return static_cast<int>(wg_.queue_size()); }
private:
void write_section(section_type type, std::vector<uint8_t>&& data,
void write_section(section_type type, std::shared_ptr<block_data>&& data,
block_compressor const& bc);
void write(section_type type, compression_type compression,
folly::ByteRange range);
@ -375,7 +376,7 @@ void filesystem_writer_<LoggerPolicy>::write(section_type type,
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_section(
section_type type, std::vector<uint8_t>&& data,
section_type type, std::shared_ptr<block_data>&& data,
block_compressor const& bc) {
{
std::unique_lock<std::mutex> lock(mx_);
@ -413,19 +414,19 @@ void filesystem_writer_<LoggerPolicy>::write_compressed_section(
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_block(
std::vector<uint8_t>&& data) {
std::shared_ptr<block_data>&& data) {
write_section(section_type::BLOCK, std::move(data), bc_);
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_metadata_v2_schema(
std::vector<uint8_t>&& data) {
std::shared_ptr<block_data>&& data) {
write_section(section_type::METADATA_V2_SCHEMA, std::move(data), schema_bc_);
}
template <typename LoggerPolicy>
void filesystem_writer_<LoggerPolicy>::write_metadata_v2(
std::vector<uint8_t>&& data) {
std::shared_ptr<block_data>&& data) {
write_section(section_type::METADATA_V2, std::move(data), metadata_bc_);
}

View File

@ -39,6 +39,7 @@
#include <fmt/format.h>
#include "dwarfs/block_data.h"
#include "dwarfs/entry.h"
#include "dwarfs/error.h"
#include "dwarfs/filesystem_writer.h"
@ -627,8 +628,8 @@ void scanner_<LoggerPolicy>::scan(filesystem_writer& fsw,
auto [schema, data] = metadata_v2::freeze(mv2);
fsw.write_metadata_v2_schema(std::move(schema));
fsw.write_metadata_v2(std::move(data));
fsw.write_metadata_v2_schema(std::make_shared<block_data>(std::move(schema)));
fsw.write_metadata_v2(std::make_shared<block_data>(std::move(data)));
LOG_INFO << "waiting for compression to finish...";