mirror of
https://github.com/mhx/dwarfs.git
synced 2025-09-11 13:30:47 -04:00
feat: add multi queue block merger + unit tests
This commit is contained in:
parent
f2249f3b6c
commit
20e4a83b92
@ -598,6 +598,10 @@ if(WITH_TESTS)
|
||||
target_link_libraries(dwarfs_utils_test gtest gtest_main)
|
||||
list(APPEND TEST_TARGETS dwarfs_utils_test)
|
||||
|
||||
add_executable(block_merger_test test/block_merger_test.cpp)
|
||||
target_link_libraries(block_merger_test gtest gtest_main)
|
||||
list(APPEND TEST_TARGETS block_merger_test)
|
||||
|
||||
add_executable(dwarfs_pcm_sample_transformer_test test/pcm_sample_transformer_test.cpp)
|
||||
target_link_libraries(dwarfs_pcm_sample_transformer_test gtest gtest_main)
|
||||
list(APPEND TEST_TARGETS dwarfs_pcm_sample_transformer_test)
|
||||
|
37
include/dwarfs/block_merger.h
Normal file
37
include/dwarfs/block_merger.h
Normal file
@ -0,0 +1,37 @@
|
||||
/* vim:set ts=2 sw=2 sts=2 et: */
|
||||
/**
|
||||
* \author Marcus Holland-Moritz (github@mhxnet.de)
|
||||
* \copyright Copyright (c) Marcus Holland-Moritz
|
||||
*
|
||||
* This file is part of dwarfs.
|
||||
*
|
||||
* dwarfs is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* dwarfs is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace dwarfs {
|
||||
|
||||
template <typename SourceT, typename BlockT>
|
||||
class block_merger {
|
||||
public:
|
||||
using source_type = SourceT;
|
||||
using block_type = BlockT;
|
||||
|
||||
virtual ~block_merger() = default;
|
||||
|
||||
virtual void add(source_type src, block_type blk, bool is_last) = 0;
|
||||
};
|
||||
|
||||
} // namespace dwarfs
|
143
include/dwarfs/multi_queue_block_merger.h
Normal file
143
include/dwarfs/multi_queue_block_merger.h
Normal file
@ -0,0 +1,143 @@
|
||||
/* vim:set ts=2 sw=2 sts=2 et: */
|
||||
/**
|
||||
* \author Marcus Holland-Moritz (github@mhxnet.de)
|
||||
* \copyright Copyright (c) Marcus Holland-Moritz
|
||||
*
|
||||
* This file is part of dwarfs.
|
||||
*
|
||||
* dwarfs is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* dwarfs is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
#include <cassert>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <queue>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "dwarfs/block_merger.h"
|
||||
|
||||
namespace dwarfs {
|
||||
|
||||
template <typename SourceT, typename BlockT>
|
||||
class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
|
||||
public:
|
||||
using source_type = SourceT;
|
||||
using block_type = BlockT;
|
||||
|
||||
multi_queue_block_merger(size_t num_active_slots, size_t max_queued_blocks,
|
||||
std::vector<source_type> const& sources,
|
||||
std::function<void(block_type)> on_block_merged)
|
||||
: num_queueable_{max_queued_blocks}
|
||||
, sources_{sources.begin(), sources.end()}
|
||||
, active_(num_active_slots)
|
||||
, on_block_merged_{on_block_merged} {
|
||||
for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) {
|
||||
active_[i] = sources_.front();
|
||||
sources_.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
void add(source_type src, block_type blk, bool is_last) override {
|
||||
std::unique_lock lock{mx_};
|
||||
|
||||
cv_.wait(lock,
|
||||
[this, &src] { return source_distance(src) < num_queueable_; });
|
||||
|
||||
--num_queueable_;
|
||||
|
||||
queues_[src].emplace(std::move(blk), is_last);
|
||||
|
||||
while (try_merge_block()) {
|
||||
}
|
||||
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
private:
|
||||
size_t source_distance(source_type src) const {
|
||||
auto ix = active_index_;
|
||||
size_t distance{0};
|
||||
|
||||
while (active_[ix] && active_[ix].value() != src) {
|
||||
++distance;
|
||||
ix = (ix + 1) % active_.size();
|
||||
|
||||
if (ix == active_index_) {
|
||||
auto it = std::find(begin(sources_), end(sources_), src);
|
||||
distance += std::distance(begin(sources_), it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return distance;
|
||||
}
|
||||
|
||||
bool try_merge_block() {
|
||||
auto const ix = active_index_;
|
||||
|
||||
assert(active_[ix]);
|
||||
|
||||
auto src = active_[ix].value();
|
||||
auto it = queues_.find(src);
|
||||
|
||||
if (it == queues_.end() || it->second.empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto [blk, is_last] = std::move(it->second.front());
|
||||
it->second.pop();
|
||||
|
||||
on_block_merged_(std::move(blk));
|
||||
|
||||
++num_queueable_;
|
||||
|
||||
if (is_last) {
|
||||
queues_.erase(it);
|
||||
update_active(ix);
|
||||
}
|
||||
|
||||
do {
|
||||
active_index_ = (active_index_ + 1) % active_.size();
|
||||
} while (active_index_ != ix && !active_[active_index_]);
|
||||
|
||||
return active_index_ != ix || active_[active_index_];
|
||||
}
|
||||
|
||||
void update_active(size_t ix) {
|
||||
if (!sources_.empty()) {
|
||||
active_[ix] = sources_.front();
|
||||
sources_.pop_front();
|
||||
} else {
|
||||
active_[ix].reset();
|
||||
}
|
||||
}
|
||||
|
||||
std::mutex mx_;
|
||||
std::condition_variable cv_;
|
||||
size_t active_index_{0};
|
||||
size_t num_queueable_;
|
||||
std::unordered_map<source_type, std::queue<std::pair<block_type, bool>>>
|
||||
queues_;
|
||||
std::deque<source_type> sources_;
|
||||
std::vector<std::optional<source_type>> active_;
|
||||
std::function<void(block_type)> on_block_merged_;
|
||||
};
|
||||
|
||||
} // namespace dwarfs
|
272
test/block_merger_test.cpp
Normal file
272
test/block_merger_test.cpp
Normal file
@ -0,0 +1,272 @@
|
||||
/* vim:set ts=2 sw=2 sts=2 et: */
|
||||
/**
|
||||
* \author Marcus Holland-Moritz (github@mhxnet.de)
|
||||
* \copyright Copyright (c) Marcus Holland-Moritz
|
||||
*
|
||||
* This file is part of dwarfs.
|
||||
*
|
||||
* dwarfs is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* dwarfs is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
#include <numeric>
|
||||
#include <queue>
|
||||
#include <random>
|
||||
#include <shared_mutex>
|
||||
#include <thread>
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <folly/String.h>
|
||||
#include <folly/Synchronized.h>
|
||||
|
||||
#include "dwarfs/multi_queue_block_merger.h"
|
||||
|
||||
using namespace dwarfs;
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr int const debuglevel{0};
|
||||
|
||||
constexpr size_t const max_runs{250};
|
||||
constexpr size_t const num_runner_threads{16};
|
||||
constexpr size_t const num_repetitions{4};
|
||||
|
||||
using block = std::pair<size_t, size_t>;
|
||||
|
||||
// Use std::shared_mutex because folly::SharedMutex might trigger TSAN
|
||||
template <typename T>
|
||||
using synchronized = folly::Synchronized<T, std::shared_mutex>;
|
||||
|
||||
template <typename T>
|
||||
using sync_queue = synchronized<std::queue<T>>;
|
||||
|
||||
class source {
|
||||
public:
|
||||
source(size_t id, std::mt19937& delay_rng, std::mt19937& rng,
|
||||
size_t max_blocks = 20, double ips = 5000.0)
|
||||
: id_{id}
|
||||
, blocks_{init_blocks(delay_rng, rng, max_blocks, ips)} {}
|
||||
|
||||
std::tuple<block, bool, double> next() {
|
||||
auto idx = idx_++;
|
||||
return {std::make_pair(id_, idx), idx_ >= blocks_.size(), blocks_[idx]};
|
||||
}
|
||||
|
||||
size_t id() const { return id_; }
|
||||
|
||||
size_t num_blocks() const { return blocks_.size(); }
|
||||
|
||||
std::chrono::nanoseconds total_time() const {
|
||||
auto seconds = std::accumulate(begin(blocks_), end(blocks_), 0.0);
|
||||
return std::chrono::duration_cast<std::chrono::nanoseconds>(
|
||||
std::chrono::duration<double>(seconds));
|
||||
}
|
||||
|
||||
private:
|
||||
static std::vector<double>
|
||||
init_blocks(std::mt19937& delay_rng, std::mt19937& rng, size_t max_blocks,
|
||||
double ips) {
|
||||
std::uniform_int_distribution<> idist(1, max_blocks);
|
||||
std::exponential_distribution<> edist(ips);
|
||||
std::vector<double> blocks;
|
||||
blocks.resize(idist(rng));
|
||||
std::generate(begin(blocks), end(blocks), [&] { return edist(delay_rng); });
|
||||
return blocks;
|
||||
}
|
||||
|
||||
size_t idx_{0};
|
||||
size_t id_;
|
||||
std::vector<double> blocks_;
|
||||
};
|
||||
|
||||
void emitter(sync_queue<source>& sources,
|
||||
dwarfs::block_merger<size_t, block>& merger) {
|
||||
for (;;) {
|
||||
auto src = sources.withWLock([](auto&& q) {
|
||||
std::optional<source> src;
|
||||
|
||||
if (!q.empty()) {
|
||||
src = std::move(q.front());
|
||||
q.pop();
|
||||
}
|
||||
|
||||
return src;
|
||||
});
|
||||
|
||||
if (!src) {
|
||||
break;
|
||||
}
|
||||
|
||||
auto t = std::chrono::steady_clock::now();
|
||||
|
||||
for (;;) {
|
||||
auto [blk, is_last, wait] = src->next();
|
||||
|
||||
t += std::chrono::duration_cast<std::chrono::nanoseconds>(
|
||||
std::chrono::duration<double>(wait));
|
||||
|
||||
std::this_thread::sleep_until(t);
|
||||
|
||||
merger.add(blk.first, blk, is_last);
|
||||
|
||||
if (is_last) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<block>
|
||||
do_run(std::mutex& out_mx, size_t run, std::mt19937& delay_rng) {
|
||||
std::mt19937 rng(run);
|
||||
std::exponential_distribution<> sources_dist(0.1);
|
||||
std::exponential_distribution<> threads_dist(0.1);
|
||||
std::exponential_distribution<> slots_dist(0.1);
|
||||
std::exponential_distribution<> inflight_dist(0.1);
|
||||
std::uniform_real_distribution<> speed_dist(0.1, 10.0);
|
||||
auto const num_sources{std::max<size_t>(1, sources_dist(rng))};
|
||||
auto const num_slots{std::max<size_t>(1, slots_dist(rng))};
|
||||
auto const num_threads{std::max<size_t>(num_slots, threads_dist(delay_rng))};
|
||||
auto const max_in_flight{std::max<size_t>(1, inflight_dist(delay_rng))};
|
||||
|
||||
std::vector<size_t> source_ids;
|
||||
sync_queue<source> sources;
|
||||
std::chrono::nanoseconds total_time{};
|
||||
|
||||
for (size_t i = 0; i < num_sources; ++i) {
|
||||
auto src = source(i, delay_rng, rng, 30, 10000.0 * speed_dist(delay_rng));
|
||||
total_time += src.total_time();
|
||||
source_ids.emplace_back(src.id());
|
||||
sources.wlock()->emplace(std::move(src));
|
||||
}
|
||||
|
||||
auto config =
|
||||
fmt::format("sources: {}, slots: {}, threads: {}, max in flight: {}",
|
||||
num_sources, num_slots, num_threads, max_in_flight);
|
||||
|
||||
if constexpr (debuglevel > 0) {
|
||||
std::lock_guard lock{out_mx};
|
||||
std::cout << config << "\n";
|
||||
}
|
||||
|
||||
std::vector<block> merged;
|
||||
|
||||
dwarfs::multi_queue_block_merger<size_t, block> merger(
|
||||
num_slots, max_in_flight, source_ids,
|
||||
[&merged](block blk) { merged.emplace_back(std::move(blk)); });
|
||||
|
||||
std::vector<std::thread> thr;
|
||||
|
||||
auto t0 = std::chrono::steady_clock::now();
|
||||
|
||||
for (size_t i = 0; i < num_threads; ++i) {
|
||||
thr.emplace_back([&] { emitter(sources, merger); });
|
||||
}
|
||||
|
||||
for (auto& t : thr) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
auto t1 = std::chrono::steady_clock::now();
|
||||
|
||||
auto elapsed = num_threads * (t1 - t0);
|
||||
auto efficiency =
|
||||
std::chrono::duration_cast<std::chrono::duration<double>>(total_time)
|
||||
.count() /
|
||||
std::chrono::duration_cast<std::chrono::duration<double>>(elapsed)
|
||||
.count();
|
||||
|
||||
if constexpr (debuglevel > 0) {
|
||||
std::lock_guard lock{out_mx};
|
||||
std::cout << config
|
||||
<< fmt::format(" => efficiency: {:.2f}%\n", 100.0 * efficiency);
|
||||
}
|
||||
|
||||
return merged;
|
||||
}
|
||||
|
||||
[[maybe_unused]] void
|
||||
dump(std::mutex& out_mx, std::vector<block> const& blocks) {
|
||||
if constexpr (debuglevel > 1) {
|
||||
std::lock_guard lock{out_mx};
|
||||
for (size_t i = 0; i < blocks.size(); ++i) {
|
||||
if (i > 0) {
|
||||
std::cout << ", ";
|
||||
}
|
||||
auto const& b = blocks[i];
|
||||
std::cout << b.first << "." << b.second;
|
||||
}
|
||||
std::cout << "\n";
|
||||
}
|
||||
}
|
||||
|
||||
void runner_thread(size_t tid, std::mutex& out_mx, std::atomic<size_t>& runs,
|
||||
size_t const max_runs, std::atomic<size_t>& passes,
|
||||
synchronized<std::vector<size_t>>& fails) {
|
||||
std::mt19937 delay_rng(tid);
|
||||
|
||||
for (;;) {
|
||||
auto run = runs++;
|
||||
if (run >= max_runs) {
|
||||
break;
|
||||
}
|
||||
if constexpr (debuglevel > 0) {
|
||||
std::lock_guard lock{out_mx};
|
||||
std::cout << "[" << run << "/" << tid << "] ref\n";
|
||||
}
|
||||
auto ref = do_run(out_mx, run, delay_rng);
|
||||
dump(out_mx, ref);
|
||||
for (size_t rep = 0; rep < num_repetitions; ++rep) {
|
||||
if constexpr (debuglevel > 0) {
|
||||
std::lock_guard lock{out_mx};
|
||||
std::cout << "[" << run << "/" << tid << "] test\n";
|
||||
}
|
||||
auto test = do_run(out_mx, run, delay_rng);
|
||||
dump(out_mx, test);
|
||||
if (test == ref) {
|
||||
++passes;
|
||||
} else {
|
||||
fails.wlock()->emplace_back(run);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST(block_merger, random) {
|
||||
std::mutex out_mx;
|
||||
std::atomic<size_t> runs{0};
|
||||
std::atomic<size_t> passes{0};
|
||||
synchronized<std::vector<size_t>> fails;
|
||||
|
||||
std::vector<std::thread> thr;
|
||||
|
||||
for (size_t i = 0; i < num_runner_threads; ++i) {
|
||||
thr.emplace_back(
|
||||
[&, i] { runner_thread(i, out_mx, runs, max_runs, passes, fails); });
|
||||
}
|
||||
|
||||
for (auto& t : thr) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
EXPECT_EQ(max_runs * num_repetitions, passes);
|
||||
EXPECT_TRUE(fails.rlock()->empty()) << folly::join(", ", *fails.rlock());
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user