From 6ca388fa6ba8ff7e40be46c6d813a1074e9c91f9 Mon Sep 17 00:00:00 2001
From: Marcus Holland-Moritz
Date: Mon, 27 Nov 2023 07:50:05 +0100
Subject: [PATCH] feat(block_merger): support block policies and partial
 release

---
 include/dwarfs/block_merger.h              |  29 +-
 .../detail/multi_queue_block_merger_impl.h | 184 ++++++-----
 include/dwarfs/multi_queue_block_merger.h  |  81 ++++-
 test/block_merger_test.cpp                 | 301 +++++++++++++++---
 4 files changed, 450 insertions(+), 145 deletions(-)

diff --git a/include/dwarfs/block_merger.h b/include/dwarfs/block_merger.h
index f3b20221..b7c813a7 100644
--- a/include/dwarfs/block_merger.h
+++ b/include/dwarfs/block_merger.h
@@ -22,6 +22,9 @@
 #pragma once
 
 #include <memory>
+#include <stdexcept>
+
+#include <fmt/format.h>
 
 namespace dwarfs {
 
@@ -29,7 +32,7 @@
 class block_merger_base {
  public:
   virtual ~block_merger_base() = default;
 
-  virtual void release() = 0;
+  virtual void release(size_t amount) = 0;
 };
 
 template <typename BlockT>
 class merged_block_holder {
  public:
   using block_type = BlockT;
 
   explicit merged_block_holder(block_type&& blk)
       : block_{std::move(blk)} {}
 
-  merged_block_holder(block_type&& blk,
+  merged_block_holder(block_type&& blk, size_t size,
                       std::shared_ptr<block_merger_base> merger)
       : block_{std::move(blk)}
+      , size_{size}
       , merger_{std::move(merger)} {}
 
-  ~merged_block_holder() {
+  ~merged_block_holder() { release(); }
+
+  void release() {
     if (merger_) {
-      merger_->release();
+      merger_->release(size_);
+    }
+  }
+
+  void release_partial(size_t amount) {
+    if (amount > size_) {
+      throw std::runtime_error(fmt::format(
+          "merged_block_holder::release_partial: amount {} > size {}", amount,
+          size_));
+    }
+
+    size_ -= amount;
+
+    if (merger_) {
+      merger_->release(amount);
     }
   }
 
@@ -76,6 +96,7 @@ class merged_block_holder {
  private:
   block_type block_;
+  size_t size_{0};
   std::shared_ptr<block_merger_base> merger_;
 };
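Illustration (not part of the patch): with the size-aware holder above, a
consumer can hand queue budget back to the merger in stages instead of all
at once. Everything in this sketch other than merged_block_holder itself
(the my_block type, the compress step, the 1000/600 byte figures) is a
hypothetical assumption:

    #include <cstdint>
    #include <vector>

    #include "dwarfs/block_merger.h"

    // Hypothetical block type for illustration only.
    struct my_block {
      std::vector<uint8_t> data;
    };

    // Assume this holder arrived via the merge callback, accounting for
    // 1000 bytes of queued data.
    void consume(dwarfs::merged_block_holder<my_block> holder) {
      // ... compress holder.value().data in place, say 1000 -> 400 bytes ...
      holder.release_partial(600); // return 600 bytes of budget early
      // ... write the remaining 400 bytes to storage ...
    } // holder destroyed: release() returns the remaining 400 bytes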
diff --git a/include/dwarfs/detail/multi_queue_block_merger_impl.h b/include/dwarfs/detail/multi_queue_block_merger_impl.h
index f8f5b942..5123e806 100644
--- a/include/dwarfs/detail/multi_queue_block_merger_impl.h
+++ b/include/dwarfs/detail/multi_queue_block_merger_impl.h
@@ -24,50 +24,46 @@
 #include <algorithm>
 #include <cassert>
 #include <condition_variable>
-#include <functional>
+#include <deque>
 #include <iostream>
 #include <mutex>
-#include <queue>
 #include <optional>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
 #include <fmt/format.h>
+
+#include <folly/Function.h>
+#include <folly/String.h>
+#include <folly/gen/Base.h>
 
 #include "dwarfs/block_merger.h"
+#include "dwarfs/terminal.h"
 
 namespace dwarfs::detail {
 
-/**
- * TODO: Support different policies for how much data can be queued.
- *       The current behavior is to limit the total number of blocks that
- *       can be queued. This is not ideal for sources that produce blocks
- *       of different sizes, or when blocks that are still held without
- *       being released change their size (because they have been
- *       compressed).
- *
- *       You can then release a *size* instead of a *block*, and each
- *       source is assigned a worst-case size that is used to determine
- *       if a new block can be queued or not (via the source_distance()
- *       function).
- */
-
-template <typename SourceT, typename BlockT>
+template <typename SourceT, typename BlockT, typename BlockPolicy>
 class multi_queue_block_merger_impl : public block_merger_base,
-                                      public block_merger<SourceT, BlockT> {
+                                      public block_merger<SourceT, BlockT>,
+                                      private BlockPolicy {
  public:
   static constexpr bool const debug{false};
 
   using source_type = SourceT;
   using block_type = BlockT;
-  using on_block_merged_callback_type = std::function<void(block_type&&)>;
+  using on_block_merged_callback_type =
+      folly::Function<void(block_type&&, size_t)>;
 
   multi_queue_block_merger_impl(
-      size_t num_active_slots, size_t max_queued_blocks,
+      size_t num_active_slots, size_t max_queued_size,
       std::vector<source_type> const& sources,
-      on_block_merged_callback_type on_block_merged_callback)
-      : num_queueable_{max_queued_blocks}
+      on_block_merged_callback_type&& on_block_merged_callback,
+      BlockPolicy&& policy)
+      : BlockPolicy{std::move(policy)}
+      , queueable_size_{max_queued_size}
       , source_queue_{sources.begin(), sources.end()}
      , active_slots_(num_active_slots)
-      , on_block_merged_callback_{on_block_merged_callback} {
+      , on_block_merged_callback_{std::move(on_block_merged_callback)} {
     for (size_t i = 0; i < active_slots_.size() && !source_queue_.empty();
          ++i) {
       active_slots_[i] = source_queue_.front();
@@ -75,22 +71,40 @@ class multi_queue_block_merger_impl : public block_merger_base,
     }
   }
 
+  multi_queue_block_merger_impl(multi_queue_block_merger_impl&&) = default;
+  multi_queue_block_merger_impl&
+  operator=(multi_queue_block_merger_impl&&) = default;
+  multi_queue_block_merger_impl(const multi_queue_block_merger_impl&) = delete;
+  multi_queue_block_merger_impl&
+  operator=(const multi_queue_block_merger_impl&) = delete;
+
   void add(source_type src, block_type blk) override {
+    auto const block_size = this->block_size(blk);
+
     std::unique_lock lock{mx_};
 
-    cv_.wait(lock,
-             [this, &src] { return source_distance(src) < num_queueable_; });
+    cv_.wait(lock, [this, &src, &block_size] {
+      // if this is the active slot, we can accept the block if there is
+      // enough space left in the queue
+      if (active_slots_[active_slot_index_] == src) {
+        return block_size <= queueable_size_;
+      }
 
-    --num_queueable_;
+      // otherwise, we must ensure that it is always possible to accept
+      // a worst case sized block
+      return block_size + max_worst_case_source_block_size() <=
+             queueable_size_;
+    });
+
+    queueable_size_ -= block_size;
 
     if (!is_valid_source(src)) {
       throw std::runtime_error{"invalid source"};
     }
 
-    block_queues_[src].emplace(std::move(blk));
+    block_queues_[src].emplace_back(std::move(blk));
 
     if constexpr (debug) {
-      dump_state(fmt::format("add({})", src));
+      dump_state(fmt::format("add({}, {})", src, block_size), termcolor::RED);
     }
 
     while (try_merge_block()) {
     }
 
     cv_.notify_all();
   }
 
@@ -102,10 +116,10 @@ class multi_queue_block_merger_impl : public block_merger_base,
   void finish(source_type src) override {
     std::unique_lock lock{mx_};
 
-    block_queues_[src].emplace(std::nullopt);
+    block_queues_[src].emplace_back(std::nullopt);
 
     if constexpr (debug) {
-      dump_state(fmt::format("finish({})", src));
+      dump_state(fmt::format("finish({})", src), termcolor::CYAN);
     }
 
     while (try_merge_block()) {
     }
 
     cv_.notify_all();
   }
 
@@ -114,50 +128,72 @@ class multi_queue_block_merger_impl : public block_merger_base,
-  void release() override {
+  void release(size_t amount) override {
     std::unique_lock lock{mx_};
 
-    assert(num_releaseable_ > 0);
+    assert(releaseable_size_ >= amount);
 
-    --num_releaseable_;
-    ++num_queueable_;
+    releaseable_size_ -= amount;
+    queueable_size_ += amount;
 
     if constexpr (debug) {
-      dump_state("release");
+      dump_state(fmt::format("release({})", amount), termcolor::YELLOW);
     }
 
     cv_.notify_all();
   }
 
  private:
-  void dump_state(std::string what) const {
-    std::cout << "**** " << what << " ****" << std::endl;
+  void dump_state(std::string what, termcolor color) const {
+    std::cout << terminal_colored(fmt::format("**** {} ****", what), color)
+              << std::endl;
 
     std::cout << "index: " << active_slot_index_
-              << ", queueable: " << num_queueable_
-              << ", releaseable: " << num_releaseable_ << std::endl;
+              << ", queueable: " << queueable_size_
+              << ", releaseable: " << releaseable_size_ << std::endl;
 
     std::cout << "active: ";
-    for (auto const& src : active_slots_) {
+    for (size_t i = 0; i < active_slots_.size(); ++i) {
+      auto const& src = active_slots_[i];
       if (src) {
-        std::cout << src.value() << " ";
+        std::cout << terminal_colored(
+            fmt::format("{} ", src.value()),
+            i == active_slot_index_ ? termcolor::BOLD_GREEN : termcolor::GRAY);
       } else {
-        std::cout << "- ";
+        std::cout << terminal_colored("- ", termcolor::GRAY);
      }
     }
     std::cout << std::endl;
 
-    std::cout << "queue: ";
+    std::cout << "queued: ";
     for (auto const& src : source_queue_) {
       std::cout << src << " ";
     }
     std::cout << std::endl;
 
-    std::cout << "blocks: ";
     for (auto const& [src, q] : block_queues_) {
-      std::cout << src << "(" << q.size() << ") ";
+      if (q.empty()) {
+        continue;
+      }
+
+      auto const queued_sizes = folly::join(
+          ", ", folly::gen::from(q) | folly::gen::map([this](auto const& blk) {
+                  return blk.has_value()
+                             ? std::to_string(this->block_size(*blk))
+                             : "&";
+                }) | folly::gen::as<std::vector<std::string>>());
+
+      auto const text =
+          fmt::format("blocks({}): {} -> {}", src, q.size(), queued_sizes);
+
+      if (src == active_slots_[active_slot_index_]) {
+        std::cout << terminal_colored(text, termcolor::BOLD_GREEN);
+      } else {
+        std::cout << text;
+      }
+
+      std::cout << std::endl;
     }
-    std::cout << std::endl;
   }
 
   bool is_valid_source(source_type src) const {
@@ -167,29 +203,25 @@ class multi_queue_block_merger_impl : public block_merger_base,
            end(source_queue_);
   }
 
-  size_t source_distance(source_type src) const {
-    auto ix = active_slot_index_;
-    size_t distance{0};
-
-    while (active_slots_[ix] && active_slots_[ix].value() != src) {
-      ++distance;
-
-      do {
-        ix = (ix + 1) % active_slots_.size();
-      } while (ix != active_slot_index_ && !active_slots_[ix]);
-
-      if (ix == active_slot_index_) {
-        auto it = std::find(begin(source_queue_), end(source_queue_), src);
-        distance += std::distance(begin(source_queue_), it);
-        break;
+  size_t max_worst_case_source_block_size() const {
+    if (!cached_max_worst_case_source_block_size_) {
+      size_t max_size{0};
+
+      for (auto const& src : active_slots_) {
+        if (src) {
+          max_size =
+              std::max(max_size, this->worst_case_source_block_size(*src));
+        }
       }
+
+      for (auto const& src : source_queue_) {
+        max_size = std::max(max_size, this->worst_case_source_block_size(src));
+      }
+
+      cached_max_worst_case_source_block_size_ = max_size;
     }
 
-    if constexpr (debug) {
-      std::cout << "distance(" << src << "): " << distance << std::endl;
-    }
-
-    return distance;
+    return *cached_max_worst_case_source_block_size_;
   }
 
   bool try_merge_block() {
@@ -205,16 +237,19 @@ class multi_queue_block_merger_impl : public block_merger_base,
     }
 
     auto blk = std::move(it->second.front());
-    it->second.pop();
+    it->second.pop_front();
 
-    const bool not_last = blk.has_value();
+    auto const not_last = blk.has_value();
+    std::optional<size_t> block_size;
 
     if (not_last) {
-      ++num_releaseable_;
-      on_block_merged_callback_(std::move(*blk));
+      block_size = this->block_size(*blk);
+      releaseable_size_ += *block_size;
+      on_block_merged_callback_(std::move(*blk), *block_size);
     } else {
       block_queues_.erase(it);
       update_active(ix);
+      cached_max_worst_case_source_block_size_.reset();
     }
 
     do {
@@ -222,8 +257,12 @@ class multi_queue_block_merger_impl : public block_merger_base,
     } while (active_slot_index_ != ix && !active_slots_[active_slot_index_]);
 
     if constexpr (debug) {
-      dump_state(not_last ? fmt::format("merge({})", src)
-                          : fmt::format("final({})", src));
+      if (not_last) {
+        dump_state(fmt::format("merge({}, {})", src, *block_size),
+                   termcolor::GREEN);
+      } else {
+        dump_state(fmt::format("final({})", src), termcolor::BOLD_GREEN);
+      }
     }
 
     return active_slot_index_ != ix || active_slots_[active_slot_index_];
   }
 
@@ -241,9 +280,10 @@ class multi_queue_block_merger_impl : public block_merger_base,
   std::recursive_mutex mx_;
   std::condition_variable_any cv_;
   size_t active_slot_index_{0};
-  size_t num_queueable_;
-  size_t num_releaseable_{0};
-  std::unordered_map<source_type, std::queue<std::optional<block_type>>>
+  size_t queueable_size_;
+  size_t releaseable_size_{0};
+  std::optional<size_t> mutable cached_max_worst_case_source_block_size_;
+  std::unordered_map<source_type, std::deque<std::optional<block_type>>>
       block_queues_;
   std::deque<source_type> source_queue_;
   std::vector<std::optional<source_type>> active_slots_;
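Illustration (not part of the patch): the implementation above only ever
calls two members on its policy, this->block_size(block) and
this->worst_case_source_block_size(source), and it inherits privately from
BlockPolicy, so a stateless policy adds no per-merger size overhead. Any
type of the following shape should work; the fixed 1 MiB bound and the
assumption that blocks expose a size() member are made up for the sketch:

    #include <cstddef>

    // Minimal sketch of a custom BlockPolicy (hypothetical).
    struct capped_block_policy {
      static constexpr size_t kMaxBlockSize{1024 * 1024}; // assumed bound

      template <typename BlockT>
      static size_t block_size(BlockT const& blk) {
        return blk.size(); // assumes the block type reports its own size
      }

      template <typename SourceT>
      static size_t worst_case_source_block_size(SourceT const&) {
        return kMaxBlockSize; // same worst case for every source
      }
    };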
diff --git a/include/dwarfs/multi_queue_block_merger.h b/include/dwarfs/multi_queue_block_merger.h
index 24f1d81d..a3476d89 100644
--- a/include/dwarfs/multi_queue_block_merger.h
+++ b/include/dwarfs/multi_queue_block_merger.h
@@ -21,15 +21,30 @@
 #pragma once
 
-#include <functional>
+#include <cassert>
 #include <memory>
 #include <vector>
 
+#include <folly/Function.h>
+
 #include "dwarfs/block_merger.h"
 #include "dwarfs/detail/multi_queue_block_merger_impl.h"
 
 namespace dwarfs {
 
+class block_merger_whole_block_policy {
+ public:
+  template <typename BlockT>
+  static size_t block_size(BlockT const&) {
+    return 1;
+  }
+
+  template <typename SourceT>
+  static size_t worst_case_source_block_size(SourceT const&) {
+    return 1;
+  }
+};
+
 /**
  * Deterministically merge blocks from multiple sources into a single stream.
 *
@@ -44,7 +59,9 @@
 * that are used to produce blocks. While it is possible to use more threads
 * than active slots, this will not improve performance and will only increase
 * the memory footprint. However, it is not possible to use less threads than
- * active slots, as this will cause the merger to eventually block.
+ * active slots, as this will cause the merger to ultimately block all threads
+ * and deadlock, since it assumes that another thread will eventually add
+ * more blocks.
 
@@ -63,38 +80,70 @@
 * the holder is alive, the held block will count towards the number of
 * queued blocks. Once the holder is destroyed, the held block will be
 * released and the number of queued blocks will be decremented.
+ *
+ * It is also possible to provide a policy that returns the size of a block
+ * as well as the worst-case size of a block from a given source. This can
+ * be useful to keep an upper bound on the memory usage of the merger. It
+ * is even possible to only partially release a block, e.g. after the block
+ * has been compressed.
 */
-template <typename SourceT, typename BlockT>
+template <typename SourceT, typename BlockT,
+          typename BlockPolicy = block_merger_whole_block_policy>
 class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
  public:
   using source_type = SourceT;
   using block_type = BlockT;
   using block_holder_type = merged_block_holder<block_type>;
-  using on_block_merged_callback_type = std::function<void(block_holder_type)>;
+  using on_block_merged_callback_type =
+      folly::Function<void(block_holder_type)>;
+
+  multi_queue_block_merger() = default;
 
   multi_queue_block_merger(
       size_t num_active_slots, size_t max_queued_blocks,
       std::vector<source_type> const& sources,
-      on_block_merged_callback_type on_block_merged_callback)
-      : impl_{std::make_shared<
-            detail::multi_queue_block_merger_impl<SourceT, BlockT>>(
+      on_block_merged_callback_type&& on_block_merged_callback,
+      BlockPolicy&& policy = block_merger_whole_block_policy{})
+      : state_{std::make_unique<state>(
             num_active_slots, max_queued_blocks, sources,
-            [this](block_type&& blk) { on_block_merged(std::move(blk)); })}
-      , on_block_merged_callback_{on_block_merged_callback} {}
+            std::move(on_block_merged_callback), std::move(policy))} {}
 
   void add(source_type src, block_type blk) override {
-    impl_->add(std::move(src), std::move(blk));
+    assert(state_);
+    state_->impl->add(std::move(src), std::move(blk));
   }
 
-  void finish(source_type src) override { impl_->finish(std::move(src)); }
+  void finish(source_type src) override {
+    assert(state_);
+    state_->impl->finish(std::move(src));
+  }
 
  private:
-  void on_block_merged(block_type&& blk) {
-    on_block_merged_callback_(block_holder_type{std::move(blk), impl_});
-  }
+  using impl_type =
+      detail::multi_queue_block_merger_impl<SourceT, BlockT, BlockPolicy>;
 
-  std::shared_ptr<detail::multi_queue_block_merger_impl<SourceT, BlockT>>
-      impl_;
-  on_block_merged_callback_type on_block_merged_callback_;
+  struct state {
+    state(size_t num_active_slots, size_t max_queued_blocks,
+          std::vector<source_type> const& sources,
+          on_block_merged_callback_type&& on_block_merged_callback,
+          BlockPolicy&& policy = block_merger_whole_block_policy{})
+        : callback{std::move(on_block_merged_callback)}
+        , impl{std::make_shared<impl_type>(
+              num_active_slots, max_queued_blocks, sources,
+              [this](block_type&& blk, size_t size) {
+                on_block_merged(std::move(blk), size);
+              },
+              std::move(policy))} {}
+
+    void on_block_merged(block_type&& blk, size_t size) {
+      callback(block_holder_type{std::move(blk), size, impl});
+    }
+
+    on_block_merged_callback_type callback;
+    std::shared_ptr<impl_type> impl;
+  };
+
+  std::unique_ptr<state> state_;
 };
 
 } // namespace dwarfs
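Illustration (not part of the patch): putting the pieces together, a sketch
of how the policy-based merger might be driven. The my_block type, my_policy,
the 4 KiB worst case and the 64 KiB budget are all assumptions for the
example; with the default block_merger_whole_block_policy the same budget
parameter counts blocks instead of bytes:

    #include <cstddef>
    #include <vector>

    #include "dwarfs/multi_queue_block_merger.h"

    struct my_block {
      size_t bytes; // hypothetical payload size
    };

    struct my_policy {
      static size_t block_size(my_block const& blk) { return blk.bytes; }
      static size_t worst_case_source_block_size(size_t /*source_id*/) {
        return 4096; // assumed per-source upper bound
      }
    };

    void example() {
      std::vector<size_t> sources{0, 1, 2};

      // two active slots, at most 64 KiB of queued block data
      dwarfs::multi_queue_block_merger<size_t, my_block, my_policy> merger(
          2, 64 * 1024, sources,
          [](dwarfs::merged_block_holder<my_block> holder) {
            // consume holder.value(); its size is returned to the merger
            // when the holder dies, or earlier via release_partial()
          },
          my_policy{});

      // producer threads then call merger.add(source_id, block) repeatedly
      // and merger.finish(source_id) once a source is exhausted
    }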
diff --git a/test/block_merger_test.cpp b/test/block_merger_test.cpp
index 3b5618d2..2bed1dc5 100644
--- a/test/block_merger_test.cpp
+++ b/test/block_merger_test.cpp
@@ -22,6 +22,7 @@
 #include <algorithm>
 #include <atomic>
+#include <chrono>
 #include <mutex>
 #include <numeric>
 #include <random>
@@ -43,11 +44,79 @@
 namespace {
 
 constexpr int const debuglevel{0};
 
-constexpr size_t const max_runs{250};
+constexpr size_t const max_runs_regular{250};
+constexpr size_t const max_runs_partial{50};
 constexpr size_t const num_runner_threads{16};
 constexpr size_t const num_repetitions{4};
 
-using block = std::pair<size_t, size_t>;
+struct block {
+  static constexpr bool const kIsSized{false};
+
+  block() = default;
+  block(size_t src_id, size_t idx, size_t /*sz*/)
+      : source_id{src_id}
+      , index{idx} {}
+
+  bool operator==(block const&) const = default;
+  auto operator<=>(block const&) const = default;
+
+  friend std::ostream& operator<<(std::ostream& os, block const& blk) {
+    return os << blk.source_id << "." << blk.index;
+  }
+
+  size_t source_id;
+  size_t index;
+};
+
+struct sized_block {
+  static constexpr bool const kIsSized{true};
+
+  sized_block() = default;
+  sized_block(size_t src_id, size_t idx, size_t sz)
+      : source_id{src_id}
+      , index{idx}
+      , size{sz} {}
+
+  bool operator==(sized_block const&) const = default;
+  auto operator<=>(sized_block const&) const = default;
+
+  friend std::ostream& operator<<(std::ostream& os, sized_block const& blk) {
+    return os << blk.source_id << "." << blk.index << " (" << blk.size << ")";
+  }
+
+  size_t source_id;
+  size_t index;
+  size_t size;
+};
+
+class sized_block_merger_policy {
+ public:
+  sized_block_merger_policy(std::vector<size_t>&& worst_case_block_size)
+      : worst_case_block_size_{std::move(worst_case_block_size)} {}
+
+  static size_t block_size(sized_block const& blk) { return blk.size; }
+
+  size_t worst_case_source_block_size(size_t source_id) const {
+    return worst_case_block_size_[source_id];
+  }
+
+ private:
+  std::vector<size_t> worst_case_block_size_;
+};
+
+template <typename BlockT>
+struct timed_release_block {
+  std::chrono::steady_clock::time_point when;
+  merged_block_holder<BlockT> holder;
+
+  timed_release_block(std::chrono::steady_clock::time_point when,
+                      merged_block_holder<BlockT>&& holder)
+      : when{when}
+      , holder{std::move(holder)} {}
+
+  // std::push_heap/std::pop_heap build a max-heap, so the comparison is
+  // inverted to keep the earliest release time at the top
+  bool operator<(timed_release_block const& other) const {
+    return when > other.when;
+  }
+};
 
 // Use std::shared_mutex because folly::SharedMutex might trigger TSAN
 template <typename T>
 using synchronized = folly::Synchronized<T, std::shared_mutex>;
 
 template <typename T>
 using sync_queue = synchronized<std::queue<T>>;
 
@@ -56,16 +125,18 @@
+template <typename BlockT>
 class source {
  public:
   source(size_t id, std::mt19937& delay_rng, std::mt19937& rng,
-         size_t max_blocks = 20, double ips = 5000.0)
+         size_t max_blocks = 20, double ips = 5000.0, size_t max_size = 10000)
       : id_{id}
-      , blocks_{init_blocks(delay_rng, rng, max_blocks, ips)} {}
+      , blocks_{init_blocks(delay_rng, rng, max_blocks, ips, max_size)} {}
 
-  std::tuple<block, bool, double> next() {
+  std::tuple<BlockT, bool, double> next() {
     auto idx = idx_++;
-    return {std::make_pair(id_, idx), idx_ >= blocks_.size(), blocks_[idx]};
+    return {BlockT(id_, idx, blocks_[idx].first), idx_ >= blocks_.size(),
+            blocks_[idx].second};
   }
 
   size_t id() const { return id_; }
 
@@ -73,33 +144,39 @@ class source {
   size_t num_blocks() const { return blocks_.size(); }
 
   std::chrono::nanoseconds total_time() const {
-    auto seconds = std::accumulate(begin(blocks_), end(blocks_), 0.0);
+    auto seconds = std::accumulate(
+        begin(blocks_), end(blocks_), 0.0,
+        [](auto const& a, auto const& b) { return a + b.second; });
     return std::chrono::duration_cast<std::chrono::nanoseconds>(
         std::chrono::duration<double>(seconds));
   }
 
  private:
-  static std::vector<double>
+  static std::vector<std::pair<size_t, double>>
   init_blocks(std::mt19937& delay_rng, std::mt19937& rng, size_t max_blocks,
-              double ips) {
-    std::uniform_int_distribution<> idist(1, max_blocks);
+              double ips, size_t max_size) {
+    std::uniform_int_distribution<size_t> bdist(1, max_blocks);
+    std::uniform_int_distribution<size_t> sdist(BlockT::kIsSized ? 1 : 0,
+                                                max_size);
     std::exponential_distribution<> edist(ips);
-    std::vector<double> blocks;
-    blocks.resize(idist(rng));
-    std::generate(begin(blocks), end(blocks), [&] { return edist(delay_rng); });
+    std::vector<std::pair<size_t, double>> blocks;
+    blocks.resize(bdist(rng));
+    std::generate(begin(blocks), end(blocks), [&] {
+      return std::make_pair(sdist(rng), edist(delay_rng));
+    });
     return blocks;
   }
 
   size_t idx_{0};
   size_t id_;
-  std::vector<double> blocks_;
+  std::vector<std::pair<size_t, double>> blocks_;
 };
 
-void emitter(sync_queue<source>& sources,
-             dwarfs::block_merger<size_t, block>& merger) {
+template <typename BlockMergerT, typename BlockT>
+void emitter(sync_queue<source<BlockT>>& sources, BlockMergerT& merger) {
   for (;;) {
     auto src = sources.withWLock([](auto&& q) {
-      std::optional<source> src;
+      std::optional<source<BlockT>> src;
 
       if (!q.empty()) {
         src = std::move(q.front());
         q.pop();
       }
 
       return src;
     });
 
     if (!src) {
       break;
     }
 
@@ -123,37 +200,54 @@
       std::this_thread::sleep_until(t);
 
-      merger.add(blk.first, blk);
+      merger.add(blk.source_id, blk);
 
       if (is_last) {
-        merger.finish(blk.first);
+        merger.finish(blk.source_id);
         break;
       }
     }
   }
 }
 
-std::vector<block>
+template <typename BlockMergerT, typename BlockT, bool PartialRelease = false>
+std::vector<BlockT>
 do_run(std::mutex& out_mx, size_t run, std::mt19937& delay_rng) {
   std::mt19937 rng(run);
   std::exponential_distribution<> sources_dist(0.1);
   std::exponential_distribution<> threads_dist(0.1);
   std::exponential_distribution<> slots_dist(0.1);
-  std::exponential_distribution<> inflight_dist(0.1);
+  std::exponential_distribution<> inflight_dist(BlockT::kIsSized ? 0.00001
+                                                                 : 0.1);
   std::uniform_real_distribution<> speed_dist(0.1, 10.0);
   std::uniform_int_distribution<> merged_queue_dist(0, 1);
+  std::uniform_int_distribution<> worst_case_size_dist(1, 10000);
+  std::uniform_int_distribution<> release_after_us_dist(1, 10000);
+  std::uniform_int_distribution<> partial_release_repeat_dist(0, 2);
 
   auto const num_sources{std::max<size_t>(1, sources_dist(rng))};
   auto const num_slots{std::max<size_t>(1, slots_dist(rng))};
   auto const num_threads{std::max<size_t>(num_slots, threads_dist(delay_rng))};
-  auto const max_in_flight{std::max<size_t>(1, inflight_dist(delay_rng))};
+  auto const max_in_flight{std::max<size_t>(
+      BlockT::kIsSized ? 10000 : 1, inflight_dist(delay_rng))};
   bool const use_merged_queue{merged_queue_dist(delay_rng) != 0};
 
   std::vector<size_t> source_ids;
-  sync_queue<source> sources;
+  sync_queue<source<BlockT>> sources;
   std::chrono::nanoseconds total_time{};
 
+  std::vector<size_t> worst_case_block_size;
+
   for (size_t i = 0; i < num_sources; ++i) {
-    auto src = source(i, delay_rng, rng, 30, 10000.0 * speed_dist(delay_rng));
+    size_t worst_case_size{0};
+
+    if constexpr (BlockT::kIsSized) {
+      worst_case_size = worst_case_size_dist(rng);
+      worst_case_block_size.emplace_back(worst_case_size);
+    }
+
+    auto src = source<BlockT>(i, delay_rng, rng, 30,
+                              10000.0 * speed_dist(delay_rng), worst_case_size);
     total_time += src.total_time();
     source_ids.emplace_back(src.id());
     sources.wlock()->emplace(std::move(src));
   }
 
@@ -168,34 +262,100 @@
     std::cout << config << "\n";
   }
 
-  sync_queue<merged_block_holder<block>> merged_queue;
-  std::vector<block> merged;
+  synchronized<std::vector<timed_release_block<BlockT>>> merged_queue;
+  std::vector<BlockT> merged;
 
-  dwarfs::multi_queue_block_merger<size_t, block> merger(
-      num_slots, max_in_flight, source_ids,
-      [&](merged_block_holder<block> holder) {
-        if (use_merged_queue) {
-          merged_queue.wlock()->emplace(std::move(holder));
-        } else {
-          merged.emplace_back(std::move(holder.value()));
-        }
-      });
+  auto merge_cb = [&](merged_block_holder<BlockT> holder) {
+    merged.emplace_back(std::move(holder.value()));
+
+    if (use_merged_queue) {
+      if constexpr (PartialRelease) {
+        auto when = std::chrono::steady_clock::now() +
+                    std::chrono::microseconds(release_after_us_dist(delay_rng));
+        merged_queue.withWLock([&](auto&& q) {
+          q.emplace_back(when, std::move(holder));
+          std::push_heap(begin(q), end(q));
+        });
+      } else {
+        merged_queue.withWLock([&](auto&& q) {
+          q.emplace_back(std::chrono::steady_clock::time_point{},
+                         std::move(holder));
+        });
+      }
+    }
+  };
+
+  BlockMergerT merger;
+
+  if constexpr (BlockT::kIsSized) {
+    merger = BlockMergerT(
+        num_slots, max_in_flight, source_ids, std::move(merge_cb),
+        sized_block_merger_policy{std::move(worst_case_block_size)});
+  } else {
+    merger =
+        BlockMergerT(num_slots, max_in_flight, source_ids, std::move(merge_cb));
+  }
 
   std::vector<std::thread> thr;
   std::atomic<bool> running{use_merged_queue};
 
   std::thread releaser([&] {
+    std::mt19937 partial_rng(run);
+
     while (running || !merged_queue.rlock()->empty()) {
-      std::this_thread::sleep_for(std::chrono::microseconds(10));
-      std::vector<merged_block_holder<block>> holders;
+      auto now = std::chrono::steady_clock::now();
+      std::chrono::steady_clock::time_point next;
+      std::vector<merged_block_holder<BlockT>> holders;
+
       merged_queue.withWLock([&](auto&& q) {
         while (!q.empty()) {
-          holders.emplace_back(std::move(q.front()));
-          q.pop();
+          if constexpr (PartialRelease) {
+            std::pop_heap(begin(q), end(q));
+          }
+          auto& item = q.back();
+          if constexpr (PartialRelease) {
+            if (item.when > now) {
+              next = item.when;
+              // restore the heap before leaving, pop_heap moved the
+              // not-yet-due item to the back
+              std::push_heap(begin(q), end(q));
+              break;
+            }
+          }
+          holders.emplace_back(std::move(item.holder));
+          q.pop_back();
         }
       });
 
-      for (auto& holder : holders) {
-        merged.emplace_back(std::move(holder.value()));
-      }
+      if constexpr (PartialRelease) {
+        std::vector<merged_block_holder<BlockT>> partial;
+
+        for (auto& h : holders) {
+          if (partial_release_repeat_dist(partial_rng) > 0) {
+            auto& size = h.value().size;
+            if (size > 10) {
+              auto to_release = size / 2;
+              size -= to_release;
+              h.release_partial(to_release);
+              partial.emplace_back(std::move(h));
+              continue;
+            }
+          }
+        }
+
+        merged_queue.withWLock([&](auto&& q) {
+          for (auto& h : partial) {
+            auto when = now + std::chrono::microseconds(
+                                  release_after_us_dist(partial_rng));
+            q.emplace_back(when, std::move(h));
+            std::push_heap(begin(q), end(q));
+          }
+        });
+      }
+
+      holders.clear();
+
+      if constexpr (PartialRelease) {
+        std::this_thread::sleep_until(next);
+      } else {
+        std::this_thread::sleep_for(std::chrono::microseconds(10));
+      }
     }
   });
 
@@ -231,21 +391,22 @@
   return merged;
 }
 
+template <typename BlockT>
 [[maybe_unused]] void
-dump(std::mutex& out_mx, std::vector<block> const& blocks) {
+dump(std::mutex& out_mx, std::vector<BlockT> const& blocks) {
   if constexpr (debuglevel > 1) {
     std::lock_guard lock{out_mx};
     for (size_t i = 0; i < blocks.size(); ++i) {
       if (i > 0) {
         std::cout << ", ";
       }
-      auto const& b = blocks[i];
-      std::cout << b.first << "." << b.second;
+      std::cout << blocks[i];
     }
     std::cout << "\n";
   }
 }
 
+template <typename BlockMergerT, typename BlockT, bool PartialRelease>
 void runner_thread(size_t tid, std::mutex& out_mx, std::atomic<size_t>& runs,
                    size_t const max_runs, std::atomic<size_t>& passes,
                    synchronized<std::vector<size_t>>& fails) {
@@ -260,14 +421,14 @@
       std::lock_guard lock{out_mx};
       std::cout << "[" << run << "/" << tid << "] ref\n";
     }
-    auto ref = do_run(out_mx, run, delay_rng);
+    auto ref =
+        do_run<BlockMergerT, BlockT, PartialRelease>(out_mx, run, delay_rng);
     dump(out_mx, ref);
     for (size_t rep = 0; rep < num_repetitions; ++rep) {
       if constexpr (debuglevel > 0) {
         std::lock_guard lock{out_mx};
         std::cout << "[" << run << "/" << tid << "] test\n";
       }
-      auto test = do_run(out_mx, run, delay_rng);
+      auto test =
+          do_run<BlockMergerT, BlockT, PartialRelease>(out_mx, run, delay_rng);
       dump(out_mx, test);
       if (test == ref) {
         ++passes;
@@ -278,9 +439,9 @@
       } else {
         fails.wlock()->emplace_back(run);
       }
     }
   }
 }
 
-} // namespace
-
-TEST(block_merger, random) {
+template <typename BlockMergerT, typename BlockT, bool PartialRelease = false>
+std::tuple<size_t, std::vector<size_t>>
+block_merger_test(size_t const max_runs) {
   std::mutex out_mx;
   std::atomic<size_t> runs{0};
   std::atomic<size_t> passes{0};
@@ -289,14 +450,48 @@
   synchronized<std::vector<size_t>> fails;
 
   std::vector<std::thread> thr;
 
   for (size_t i = 0; i < num_runner_threads; ++i) {
-    thr.emplace_back(
-        [&, i] { runner_thread(i, out_mx, runs, max_runs, passes, fails); });
+    thr.emplace_back([&, i] {
+      runner_thread<BlockMergerT, BlockT, PartialRelease>(
+          i, out_mx, runs, max_runs, passes, fails);
+    });
   }
 
   for (auto& t : thr) {
     t.join();
   }
 
-  EXPECT_EQ(max_runs * num_repetitions, passes);
-  EXPECT_TRUE(fails.rlock()->empty()) << folly::join(", ", *fails.rlock());
+  return {passes.load(), *fails.rlock()};
+}
+
+} // namespace
+
+TEST(block_merger, random) {
+  using merger_type = dwarfs::multi_queue_block_merger<size_t, block>;
+
+  auto [passes, fails] =
+      block_merger_test<merger_type, block>(max_runs_regular);
+
+  EXPECT_EQ(max_runs_regular * num_repetitions, passes);
+  EXPECT_TRUE(fails.empty()) << folly::join(", ", fails);
+}
+
+TEST(block_merger, random_sized) {
+  using merger_type =
+      dwarfs::multi_queue_block_merger<size_t, sized_block,
+                                       sized_block_merger_policy>;
+
+  auto [passes, fails] =
+      block_merger_test<merger_type, sized_block>(max_runs_regular);
+
+  EXPECT_EQ(max_runs_regular * num_repetitions, passes);
+  EXPECT_TRUE(fails.empty()) << folly::join(", ", fails);
+}
+
+TEST(block_merger, random_sized_partial) {
+  using merger_type =
+      dwarfs::multi_queue_block_merger<size_t, sized_block,
+                                       sized_block_merger_policy>;
+
+  auto [passes, fails] =
+      block_merger_test<merger_type, sized_block, true>(max_runs_partial);
+
+  EXPECT_EQ(max_runs_partial * num_repetitions, passes);
+  EXPECT_TRUE(fails.empty()) << folly::join(", ", fails);
+}