refactor: naming in multi_queue_block_merger

This commit is contained in:
Marcus Holland-Moritz 2023-11-18 22:13:30 +01:00
parent 20e4a83b92
commit 65a954b681

View File

@ -41,16 +41,18 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
using source_type = SourceT; using source_type = SourceT;
using block_type = BlockT; using block_type = BlockT;
multi_queue_block_merger(size_t num_active_slots, size_t max_queued_blocks, multi_queue_block_merger(
size_t num_active_slots, size_t max_queued_blocks,
std::vector<source_type> const& sources, std::vector<source_type> const& sources,
std::function<void(block_type)> on_block_merged) std::function<void(block_type)> on_block_merged_callback)
: num_queueable_{max_queued_blocks} : num_queueable_{max_queued_blocks}
, sources_{sources.begin(), sources.end()} , source_queue_{sources.begin(), sources.end()}
, active_(num_active_slots) , active_slots_(num_active_slots)
, on_block_merged_{on_block_merged} { , on_block_merged_callback_{on_block_merged_callback} {
for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) { for (size_t i = 0; i < active_slots_.size() && !source_queue_.empty();
active_[i] = sources_.front(); ++i) {
sources_.pop_front(); active_slots_[i] = source_queue_.front();
source_queue_.pop_front();
} }
} }
@ -62,7 +64,7 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
--num_queueable_; --num_queueable_;
queues_[src].emplace(std::move(blk), is_last); block_queues_[src].emplace(std::move(blk), is_last);
while (try_merge_block()) { while (try_merge_block()) {
} }
@ -72,16 +74,16 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
private: private:
size_t source_distance(source_type src) const { size_t source_distance(source_type src) const {
auto ix = active_index_; auto ix = active_slot_index_;
size_t distance{0}; size_t distance{0};
while (active_[ix] && active_[ix].value() != src) { while (active_slots_[ix] && active_slots_[ix].value() != src) {
++distance; ++distance;
ix = (ix + 1) % active_.size(); ix = (ix + 1) % active_slots_.size();
if (ix == active_index_) { if (ix == active_slot_index_) {
auto it = std::find(begin(sources_), end(sources_), src); auto it = std::find(begin(source_queue_), end(source_queue_), src);
distance += std::distance(begin(sources_), it); distance += std::distance(begin(source_queue_), it);
break; break;
} }
} }
@ -90,54 +92,54 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
} }
bool try_merge_block() { bool try_merge_block() {
auto const ix = active_index_; auto const ix = active_slot_index_;
assert(active_[ix]); assert(active_slots_[ix]);
auto src = active_[ix].value(); auto src = active_slots_[ix].value();
auto it = queues_.find(src); auto it = block_queues_.find(src);
if (it == queues_.end() || it->second.empty()) { if (it == block_queues_.end() || it->second.empty()) {
return false; return false;
} }
auto [blk, is_last] = std::move(it->second.front()); auto [blk, is_last] = std::move(it->second.front());
it->second.pop(); it->second.pop();
on_block_merged_(std::move(blk)); on_block_merged_callback_(std::move(blk));
++num_queueable_; ++num_queueable_;
if (is_last) { if (is_last) {
queues_.erase(it); block_queues_.erase(it);
update_active(ix); update_active(ix);
} }
do { do {
active_index_ = (active_index_ + 1) % active_.size(); active_slot_index_ = (active_slot_index_ + 1) % active_slots_.size();
} while (active_index_ != ix && !active_[active_index_]); } while (active_slot_index_ != ix && !active_slots_[active_slot_index_]);
return active_index_ != ix || active_[active_index_]; return active_slot_index_ != ix || active_slots_[active_slot_index_];
} }
void update_active(size_t ix) { void update_active(size_t ix) {
if (!sources_.empty()) { if (!source_queue_.empty()) {
active_[ix] = sources_.front(); active_slots_[ix] = source_queue_.front();
sources_.pop_front(); source_queue_.pop_front();
} else { } else {
active_[ix].reset(); active_slots_[ix].reset();
} }
} }
std::mutex mx_; std::mutex mx_;
std::condition_variable cv_; std::condition_variable cv_;
size_t active_index_{0}; size_t active_slot_index_{0};
size_t num_queueable_; size_t num_queueable_;
std::unordered_map<source_type, std::queue<std::pair<block_type, bool>>> std::unordered_map<source_type, std::queue<std::pair<block_type, bool>>>
queues_; block_queues_;
std::deque<source_type> sources_; std::deque<source_type> source_queue_;
std::vector<std::optional<source_type>> active_; std::vector<std::optional<source_type>> active_slots_;
std::function<void(block_type)> on_block_merged_; std::function<void(block_type)> on_block_merged_callback_;
}; };
} // namespace dwarfs } // namespace dwarfs