From 65a954b681a017201546675efcf5d55dd0a90fc9 Mon Sep 17 00:00:00 2001 From: Marcus Holland-Moritz Date: Sat, 18 Nov 2023 22:13:30 +0100 Subject: [PATCH] refactor: naming in multi_queue_block_merger --- include/dwarfs/multi_queue_block_merger.h | 72 ++++++++++++----------- 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/include/dwarfs/multi_queue_block_merger.h b/include/dwarfs/multi_queue_block_merger.h index 0f8e7ab6..5223f1c9 100644 --- a/include/dwarfs/multi_queue_block_merger.h +++ b/include/dwarfs/multi_queue_block_merger.h @@ -41,16 +41,18 @@ class multi_queue_block_merger : public block_merger { using source_type = SourceT; using block_type = BlockT; - multi_queue_block_merger(size_t num_active_slots, size_t max_queued_blocks, - std::vector const& sources, - std::function on_block_merged) + multi_queue_block_merger( + size_t num_active_slots, size_t max_queued_blocks, + std::vector const& sources, + std::function on_block_merged_callback) : num_queueable_{max_queued_blocks} - , sources_{sources.begin(), sources.end()} - , active_(num_active_slots) - , on_block_merged_{on_block_merged} { - for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) { - active_[i] = sources_.front(); - sources_.pop_front(); + , source_queue_{sources.begin(), sources.end()} + , active_slots_(num_active_slots) + , on_block_merged_callback_{on_block_merged_callback} { + for (size_t i = 0; i < active_slots_.size() && !source_queue_.empty(); + ++i) { + active_slots_[i] = source_queue_.front(); + source_queue_.pop_front(); } } @@ -62,7 +64,7 @@ class multi_queue_block_merger : public block_merger { --num_queueable_; - queues_[src].emplace(std::move(blk), is_last); + block_queues_[src].emplace(std::move(blk), is_last); while (try_merge_block()) { } @@ -72,16 +74,16 @@ class multi_queue_block_merger : public block_merger { private: size_t source_distance(source_type src) const { - auto ix = active_index_; + auto ix = active_slot_index_; size_t distance{0}; - while (active_[ix] && active_[ix].value() != src) { + while (active_slots_[ix] && active_slots_[ix].value() != src) { ++distance; - ix = (ix + 1) % active_.size(); + ix = (ix + 1) % active_slots_.size(); - if (ix == active_index_) { - auto it = std::find(begin(sources_), end(sources_), src); - distance += std::distance(begin(sources_), it); + if (ix == active_slot_index_) { + auto it = std::find(begin(source_queue_), end(source_queue_), src); + distance += std::distance(begin(source_queue_), it); break; } } @@ -90,54 +92,54 @@ class multi_queue_block_merger : public block_merger { } bool try_merge_block() { - auto const ix = active_index_; + auto const ix = active_slot_index_; - assert(active_[ix]); + assert(active_slots_[ix]); - auto src = active_[ix].value(); - auto it = queues_.find(src); + auto src = active_slots_[ix].value(); + auto it = block_queues_.find(src); - if (it == queues_.end() || it->second.empty()) { + if (it == block_queues_.end() || it->second.empty()) { return false; } auto [blk, is_last] = std::move(it->second.front()); it->second.pop(); - on_block_merged_(std::move(blk)); + on_block_merged_callback_(std::move(blk)); ++num_queueable_; if (is_last) { - queues_.erase(it); + block_queues_.erase(it); update_active(ix); } do { - active_index_ = (active_index_ + 1) % active_.size(); - } while (active_index_ != ix && !active_[active_index_]); + active_slot_index_ = (active_slot_index_ + 1) % active_slots_.size(); + } while (active_slot_index_ != ix && !active_slots_[active_slot_index_]); - return active_index_ != ix || active_[active_index_]; + return active_slot_index_ != ix || active_slots_[active_slot_index_]; } void update_active(size_t ix) { - if (!sources_.empty()) { - active_[ix] = sources_.front(); - sources_.pop_front(); + if (!source_queue_.empty()) { + active_slots_[ix] = source_queue_.front(); + source_queue_.pop_front(); } else { - active_[ix].reset(); + active_slots_[ix].reset(); } } std::mutex mx_; std::condition_variable cv_; - size_t active_index_{0}; + size_t active_slot_index_{0}; size_t num_queueable_; std::unordered_map>> - queues_; - std::deque sources_; - std::vector> active_; - std::function on_block_merged_; + block_queues_; + std::deque source_queue_; + std::vector> active_slots_; + std::function on_block_merged_callback_; }; } // namespace dwarfs