mirror of
https://github.com/mhx/dwarfs.git
synced 2025-09-10 13:04:15 -04:00
feat(block_merger): support block policies and partial release
This commit is contained in:
parent
a284aecc35
commit
6ca388fa6b
@ -22,6 +22,9 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
|
#include <fmt/format.h>
|
||||||
|
|
||||||
namespace dwarfs {
|
namespace dwarfs {
|
||||||
|
|
||||||
@ -29,7 +32,7 @@ class block_merger_base {
|
|||||||
public:
|
public:
|
||||||
virtual ~block_merger_base() = default;
|
virtual ~block_merger_base() = default;
|
||||||
|
|
||||||
virtual void release() = 0;
|
virtual void release(size_t amount) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
@ -42,14 +45,31 @@ class merged_block_holder {
|
|||||||
explicit merged_block_holder(block_type&& blk)
|
explicit merged_block_holder(block_type&& blk)
|
||||||
: block_{std::move(blk)} {}
|
: block_{std::move(blk)} {}
|
||||||
|
|
||||||
merged_block_holder(block_type&& blk,
|
merged_block_holder(block_type&& blk, size_t size,
|
||||||
std::shared_ptr<block_merger_base> merger)
|
std::shared_ptr<block_merger_base> merger)
|
||||||
: block_{std::move(blk)}
|
: block_{std::move(blk)}
|
||||||
|
, size_{size}
|
||||||
, merger_{std::move(merger)} {}
|
, merger_{std::move(merger)} {}
|
||||||
|
|
||||||
~merged_block_holder() {
|
~merged_block_holder() { release(); }
|
||||||
|
|
||||||
|
void release() {
|
||||||
if (merger_) {
|
if (merger_) {
|
||||||
merger_->release();
|
merger_->release(size_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void release_partial(size_t amount) {
|
||||||
|
if (amount > size_) {
|
||||||
|
throw std::runtime_error(fmt::format(
|
||||||
|
"merged_block_holder::release_partial: amount {} > size {}", amount,
|
||||||
|
size_));
|
||||||
|
}
|
||||||
|
|
||||||
|
size_ -= amount;
|
||||||
|
|
||||||
|
if (merger_) {
|
||||||
|
merger_->release(amount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,6 +96,7 @@ class merged_block_holder {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
block_type block_;
|
block_type block_;
|
||||||
|
size_t size_{0};
|
||||||
std::shared_ptr<block_merger_base> merger_;
|
std::shared_ptr<block_merger_base> merger_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -24,50 +24,46 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <functional>
|
#include <deque>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <optional>
|
#include <optional>
|
||||||
#include <queue>
|
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
|
|
||||||
|
#include <folly/Function.h>
|
||||||
|
#include <folly/String.h>
|
||||||
|
#include <folly/gen/String.h>
|
||||||
|
|
||||||
#include "dwarfs/block_merger.h"
|
#include "dwarfs/block_merger.h"
|
||||||
|
#include "dwarfs/terminal.h"
|
||||||
|
|
||||||
namespace dwarfs::detail {
|
namespace dwarfs::detail {
|
||||||
|
|
||||||
/**
|
template <typename SourceT, typename BlockT, typename BlockPolicy>
|
||||||
* TODO: Support different policies for how much data can be queued.
|
|
||||||
* The current behavior is to limit the total number of blocks that
|
|
||||||
* can be queued. This is not ideal for sources that produce blocks
|
|
||||||
* of different sizes, or when blocks that are still held without
|
|
||||||
* being released change their size (because they have been compressed).
|
|
||||||
*
|
|
||||||
* You can then release a *size* instead of a *block*, and each source
|
|
||||||
* is assigned a worst-case size that is used to determine if a new
|
|
||||||
* block can be queued or not (via the source_distance() function).
|
|
||||||
*/
|
|
||||||
|
|
||||||
template <typename SourceT, typename BlockT>
|
|
||||||
class multi_queue_block_merger_impl : public block_merger_base,
|
class multi_queue_block_merger_impl : public block_merger_base,
|
||||||
public block_merger<SourceT, BlockT> {
|
public block_merger<SourceT, BlockT>,
|
||||||
|
private BlockPolicy {
|
||||||
public:
|
public:
|
||||||
static constexpr bool const debug{false};
|
static constexpr bool const debug{false};
|
||||||
|
|
||||||
using source_type = SourceT;
|
using source_type = SourceT;
|
||||||
using block_type = BlockT;
|
using block_type = BlockT;
|
||||||
using on_block_merged_callback_type = std::function<void(block_type&&)>;
|
using on_block_merged_callback_type =
|
||||||
|
folly::Function<void(block_type&&, size_t)>;
|
||||||
|
|
||||||
multi_queue_block_merger_impl(
|
multi_queue_block_merger_impl(
|
||||||
size_t num_active_slots, size_t max_queued_blocks,
|
size_t num_active_slots, size_t max_queued_size,
|
||||||
std::vector<source_type> const& sources,
|
std::vector<source_type> const& sources,
|
||||||
on_block_merged_callback_type on_block_merged_callback)
|
on_block_merged_callback_type&& on_block_merged_callback,
|
||||||
: num_queueable_{max_queued_blocks}
|
BlockPolicy&& policy)
|
||||||
|
: BlockPolicy{std::move(policy)}
|
||||||
|
, queueable_size_{max_queued_size}
|
||||||
, source_queue_{sources.begin(), sources.end()}
|
, source_queue_{sources.begin(), sources.end()}
|
||||||
, active_slots_(num_active_slots)
|
, active_slots_(num_active_slots)
|
||||||
, on_block_merged_callback_{on_block_merged_callback} {
|
, on_block_merged_callback_{std::move(on_block_merged_callback)} {
|
||||||
for (size_t i = 0; i < active_slots_.size() && !source_queue_.empty();
|
for (size_t i = 0; i < active_slots_.size() && !source_queue_.empty();
|
||||||
++i) {
|
++i) {
|
||||||
active_slots_[i] = source_queue_.front();
|
active_slots_[i] = source_queue_.front();
|
||||||
@ -75,22 +71,40 @@ class multi_queue_block_merger_impl : public block_merger_base,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
multi_queue_block_merger_impl(multi_queue_block_merger_impl&&) = default;
|
||||||
|
multi_queue_block_merger_impl&
|
||||||
|
operator=(multi_queue_block_merger_impl&&) = default;
|
||||||
|
multi_queue_block_merger_impl(const multi_queue_block_merger_impl&) = delete;
|
||||||
|
multi_queue_block_merger_impl&
|
||||||
|
operator=(const multi_queue_block_merger_impl&) = delete;
|
||||||
|
|
||||||
void add(source_type src, block_type blk) override {
|
void add(source_type src, block_type blk) override {
|
||||||
|
auto const block_size = this->block_size(blk);
|
||||||
|
|
||||||
std::unique_lock lock{mx_};
|
std::unique_lock lock{mx_};
|
||||||
|
|
||||||
cv_.wait(lock,
|
cv_.wait(lock, [this, &src, &block_size] {
|
||||||
[this, &src] { return source_distance(src) < num_queueable_; });
|
// if this is the active slot, we can accept the block if there is
|
||||||
|
// enough space left in the queue
|
||||||
|
if (active_slots_[active_slot_index_] == src) {
|
||||||
|
return block_size <= queueable_size_;
|
||||||
|
}
|
||||||
|
|
||||||
--num_queueable_;
|
// otherwise, we must ensure that it is always possible to accept
|
||||||
|
// a worst case sized block
|
||||||
|
return block_size + max_worst_case_source_block_size() <= queueable_size_;
|
||||||
|
});
|
||||||
|
|
||||||
|
queueable_size_ -= block_size;
|
||||||
|
|
||||||
if (!is_valid_source(src)) {
|
if (!is_valid_source(src)) {
|
||||||
throw std::runtime_error{"invalid source"};
|
throw std::runtime_error{"invalid source"};
|
||||||
}
|
}
|
||||||
|
|
||||||
block_queues_[src].emplace(std::move(blk));
|
block_queues_[src].emplace_back(std::move(blk));
|
||||||
|
|
||||||
if constexpr (debug) {
|
if constexpr (debug) {
|
||||||
dump_state(fmt::format("add({})", src));
|
dump_state(fmt::format("add({}, {})", src, block_size), termcolor::RED);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (try_merge_block()) {
|
while (try_merge_block()) {
|
||||||
@ -102,10 +116,10 @@ class multi_queue_block_merger_impl : public block_merger_base,
|
|||||||
void finish(source_type src) override {
|
void finish(source_type src) override {
|
||||||
std::unique_lock lock{mx_};
|
std::unique_lock lock{mx_};
|
||||||
|
|
||||||
block_queues_[src].emplace(std::nullopt);
|
block_queues_[src].emplace_back(std::nullopt);
|
||||||
|
|
||||||
if constexpr (debug) {
|
if constexpr (debug) {
|
||||||
dump_state(fmt::format("finish({})", src));
|
dump_state(fmt::format("finish({})", src), termcolor::CYAN);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (try_merge_block()) {
|
while (try_merge_block()) {
|
||||||
@ -114,51 +128,73 @@ class multi_queue_block_merger_impl : public block_merger_base,
|
|||||||
cv_.notify_all();
|
cv_.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
void release() override {
|
void release(size_t amount) override {
|
||||||
std::unique_lock lock{mx_};
|
std::unique_lock lock{mx_};
|
||||||
|
|
||||||
assert(num_releaseable_ > 0);
|
assert(releaseable_size_ >= amount);
|
||||||
|
|
||||||
--num_releaseable_;
|
releaseable_size_ -= amount;
|
||||||
++num_queueable_;
|
queueable_size_ += amount;
|
||||||
|
|
||||||
if constexpr (debug) {
|
if constexpr (debug) {
|
||||||
dump_state("release");
|
dump_state(fmt::format("release({})", amount), termcolor::YELLOW);
|
||||||
}
|
}
|
||||||
|
|
||||||
cv_.notify_all();
|
cv_.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void dump_state(std::string what) const {
|
void dump_state(std::string what, termcolor color) const {
|
||||||
std::cout << "**** " << what << " ****" << std::endl;
|
std::cout << terminal_colored(fmt::format("**** {} ****", what), color)
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
std::cout << "index: " << active_slot_index_
|
std::cout << "index: " << active_slot_index_
|
||||||
<< ", queueable: " << num_queueable_
|
<< ", queueable: " << queueable_size_
|
||||||
<< ", releaseable: " << num_releaseable_ << std::endl;
|
<< ", releaseable: " << releaseable_size_ << std::endl;
|
||||||
|
|
||||||
std::cout << "active: ";
|
std::cout << "active: ";
|
||||||
for (auto const& src : active_slots_) {
|
for (size_t i = 0; i < active_slots_.size(); ++i) {
|
||||||
|
auto const& src = active_slots_[i];
|
||||||
if (src) {
|
if (src) {
|
||||||
std::cout << src.value() << " ";
|
std::cout << terminal_colored(
|
||||||
|
fmt::format("{} ", src.value()),
|
||||||
|
i == active_slot_index_ ? termcolor::BOLD_GREEN : termcolor::GRAY);
|
||||||
} else {
|
} else {
|
||||||
std::cout << "- ";
|
std::cout << terminal_colored("- ", termcolor::GRAY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
std::cout << std::endl;
|
std::cout << std::endl;
|
||||||
|
|
||||||
std::cout << "queue: ";
|
std::cout << "queued: ";
|
||||||
for (auto const& src : source_queue_) {
|
for (auto const& src : source_queue_) {
|
||||||
std::cout << src << " ";
|
std::cout << src << " ";
|
||||||
}
|
}
|
||||||
std::cout << std::endl;
|
std::cout << std::endl;
|
||||||
|
|
||||||
std::cout << "blocks: ";
|
|
||||||
for (auto const& [src, q] : block_queues_) {
|
for (auto const& [src, q] : block_queues_) {
|
||||||
std::cout << src << "(" << q.size() << ") ";
|
if (q.empty()) {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto const queued_sizes = folly::join(
|
||||||
|
", ", folly::gen::from(q) | folly::gen::map([this](auto const& blk) {
|
||||||
|
return blk.has_value()
|
||||||
|
? std::to_string(this->block_size(*blk))
|
||||||
|
: "&";
|
||||||
|
}) | folly::gen::as<std::vector<std::string>>());
|
||||||
|
|
||||||
|
auto const text =
|
||||||
|
fmt::format("blocks({}): {} -> {}", src, q.size(), queued_sizes);
|
||||||
|
|
||||||
|
if (src == active_slots_[active_slot_index_]) {
|
||||||
|
std::cout << terminal_colored(text, termcolor::BOLD_GREEN);
|
||||||
|
} else {
|
||||||
|
std::cout << text;
|
||||||
|
}
|
||||||
|
|
||||||
std::cout << std::endl;
|
std::cout << std::endl;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool is_valid_source(source_type src) const {
|
bool is_valid_source(source_type src) const {
|
||||||
return std::find(begin(active_slots_), end(active_slots_), src) !=
|
return std::find(begin(active_slots_), end(active_slots_), src) !=
|
||||||
@ -167,29 +203,25 @@ class multi_queue_block_merger_impl : public block_merger_base,
|
|||||||
end(source_queue_);
|
end(source_queue_);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t source_distance(source_type src) const {
|
size_t max_worst_case_source_block_size() const {
|
||||||
auto ix = active_slot_index_;
|
if (!cached_max_worst_case_source_block_size_) {
|
||||||
size_t distance{0};
|
size_t max_size{0};
|
||||||
|
|
||||||
while (active_slots_[ix] && active_slots_[ix].value() != src) {
|
for (auto const& src : active_slots_) {
|
||||||
++distance;
|
if (src) {
|
||||||
|
max_size =
|
||||||
do {
|
std::max(max_size, this->worst_case_source_block_size(*src));
|
||||||
ix = (ix + 1) % active_slots_.size();
|
|
||||||
} while (ix != active_slot_index_ && !active_slots_[ix]);
|
|
||||||
|
|
||||||
if (ix == active_slot_index_) {
|
|
||||||
auto it = std::find(begin(source_queue_), end(source_queue_), src);
|
|
||||||
distance += std::distance(begin(source_queue_), it);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if constexpr (debug) {
|
for (auto const& src : source_queue_) {
|
||||||
std::cout << "distance(" << src << "): " << distance << std::endl;
|
max_size = std::max(max_size, this->worst_case_source_block_size(src));
|
||||||
}
|
}
|
||||||
|
|
||||||
return distance;
|
cached_max_worst_case_source_block_size_ = max_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
return *cached_max_worst_case_source_block_size_;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool try_merge_block() {
|
bool try_merge_block() {
|
||||||
@ -205,16 +237,19 @@ class multi_queue_block_merger_impl : public block_merger_base,
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto blk = std::move(it->second.front());
|
auto blk = std::move(it->second.front());
|
||||||
it->second.pop();
|
it->second.pop_front();
|
||||||
|
|
||||||
const bool not_last = blk.has_value();
|
auto const not_last = blk.has_value();
|
||||||
|
std::optional<size_t> block_size;
|
||||||
|
|
||||||
if (not_last) {
|
if (not_last) {
|
||||||
++num_releaseable_;
|
block_size = this->block_size(*blk);
|
||||||
on_block_merged_callback_(std::move(*blk));
|
releaseable_size_ += *block_size;
|
||||||
|
on_block_merged_callback_(std::move(*blk), *block_size);
|
||||||
} else {
|
} else {
|
||||||
block_queues_.erase(it);
|
block_queues_.erase(it);
|
||||||
update_active(ix);
|
update_active(ix);
|
||||||
|
cached_max_worst_case_source_block_size_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
@ -222,8 +257,12 @@ class multi_queue_block_merger_impl : public block_merger_base,
|
|||||||
} while (active_slot_index_ != ix && !active_slots_[active_slot_index_]);
|
} while (active_slot_index_ != ix && !active_slots_[active_slot_index_]);
|
||||||
|
|
||||||
if constexpr (debug) {
|
if constexpr (debug) {
|
||||||
dump_state(not_last ? fmt::format("merge({})", src)
|
if (not_last) {
|
||||||
: fmt::format("final({})", src));
|
dump_state(fmt::format("merge({}, {})", src, *block_size),
|
||||||
|
termcolor::GREEN);
|
||||||
|
} else {
|
||||||
|
dump_state(fmt::format("final({})", src), termcolor::BOLD_GREEN);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return active_slot_index_ != ix || active_slots_[active_slot_index_];
|
return active_slot_index_ != ix || active_slots_[active_slot_index_];
|
||||||
@ -241,9 +280,10 @@ class multi_queue_block_merger_impl : public block_merger_base,
|
|||||||
std::recursive_mutex mx_;
|
std::recursive_mutex mx_;
|
||||||
std::condition_variable_any cv_;
|
std::condition_variable_any cv_;
|
||||||
size_t active_slot_index_{0};
|
size_t active_slot_index_{0};
|
||||||
size_t num_queueable_;
|
size_t queueable_size_;
|
||||||
size_t num_releaseable_{0};
|
size_t releaseable_size_{0};
|
||||||
std::unordered_map<source_type, std::queue<std::optional<block_type>>>
|
std::optional<size_t> mutable cached_max_worst_case_source_block_size_;
|
||||||
|
std::unordered_map<source_type, std::deque<std::optional<block_type>>>
|
||||||
block_queues_;
|
block_queues_;
|
||||||
std::deque<source_type> source_queue_;
|
std::deque<source_type> source_queue_;
|
||||||
std::vector<std::optional<source_type>> active_slots_;
|
std::vector<std::optional<source_type>> active_slots_;
|
||||||
|
@ -21,15 +21,30 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <functional>
|
#include <cassert>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#include <folly/Function.h>
|
||||||
|
|
||||||
#include "dwarfs/block_merger.h"
|
#include "dwarfs/block_merger.h"
|
||||||
#include "dwarfs/detail/multi_queue_block_merger_impl.h"
|
#include "dwarfs/detail/multi_queue_block_merger_impl.h"
|
||||||
|
|
||||||
namespace dwarfs {
|
namespace dwarfs {
|
||||||
|
|
||||||
|
class block_merger_whole_block_policy {
|
||||||
|
public:
|
||||||
|
template <typename BlockT>
|
||||||
|
static size_t block_size(BlockT const&) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename SourceT>
|
||||||
|
static size_t worst_case_source_block_size(SourceT const&) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deterministically merge blocks from multiple sources into a single stream.
|
* Deterministically merge blocks from multiple sources into a single stream.
|
||||||
*
|
*
|
||||||
@ -44,7 +59,9 @@ namespace dwarfs {
|
|||||||
* that are used to produce blocks. While it is possible to use more threads
|
* that are used to produce blocks. While it is possible to use more threads
|
||||||
* than active slots, this will not improve performance and will only increase
|
* than active slots, this will not improve performance and will only increase
|
||||||
* the memory footprint. However, it is not possible to use less threads than
|
* the memory footprint. However, it is not possible to use less threads than
|
||||||
* active slots, as this will cause the merger to eventually block.
|
* active slots, as this will cause the merger to ultimately block all threads
|
||||||
|
* and deadlock, since it is assuming that another thread will eventually add
|
||||||
|
* more blocks.
|
||||||
*
|
*
|
||||||
* The order of the blocks in the output stream is only determined by the order
|
* The order of the blocks in the output stream is only determined by the order
|
||||||
* of the sources and the number of active slots. The number of queued blocks
|
* of the sources and the number of active slots. The number of queued blocks
|
||||||
@ -63,38 +80,70 @@ namespace dwarfs {
|
|||||||
* the holder is alive, the held block will count towards the number of
|
* the holder is alive, the held block will count towards the number of
|
||||||
* queued blocks. Once the holder is destroyed, the held block will be
|
* queued blocks. Once the holder is destroyed, the held block will be
|
||||||
* released and the number of queued blocks will be decremented.
|
* released and the number of queued blocks will be decremented.
|
||||||
|
*
|
||||||
|
* It is also possible to provide a policy that returns the size of a block
|
||||||
|
* as well as the worst case size for a block from a certain source. This
|
||||||
|
* can be useful to keep an upper bound on the memory usage of the merger.
|
||||||
|
* It is even possible to only partially release a block, e.g. after the
|
||||||
|
* block has been compressed.
|
||||||
*/
|
*/
|
||||||
template <typename SourceT, typename BlockT>
|
template <typename SourceT, typename BlockT,
|
||||||
|
typename BlockPolicy = block_merger_whole_block_policy>
|
||||||
class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
|
class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
|
||||||
public:
|
public:
|
||||||
using source_type = SourceT;
|
using source_type = SourceT;
|
||||||
using block_type = BlockT;
|
using block_type = BlockT;
|
||||||
using block_holder_type = merged_block_holder<block_type>;
|
using block_holder_type = merged_block_holder<block_type>;
|
||||||
using on_block_merged_callback_type = std::function<void(block_holder_type)>;
|
using on_block_merged_callback_type =
|
||||||
|
folly::Function<void(block_holder_type)>;
|
||||||
|
|
||||||
|
multi_queue_block_merger() = default;
|
||||||
|
|
||||||
multi_queue_block_merger(
|
multi_queue_block_merger(
|
||||||
size_t num_active_slots, size_t max_queued_blocks,
|
size_t num_active_slots, size_t max_queued_blocks,
|
||||||
std::vector<source_type> const& sources,
|
std::vector<source_type> const& sources,
|
||||||
on_block_merged_callback_type on_block_merged_callback)
|
on_block_merged_callback_type&& on_block_merged_callback,
|
||||||
: impl_{std::make_shared<
|
BlockPolicy&& policy = block_merger_whole_block_policy{})
|
||||||
detail::multi_queue_block_merger_impl<SourceT, BlockT>>(
|
: state_{std::make_unique<state>(
|
||||||
num_active_slots, max_queued_blocks, sources,
|
num_active_slots, max_queued_blocks, sources,
|
||||||
[this](block_type&& blk) { on_block_merged(std::move(blk)); })}
|
std::move(on_block_merged_callback), std::move(policy))} {}
|
||||||
, on_block_merged_callback_{on_block_merged_callback} {}
|
|
||||||
|
|
||||||
void add(source_type src, block_type blk) override {
|
void add(source_type src, block_type blk) override {
|
||||||
impl_->add(std::move(src), std::move(blk));
|
assert(state_);
|
||||||
|
state_->impl->add(std::move(src), std::move(blk));
|
||||||
}
|
}
|
||||||
|
|
||||||
void finish(source_type src) override { impl_->finish(std::move(src)); }
|
void finish(source_type src) override {
|
||||||
|
assert(state_);
|
||||||
|
state_->impl->finish(std::move(src));
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void on_block_merged(block_type&& blk) {
|
using impl_type =
|
||||||
on_block_merged_callback_(block_holder_type{std::move(blk), impl_});
|
detail::multi_queue_block_merger_impl<SourceT, BlockT, BlockPolicy>;
|
||||||
|
|
||||||
|
struct state {
|
||||||
|
state(size_t num_active_slots, size_t max_queued_blocks,
|
||||||
|
std::vector<source_type> const& sources,
|
||||||
|
on_block_merged_callback_type&& on_block_merged_callback,
|
||||||
|
BlockPolicy&& policy = block_merger_whole_block_policy{})
|
||||||
|
: callback{std::move(on_block_merged_callback)}
|
||||||
|
, impl{std::make_shared<impl_type>(
|
||||||
|
num_active_slots, max_queued_blocks, sources,
|
||||||
|
[this](block_type&& blk, size_t size) {
|
||||||
|
on_block_merged(std::move(blk), size);
|
||||||
|
},
|
||||||
|
std::move(policy))} {}
|
||||||
|
|
||||||
|
void on_block_merged(block_type&& blk, size_t size) {
|
||||||
|
callback(block_holder_type{std::move(blk), size, impl});
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<detail::multi_queue_block_merger_impl<SourceT, BlockT>> impl_;
|
on_block_merged_callback_type callback;
|
||||||
on_block_merged_callback_type on_block_merged_callback_;
|
std::shared_ptr<impl_type> impl;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::unique_ptr<state> state_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace dwarfs
|
} // namespace dwarfs
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <chrono>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <numeric>
|
#include <numeric>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
@ -43,11 +44,79 @@ namespace {
|
|||||||
|
|
||||||
constexpr int const debuglevel{0};
|
constexpr int const debuglevel{0};
|
||||||
|
|
||||||
constexpr size_t const max_runs{250};
|
constexpr size_t const max_runs_regular{250};
|
||||||
|
constexpr size_t const max_runs_partial{50};
|
||||||
constexpr size_t const num_runner_threads{16};
|
constexpr size_t const num_runner_threads{16};
|
||||||
constexpr size_t const num_repetitions{4};
|
constexpr size_t const num_repetitions{4};
|
||||||
|
|
||||||
using block = std::pair<size_t, size_t>;
|
struct block {
|
||||||
|
static constexpr bool const kIsSized{false};
|
||||||
|
|
||||||
|
block() = default;
|
||||||
|
block(size_t src_id, size_t idx, size_t /*sz*/)
|
||||||
|
: source_id{src_id}
|
||||||
|
, index{idx} {}
|
||||||
|
|
||||||
|
bool operator==(block const&) const = default;
|
||||||
|
auto operator<=>(block const&) const = default;
|
||||||
|
|
||||||
|
std::ostream& operator<<(std::ostream& os) const {
|
||||||
|
return os << source_id << "." << index;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t source_id;
|
||||||
|
size_t index;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct sized_block {
|
||||||
|
static constexpr bool const kIsSized{true};
|
||||||
|
|
||||||
|
sized_block() = default;
|
||||||
|
sized_block(size_t src_id, size_t idx, size_t sz)
|
||||||
|
: source_id{src_id}
|
||||||
|
, index{idx}
|
||||||
|
, size{sz} {}
|
||||||
|
|
||||||
|
bool operator==(sized_block const&) const = default;
|
||||||
|
auto operator<=>(sized_block const&) const = default;
|
||||||
|
|
||||||
|
std::ostream& operator<<(std::ostream& os) const {
|
||||||
|
return os << source_id << "." << index << " (" << size << ")";
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t source_id;
|
||||||
|
size_t index;
|
||||||
|
size_t size;
|
||||||
|
};
|
||||||
|
|
||||||
|
class sized_block_merger_policy {
|
||||||
|
public:
|
||||||
|
sized_block_merger_policy(std::vector<size_t>&& worst_case_block_size)
|
||||||
|
: worst_case_block_size_{std::move(worst_case_block_size)} {}
|
||||||
|
|
||||||
|
static size_t block_size(sized_block const& blk) { return blk.size; }
|
||||||
|
size_t worst_case_source_block_size(size_t source_id) const {
|
||||||
|
return worst_case_block_size_[source_id];
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::vector<size_t> worst_case_block_size_;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename BlockT>
|
||||||
|
struct timed_release_block {
|
||||||
|
std::chrono::steady_clock::time_point when;
|
||||||
|
merged_block_holder<BlockT> holder;
|
||||||
|
|
||||||
|
timed_release_block(std::chrono::steady_clock::time_point when,
|
||||||
|
merged_block_holder<BlockT>&& holder)
|
||||||
|
: when{when}
|
||||||
|
, holder{std::move(holder)} {}
|
||||||
|
|
||||||
|
bool operator<(timed_release_block const& other) const {
|
||||||
|
return when > other.when;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Use std::shared_mutex because folly::SharedMutex might trigger TSAN
|
// Use std::shared_mutex because folly::SharedMutex might trigger TSAN
|
||||||
template <typename T>
|
template <typename T>
|
||||||
@ -56,16 +125,18 @@ using synchronized = folly::Synchronized<T, std::shared_mutex>;
|
|||||||
template <typename T>
|
template <typename T>
|
||||||
using sync_queue = synchronized<std::queue<T>>;
|
using sync_queue = synchronized<std::queue<T>>;
|
||||||
|
|
||||||
|
template <typename BlockT>
|
||||||
class source {
|
class source {
|
||||||
public:
|
public:
|
||||||
source(size_t id, std::mt19937& delay_rng, std::mt19937& rng,
|
source(size_t id, std::mt19937& delay_rng, std::mt19937& rng,
|
||||||
size_t max_blocks = 20, double ips = 5000.0)
|
size_t max_blocks = 20, double ips = 5000.0, size_t max_size = 10000)
|
||||||
: id_{id}
|
: id_{id}
|
||||||
, blocks_{init_blocks(delay_rng, rng, max_blocks, ips)} {}
|
, blocks_{init_blocks(delay_rng, rng, max_blocks, ips, max_size)} {}
|
||||||
|
|
||||||
std::tuple<block, bool, double> next() {
|
std::tuple<BlockT, bool, double> next() {
|
||||||
auto idx = idx_++;
|
auto idx = idx_++;
|
||||||
return {std::make_pair(id_, idx), idx_ >= blocks_.size(), blocks_[idx]};
|
return {BlockT(id_, idx, blocks_[idx].first), idx_ >= blocks_.size(),
|
||||||
|
blocks_[idx].second};
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t id() const { return id_; }
|
size_t id() const { return id_; }
|
||||||
@ -73,33 +144,39 @@ class source {
|
|||||||
size_t num_blocks() const { return blocks_.size(); }
|
size_t num_blocks() const { return blocks_.size(); }
|
||||||
|
|
||||||
std::chrono::nanoseconds total_time() const {
|
std::chrono::nanoseconds total_time() const {
|
||||||
auto seconds = std::accumulate(begin(blocks_), end(blocks_), 0.0);
|
auto seconds = std::accumulate(
|
||||||
|
begin(blocks_), end(blocks_), 0.0,
|
||||||
|
[](auto const& a, auto const& b) { return a + b.second; });
|
||||||
return std::chrono::duration_cast<std::chrono::nanoseconds>(
|
return std::chrono::duration_cast<std::chrono::nanoseconds>(
|
||||||
std::chrono::duration<double>(seconds));
|
std::chrono::duration<double>(seconds));
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static std::vector<double>
|
static std::vector<std::pair<size_t, double>>
|
||||||
init_blocks(std::mt19937& delay_rng, std::mt19937& rng, size_t max_blocks,
|
init_blocks(std::mt19937& delay_rng, std::mt19937& rng, size_t max_blocks,
|
||||||
double ips) {
|
double ips, size_t max_size) {
|
||||||
std::uniform_int_distribution<> idist(1, max_blocks);
|
std::uniform_int_distribution<size_t> bdist(1, max_blocks);
|
||||||
|
std::uniform_int_distribution<size_t> sdist(BlockT::kIsSized ? 1 : 0,
|
||||||
|
max_size);
|
||||||
std::exponential_distribution<> edist(ips);
|
std::exponential_distribution<> edist(ips);
|
||||||
std::vector<double> blocks;
|
std::vector<std::pair<size_t, double>> blocks;
|
||||||
blocks.resize(idist(rng));
|
blocks.resize(bdist(rng));
|
||||||
std::generate(begin(blocks), end(blocks), [&] { return edist(delay_rng); });
|
std::generate(begin(blocks), end(blocks),
|
||||||
|
[&] { return std::make_pair(sdist(rng), edist(delay_rng)); });
|
||||||
return blocks;
|
return blocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t idx_{0};
|
size_t idx_{0};
|
||||||
size_t id_;
|
size_t id_;
|
||||||
std::vector<double> blocks_;
|
std::vector<std::pair<size_t, double>> blocks_;
|
||||||
};
|
};
|
||||||
|
|
||||||
void emitter(sync_queue<source>& sources,
|
template <typename BlockMergerT,
|
||||||
dwarfs::block_merger<size_t, block>& merger) {
|
typename BlockT = typename BlockMergerT::block_type>
|
||||||
|
void emitter(sync_queue<source<BlockT>>& sources, BlockMergerT& merger) {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
auto src = sources.withWLock([](auto&& q) {
|
auto src = sources.withWLock([](auto&& q) {
|
||||||
std::optional<source> src;
|
std::optional<source<BlockT>> src;
|
||||||
|
|
||||||
if (!q.empty()) {
|
if (!q.empty()) {
|
||||||
src = std::move(q.front());
|
src = std::move(q.front());
|
||||||
@ -123,37 +200,54 @@ void emitter(sync_queue<source>& sources,
|
|||||||
|
|
||||||
std::this_thread::sleep_until(t);
|
std::this_thread::sleep_until(t);
|
||||||
|
|
||||||
merger.add(blk.first, blk);
|
merger.add(blk.source_id, blk);
|
||||||
|
|
||||||
if (is_last) {
|
if (is_last) {
|
||||||
merger.finish(blk.first);
|
merger.finish(blk.source_id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<block>
|
template <typename BlockMergerT, bool PartialRelease,
|
||||||
|
typename BlockT = typename BlockMergerT::block_type>
|
||||||
|
std::vector<BlockT>
|
||||||
do_run(std::mutex& out_mx, size_t run, std::mt19937& delay_rng) {
|
do_run(std::mutex& out_mx, size_t run, std::mt19937& delay_rng) {
|
||||||
std::mt19937 rng(run);
|
std::mt19937 rng(run);
|
||||||
std::exponential_distribution<> sources_dist(0.1);
|
std::exponential_distribution<> sources_dist(0.1);
|
||||||
std::exponential_distribution<> threads_dist(0.1);
|
std::exponential_distribution<> threads_dist(0.1);
|
||||||
std::exponential_distribution<> slots_dist(0.1);
|
std::exponential_distribution<> slots_dist(0.1);
|
||||||
std::exponential_distribution<> inflight_dist(0.1);
|
std::exponential_distribution<> inflight_dist(BlockT::kIsSized ? 0.00001
|
||||||
|
: 0.1);
|
||||||
std::uniform_real_distribution<> speed_dist(0.1, 10.0);
|
std::uniform_real_distribution<> speed_dist(0.1, 10.0);
|
||||||
std::uniform_int_distribution<> merged_queue_dist(0, 1);
|
std::uniform_int_distribution<> merged_queue_dist(0, 1);
|
||||||
|
std::uniform_int_distribution<> worst_case_size_dist(1, 10000);
|
||||||
|
std::uniform_int_distribution<> release_after_us_dist(1, 10000);
|
||||||
|
std::uniform_int_distribution<> partial_release_repeat_dist(0, 2);
|
||||||
auto const num_sources{std::max<size_t>(1, sources_dist(rng))};
|
auto const num_sources{std::max<size_t>(1, sources_dist(rng))};
|
||||||
auto const num_slots{std::max<size_t>(1, slots_dist(rng))};
|
auto const num_slots{std::max<size_t>(1, slots_dist(rng))};
|
||||||
auto const num_threads{std::max<size_t>(num_slots, threads_dist(delay_rng))};
|
auto const num_threads{std::max<size_t>(num_slots, threads_dist(delay_rng))};
|
||||||
auto const max_in_flight{std::max<size_t>(1, inflight_dist(delay_rng))};
|
auto const max_in_flight{
|
||||||
|
std::max<size_t>(BlockT::kIsSized ? 10000 : 1, inflight_dist(delay_rng))};
|
||||||
bool const use_merged_queue{merged_queue_dist(delay_rng) != 0};
|
bool const use_merged_queue{merged_queue_dist(delay_rng) != 0};
|
||||||
|
|
||||||
std::vector<size_t> source_ids;
|
std::vector<size_t> source_ids;
|
||||||
sync_queue<source> sources;
|
sync_queue<source<BlockT>> sources;
|
||||||
std::chrono::nanoseconds total_time{};
|
std::chrono::nanoseconds total_time{};
|
||||||
|
|
||||||
|
std::vector<size_t> worst_case_block_size;
|
||||||
|
|
||||||
for (size_t i = 0; i < num_sources; ++i) {
|
for (size_t i = 0; i < num_sources; ++i) {
|
||||||
auto src = source(i, delay_rng, rng, 30, 10000.0 * speed_dist(delay_rng));
|
size_t worst_case_size{0};
|
||||||
|
|
||||||
|
if constexpr (BlockT::kIsSized) {
|
||||||
|
worst_case_size = worst_case_size_dist(rng);
|
||||||
|
worst_case_block_size.emplace_back(worst_case_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto src = source<BlockT>(i, delay_rng, rng, 30,
|
||||||
|
10000.0 * speed_dist(delay_rng), worst_case_size);
|
||||||
total_time += src.total_time();
|
total_time += src.total_time();
|
||||||
source_ids.emplace_back(src.id());
|
source_ids.emplace_back(src.id());
|
||||||
sources.wlock()->emplace(std::move(src));
|
sources.wlock()->emplace(std::move(src));
|
||||||
@ -168,34 +262,100 @@ do_run(std::mutex& out_mx, size_t run, std::mt19937& delay_rng) {
|
|||||||
std::cout << config << "\n";
|
std::cout << config << "\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
sync_queue<merged_block_holder<block>> merged_queue;
|
synchronized<std::vector<timed_release_block<BlockT>>> merged_queue;
|
||||||
std::vector<block> merged;
|
std::vector<BlockT> merged;
|
||||||
|
|
||||||
dwarfs::multi_queue_block_merger<size_t, block> merger(
|
auto merge_cb = [&](merged_block_holder<BlockT> holder) {
|
||||||
num_slots, max_in_flight, source_ids,
|
|
||||||
[&](merged_block_holder<block> holder) {
|
|
||||||
if (use_merged_queue) {
|
|
||||||
merged_queue.wlock()->emplace(std::move(holder));
|
|
||||||
} else {
|
|
||||||
merged.emplace_back(std::move(holder.value()));
|
merged.emplace_back(std::move(holder.value()));
|
||||||
}
|
|
||||||
|
if (use_merged_queue) {
|
||||||
|
if constexpr (PartialRelease) {
|
||||||
|
auto when = std::chrono::steady_clock::now() +
|
||||||
|
std::chrono::microseconds(release_after_us_dist(delay_rng));
|
||||||
|
merged_queue.withWLock([&](auto&& q) {
|
||||||
|
q.emplace_back(when, std::move(holder));
|
||||||
|
std::push_heap(begin(q), end(q));
|
||||||
});
|
});
|
||||||
|
} else {
|
||||||
|
merged_queue.withWLock([&](auto&& q) {
|
||||||
|
q.emplace_back(std::chrono::steady_clock::time_point{},
|
||||||
|
std::move(holder));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
BlockMergerT merger;
|
||||||
|
|
||||||
|
if constexpr (BlockT::kIsSized) {
|
||||||
|
merger = BlockMergerT(
|
||||||
|
num_slots, max_in_flight, source_ids, std::move(merge_cb),
|
||||||
|
sized_block_merger_policy{std::move(worst_case_block_size)});
|
||||||
|
} else {
|
||||||
|
merger =
|
||||||
|
BlockMergerT(num_slots, max_in_flight, source_ids, std::move(merge_cb));
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<std::thread> thr;
|
std::vector<std::thread> thr;
|
||||||
std::atomic<bool> running{use_merged_queue};
|
std::atomic<bool> running{use_merged_queue};
|
||||||
|
|
||||||
std::thread releaser([&] {
|
std::thread releaser([&] {
|
||||||
|
std::mt19937 partial_rng(run);
|
||||||
|
|
||||||
while (running || !merged_queue.rlock()->empty()) {
|
while (running || !merged_queue.rlock()->empty()) {
|
||||||
std::this_thread::sleep_for(std::chrono::microseconds(10));
|
auto now = std::chrono::steady_clock::now();
|
||||||
std::vector<merged_block_holder<block>> holders;
|
std::chrono::steady_clock::time_point next;
|
||||||
|
std::vector<merged_block_holder<BlockT>> holders;
|
||||||
|
|
||||||
merged_queue.withWLock([&](auto&& q) {
|
merged_queue.withWLock([&](auto&& q) {
|
||||||
while (!q.empty()) {
|
while (!q.empty()) {
|
||||||
holders.emplace_back(std::move(q.front()));
|
if constexpr (PartialRelease) {
|
||||||
q.pop();
|
std::pop_heap(begin(q), end(q));
|
||||||
|
}
|
||||||
|
auto& item = q.back();
|
||||||
|
if constexpr (PartialRelease) {
|
||||||
|
if (item.when > now) {
|
||||||
|
next = item.when;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
holders.emplace_back(std::move(item.holder));
|
||||||
|
q.pop_back();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
for (auto& holder : holders) {
|
|
||||||
merged.emplace_back(std::move(holder.value()));
|
if constexpr (PartialRelease) {
|
||||||
|
std::vector<merged_block_holder<BlockT>> partial;
|
||||||
|
|
||||||
|
for (auto& h : holders) {
|
||||||
|
if (partial_release_repeat_dist(partial_rng) > 0) {
|
||||||
|
auto& size = h.value().size;
|
||||||
|
if (size > 10) {
|
||||||
|
auto to_release = size / 2;
|
||||||
|
size -= to_release;
|
||||||
|
h.release_partial(to_release);
|
||||||
|
partial.emplace_back(std::move(h));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
merged_queue.withWLock([&](auto&& q) {
|
||||||
|
for (auto& h : partial) {
|
||||||
|
auto when = now + std::chrono::microseconds(
|
||||||
|
release_after_us_dist(partial_rng));
|
||||||
|
q.emplace_back(when, std::move(h));
|
||||||
|
std::push_heap(begin(q), end(q));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
holders.clear();
|
||||||
|
|
||||||
|
if constexpr (PartialRelease) {
|
||||||
|
std::this_thread::sleep_until(next);
|
||||||
|
} else {
|
||||||
|
std::this_thread::sleep_for(std::chrono::microseconds(10));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -231,21 +391,22 @@ do_run(std::mutex& out_mx, size_t run, std::mt19937& delay_rng) {
|
|||||||
return merged;
|
return merged;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename BlockT>
|
||||||
[[maybe_unused]] void
|
[[maybe_unused]] void
|
||||||
dump(std::mutex& out_mx, std::vector<block> const& blocks) {
|
dump(std::mutex& out_mx, std::vector<BlockT> const& blocks) {
|
||||||
if constexpr (debuglevel > 1) {
|
if constexpr (debuglevel > 1) {
|
||||||
std::lock_guard lock{out_mx};
|
std::lock_guard lock{out_mx};
|
||||||
for (size_t i = 0; i < blocks.size(); ++i) {
|
for (size_t i = 0; i < blocks.size(); ++i) {
|
||||||
if (i > 0) {
|
if (i > 0) {
|
||||||
std::cout << ", ";
|
std::cout << ", ";
|
||||||
}
|
}
|
||||||
auto const& b = blocks[i];
|
std::cout << blocks[i];
|
||||||
std::cout << b.first << "." << b.second;
|
|
||||||
}
|
}
|
||||||
std::cout << "\n";
|
std::cout << "\n";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename BlockMergerT, bool PartialRelease>
|
||||||
void runner_thread(size_t tid, std::mutex& out_mx, std::atomic<size_t>& runs,
|
void runner_thread(size_t tid, std::mutex& out_mx, std::atomic<size_t>& runs,
|
||||||
size_t const max_runs, std::atomic<size_t>& passes,
|
size_t const max_runs, std::atomic<size_t>& passes,
|
||||||
synchronized<std::vector<size_t>>& fails) {
|
synchronized<std::vector<size_t>>& fails) {
|
||||||
@ -260,14 +421,14 @@ void runner_thread(size_t tid, std::mutex& out_mx, std::atomic<size_t>& runs,
|
|||||||
std::lock_guard lock{out_mx};
|
std::lock_guard lock{out_mx};
|
||||||
std::cout << "[" << run << "/" << tid << "] ref\n";
|
std::cout << "[" << run << "/" << tid << "] ref\n";
|
||||||
}
|
}
|
||||||
auto ref = do_run(out_mx, run, delay_rng);
|
auto ref = do_run<BlockMergerT, PartialRelease>(out_mx, run, delay_rng);
|
||||||
dump(out_mx, ref);
|
dump(out_mx, ref);
|
||||||
for (size_t rep = 0; rep < num_repetitions; ++rep) {
|
for (size_t rep = 0; rep < num_repetitions; ++rep) {
|
||||||
if constexpr (debuglevel > 0) {
|
if constexpr (debuglevel > 0) {
|
||||||
std::lock_guard lock{out_mx};
|
std::lock_guard lock{out_mx};
|
||||||
std::cout << "[" << run << "/" << tid << "] test\n";
|
std::cout << "[" << run << "/" << tid << "] test\n";
|
||||||
}
|
}
|
||||||
auto test = do_run(out_mx, run, delay_rng);
|
auto test = do_run<BlockMergerT, PartialRelease>(out_mx, run, delay_rng);
|
||||||
dump(out_mx, test);
|
dump(out_mx, test);
|
||||||
if (test == ref) {
|
if (test == ref) {
|
||||||
++passes;
|
++passes;
|
||||||
@ -278,9 +439,9 @@ void runner_thread(size_t tid, std::mutex& out_mx, std::atomic<size_t>& runs,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace
|
template <typename BlockMergerT, bool PartialRelease = false>
|
||||||
|
std::tuple<size_t, std::vector<size_t>>
|
||||||
TEST(block_merger, random) {
|
block_merger_test(size_t const max_runs) {
|
||||||
std::mutex out_mx;
|
std::mutex out_mx;
|
||||||
std::atomic<size_t> runs{0};
|
std::atomic<size_t> runs{0};
|
||||||
std::atomic<size_t> passes{0};
|
std::atomic<size_t> passes{0};
|
||||||
@ -289,14 +450,48 @@ TEST(block_merger, random) {
|
|||||||
std::vector<std::thread> thr;
|
std::vector<std::thread> thr;
|
||||||
|
|
||||||
for (size_t i = 0; i < num_runner_threads; ++i) {
|
for (size_t i = 0; i < num_runner_threads; ++i) {
|
||||||
thr.emplace_back(
|
thr.emplace_back([&, i] {
|
||||||
[&, i] { runner_thread(i, out_mx, runs, max_runs, passes, fails); });
|
runner_thread<BlockMergerT, PartialRelease>(i, out_mx, runs, max_runs,
|
||||||
|
passes, fails);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto& t : thr) {
|
for (auto& t : thr) {
|
||||||
t.join();
|
t.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPECT_EQ(max_runs * num_repetitions, passes);
|
return {passes.load(), *fails.rlock()};
|
||||||
EXPECT_TRUE(fails.rlock()->empty()) << folly::join(", ", *fails.rlock());
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
TEST(block_merger, random) {
|
||||||
|
using merger_type = dwarfs::multi_queue_block_merger<size_t, block>;
|
||||||
|
|
||||||
|
auto [passes, fails] = block_merger_test<merger_type>(max_runs_regular);
|
||||||
|
|
||||||
|
EXPECT_EQ(max_runs_regular * num_repetitions, passes);
|
||||||
|
EXPECT_TRUE(fails.empty()) << folly::join(", ", fails);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(block_merger, random_sized) {
|
||||||
|
using merger_type =
|
||||||
|
dwarfs::multi_queue_block_merger<size_t, sized_block,
|
||||||
|
sized_block_merger_policy>;
|
||||||
|
|
||||||
|
auto [passes, fails] = block_merger_test<merger_type>(max_runs_regular);
|
||||||
|
|
||||||
|
EXPECT_EQ(max_runs_regular * num_repetitions, passes);
|
||||||
|
EXPECT_TRUE(fails.empty()) << folly::join(", ", fails);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(block_merger, random_sized_partial) {
|
||||||
|
using merger_type =
|
||||||
|
dwarfs::multi_queue_block_merger<size_t, sized_block,
|
||||||
|
sized_block_merger_policy>;
|
||||||
|
|
||||||
|
auto [passes, fails] = block_merger_test<merger_type, true>(max_runs_partial);
|
||||||
|
|
||||||
|
EXPECT_EQ(max_runs_partial * num_repetitions, passes);
|
||||||
|
EXPECT_TRUE(fails.empty()) << folly::join(", ", fails);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user