diff --git a/CMakeLists.txt b/CMakeLists.txt
index 79d81b6b..d7315e87 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -598,6 +598,10 @@ if(WITH_TESTS)
target_link_libraries(dwarfs_utils_test gtest gtest_main)
list(APPEND TEST_TARGETS dwarfs_utils_test)
+ add_executable(block_merger_test test/block_merger_test.cpp)
+ target_link_libraries(block_merger_test gtest gtest_main)
+ list(APPEND TEST_TARGETS block_merger_test)
+
add_executable(dwarfs_pcm_sample_transformer_test test/pcm_sample_transformer_test.cpp)
target_link_libraries(dwarfs_pcm_sample_transformer_test gtest gtest_main)
list(APPEND TEST_TARGETS dwarfs_pcm_sample_transformer_test)
diff --git a/include/dwarfs/block_merger.h b/include/dwarfs/block_merger.h
new file mode 100644
index 00000000..27c92b0c
--- /dev/null
+++ b/include/dwarfs/block_merger.h
@@ -0,0 +1,37 @@
+/* vim:set ts=2 sw=2 sts=2 et: */
+/**
+ * \author Marcus Holland-Moritz (github@mhxnet.de)
+ * \copyright Copyright (c) Marcus Holland-Moritz
+ *
+ * This file is part of dwarfs.
+ *
+ * dwarfs is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dwarfs is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dwarfs. If not, see .
+ */
+
+#pragma once
+
+namespace dwarfs {
+
+template
+class block_merger {
+ public:
+ using source_type = SourceT;
+ using block_type = BlockT;
+
+ virtual ~block_merger() = default;
+
+ virtual void add(source_type src, block_type blk, bool is_last) = 0;
+};
+
+} // namespace dwarfs
diff --git a/include/dwarfs/multi_queue_block_merger.h b/include/dwarfs/multi_queue_block_merger.h
new file mode 100644
index 00000000..0f8e7ab6
--- /dev/null
+++ b/include/dwarfs/multi_queue_block_merger.h
@@ -0,0 +1,143 @@
+/* vim:set ts=2 sw=2 sts=2 et: */
+/**
+ * \author Marcus Holland-Moritz (github@mhxnet.de)
+ * \copyright Copyright (c) Marcus Holland-Moritz
+ *
+ * This file is part of dwarfs.
+ *
+ * dwarfs is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dwarfs is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dwarfs. If not, see .
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "dwarfs/block_merger.h"
+
+namespace dwarfs {
+
+template
+class multi_queue_block_merger : public block_merger {
+ public:
+ using source_type = SourceT;
+ using block_type = BlockT;
+
+ multi_queue_block_merger(size_t num_active_slots, size_t max_queued_blocks,
+ std::vector const& sources,
+ std::function on_block_merged)
+ : num_queueable_{max_queued_blocks}
+ , sources_{sources.begin(), sources.end()}
+ , active_(num_active_slots)
+ , on_block_merged_{on_block_merged} {
+ for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) {
+ active_[i] = sources_.front();
+ sources_.pop_front();
+ }
+ }
+
+ void add(source_type src, block_type blk, bool is_last) override {
+ std::unique_lock lock{mx_};
+
+ cv_.wait(lock,
+ [this, &src] { return source_distance(src) < num_queueable_; });
+
+ --num_queueable_;
+
+ queues_[src].emplace(std::move(blk), is_last);
+
+ while (try_merge_block()) {
+ }
+
+ cv_.notify_all();
+ }
+
+ private:
+ size_t source_distance(source_type src) const {
+ auto ix = active_index_;
+ size_t distance{0};
+
+ while (active_[ix] && active_[ix].value() != src) {
+ ++distance;
+ ix = (ix + 1) % active_.size();
+
+ if (ix == active_index_) {
+ auto it = std::find(begin(sources_), end(sources_), src);
+ distance += std::distance(begin(sources_), it);
+ break;
+ }
+ }
+
+ return distance;
+ }
+
+ bool try_merge_block() {
+ auto const ix = active_index_;
+
+ assert(active_[ix]);
+
+ auto src = active_[ix].value();
+ auto it = queues_.find(src);
+
+ if (it == queues_.end() || it->second.empty()) {
+ return false;
+ }
+
+ auto [blk, is_last] = std::move(it->second.front());
+ it->second.pop();
+
+ on_block_merged_(std::move(blk));
+
+ ++num_queueable_;
+
+ if (is_last) {
+ queues_.erase(it);
+ update_active(ix);
+ }
+
+ do {
+ active_index_ = (active_index_ + 1) % active_.size();
+ } while (active_index_ != ix && !active_[active_index_]);
+
+ return active_index_ != ix || active_[active_index_];
+ }
+
+ void update_active(size_t ix) {
+ if (!sources_.empty()) {
+ active_[ix] = sources_.front();
+ sources_.pop_front();
+ } else {
+ active_[ix].reset();
+ }
+ }
+
+ std::mutex mx_;
+ std::condition_variable cv_;
+ size_t active_index_{0};
+ size_t num_queueable_;
+ std::unordered_map>>
+ queues_;
+ std::deque sources_;
+ std::vector> active_;
+ std::function on_block_merged_;
+};
+
+} // namespace dwarfs
diff --git a/test/block_merger_test.cpp b/test/block_merger_test.cpp
new file mode 100644
index 00000000..20c32c56
--- /dev/null
+++ b/test/block_merger_test.cpp
@@ -0,0 +1,272 @@
+/* vim:set ts=2 sw=2 sts=2 et: */
+/**
+ * \author Marcus Holland-Moritz (github@mhxnet.de)
+ * \copyright Copyright (c) Marcus Holland-Moritz
+ *
+ * This file is part of dwarfs.
+ *
+ * dwarfs is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dwarfs is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dwarfs. If not, see .
+ */
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "dwarfs/multi_queue_block_merger.h"
+
+using namespace dwarfs;
+
+namespace {
+
+constexpr int const debuglevel{0};
+
+constexpr size_t const max_runs{250};
+constexpr size_t const num_runner_threads{16};
+constexpr size_t const num_repetitions{4};
+
+using block = std::pair;
+
+// Use std::shared_mutex because folly::SharedMutex might trigger TSAN
+template
+using synchronized = folly::Synchronized;
+
+template
+using sync_queue = synchronized>;
+
+class source {
+ public:
+ source(size_t id, std::mt19937& delay_rng, std::mt19937& rng,
+ size_t max_blocks = 20, double ips = 5000.0)
+ : id_{id}
+ , blocks_{init_blocks(delay_rng, rng, max_blocks, ips)} {}
+
+ std::tuple next() {
+ auto idx = idx_++;
+ return {std::make_pair(id_, idx), idx_ >= blocks_.size(), blocks_[idx]};
+ }
+
+ size_t id() const { return id_; }
+
+ size_t num_blocks() const { return blocks_.size(); }
+
+ std::chrono::nanoseconds total_time() const {
+ auto seconds = std::accumulate(begin(blocks_), end(blocks_), 0.0);
+ return std::chrono::duration_cast(
+ std::chrono::duration(seconds));
+ }
+
+ private:
+ static std::vector
+ init_blocks(std::mt19937& delay_rng, std::mt19937& rng, size_t max_blocks,
+ double ips) {
+ std::uniform_int_distribution<> idist(1, max_blocks);
+ std::exponential_distribution<> edist(ips);
+ std::vector blocks;
+ blocks.resize(idist(rng));
+ std::generate(begin(blocks), end(blocks), [&] { return edist(delay_rng); });
+ return blocks;
+ }
+
+ size_t idx_{0};
+ size_t id_;
+ std::vector blocks_;
+};
+
+void emitter(sync_queue& sources,
+ dwarfs::block_merger& merger) {
+ for (;;) {
+ auto src = sources.withWLock([](auto&& q) {
+ std::optional src;
+
+ if (!q.empty()) {
+ src = std::move(q.front());
+ q.pop();
+ }
+
+ return src;
+ });
+
+ if (!src) {
+ break;
+ }
+
+ auto t = std::chrono::steady_clock::now();
+
+ for (;;) {
+ auto [blk, is_last, wait] = src->next();
+
+ t += std::chrono::duration_cast(
+ std::chrono::duration(wait));
+
+ std::this_thread::sleep_until(t);
+
+ merger.add(blk.first, blk, is_last);
+
+ if (is_last) {
+ break;
+ }
+ }
+ }
+}
+
+std::vector
+do_run(std::mutex& out_mx, size_t run, std::mt19937& delay_rng) {
+ std::mt19937 rng(run);
+ std::exponential_distribution<> sources_dist(0.1);
+ std::exponential_distribution<> threads_dist(0.1);
+ std::exponential_distribution<> slots_dist(0.1);
+ std::exponential_distribution<> inflight_dist(0.1);
+ std::uniform_real_distribution<> speed_dist(0.1, 10.0);
+ auto const num_sources{std::max(1, sources_dist(rng))};
+ auto const num_slots{std::max(1, slots_dist(rng))};
+ auto const num_threads{std::max(num_slots, threads_dist(delay_rng))};
+ auto const max_in_flight{std::max(1, inflight_dist(delay_rng))};
+
+ std::vector source_ids;
+ sync_queue sources;
+ std::chrono::nanoseconds total_time{};
+
+ for (size_t i = 0; i < num_sources; ++i) {
+ auto src = source(i, delay_rng, rng, 30, 10000.0 * speed_dist(delay_rng));
+ total_time += src.total_time();
+ source_ids.emplace_back(src.id());
+ sources.wlock()->emplace(std::move(src));
+ }
+
+ auto config =
+ fmt::format("sources: {}, slots: {}, threads: {}, max in flight: {}",
+ num_sources, num_slots, num_threads, max_in_flight);
+
+ if constexpr (debuglevel > 0) {
+ std::lock_guard lock{out_mx};
+ std::cout << config << "\n";
+ }
+
+ std::vector merged;
+
+ dwarfs::multi_queue_block_merger merger(
+ num_slots, max_in_flight, source_ids,
+ [&merged](block blk) { merged.emplace_back(std::move(blk)); });
+
+ std::vector thr;
+
+ auto t0 = std::chrono::steady_clock::now();
+
+ for (size_t i = 0; i < num_threads; ++i) {
+ thr.emplace_back([&] { emitter(sources, merger); });
+ }
+
+ for (auto& t : thr) {
+ t.join();
+ }
+
+ auto t1 = std::chrono::steady_clock::now();
+
+ auto elapsed = num_threads * (t1 - t0);
+ auto efficiency =
+ std::chrono::duration_cast>(total_time)
+ .count() /
+ std::chrono::duration_cast>(elapsed)
+ .count();
+
+ if constexpr (debuglevel > 0) {
+ std::lock_guard lock{out_mx};
+ std::cout << config
+ << fmt::format(" => efficiency: {:.2f}%\n", 100.0 * efficiency);
+ }
+
+ return merged;
+}
+
+[[maybe_unused]] void
+dump(std::mutex& out_mx, std::vector const& blocks) {
+ if constexpr (debuglevel > 1) {
+ std::lock_guard lock{out_mx};
+ for (size_t i = 0; i < blocks.size(); ++i) {
+ if (i > 0) {
+ std::cout << ", ";
+ }
+ auto const& b = blocks[i];
+ std::cout << b.first << "." << b.second;
+ }
+ std::cout << "\n";
+ }
+}
+
+void runner_thread(size_t tid, std::mutex& out_mx, std::atomic& runs,
+ size_t const max_runs, std::atomic& passes,
+ synchronized>& fails) {
+ std::mt19937 delay_rng(tid);
+
+ for (;;) {
+ auto run = runs++;
+ if (run >= max_runs) {
+ break;
+ }
+ if constexpr (debuglevel > 0) {
+ std::lock_guard lock{out_mx};
+ std::cout << "[" << run << "/" << tid << "] ref\n";
+ }
+ auto ref = do_run(out_mx, run, delay_rng);
+ dump(out_mx, ref);
+ for (size_t rep = 0; rep < num_repetitions; ++rep) {
+ if constexpr (debuglevel > 0) {
+ std::lock_guard lock{out_mx};
+ std::cout << "[" << run << "/" << tid << "] test\n";
+ }
+ auto test = do_run(out_mx, run, delay_rng);
+ dump(out_mx, test);
+ if (test == ref) {
+ ++passes;
+ } else {
+ fails.wlock()->emplace_back(run);
+ }
+ }
+ }
+}
+
+} // namespace
+
+TEST(block_merger, random) {
+ std::mutex out_mx;
+ std::atomic runs{0};
+ std::atomic passes{0};
+ synchronized> fails;
+
+ std::vector thr;
+
+ for (size_t i = 0; i < num_runner_threads; ++i) {
+ thr.emplace_back(
+ [&, i] { runner_thread(i, out_mx, runs, max_runs, passes, fails); });
+ }
+
+ for (auto& t : thr) {
+ t.join();
+ }
+
+ EXPECT_EQ(max_runs * num_repetitions, passes);
+ EXPECT_TRUE(fails.rlock()->empty()) << folly::join(", ", *fails.rlock());
+}