mirror of
https://github.com/mhx/dwarfs.git
synced 2025-09-18 17:00:30 -04:00
chore: support logging in worker_group
This commit is contained in:
parent
e045a420c5
commit
3dbe63b7df
@ -31,6 +31,8 @@
|
||||
|
||||
namespace dwarfs {
|
||||
|
||||
class logger;
|
||||
|
||||
/**
|
||||
* A group of worker threads
|
||||
*
|
||||
@ -48,7 +50,7 @@ class worker_group {
|
||||
* \param num_workers Number of worker threads.
|
||||
*/
|
||||
explicit worker_group(
|
||||
const char* group_name, size_t num_workers = 1,
|
||||
logger& lgr, const char* group_name, size_t num_workers = 1,
|
||||
size_t max_queue_len = std::numeric_limits<size_t>::max(),
|
||||
int niceness = 0);
|
||||
|
||||
|
@ -144,11 +144,11 @@ class block_cache_ final : public block_cache::impl {
|
||||
, LOG_PROXY_INIT(lgr)
|
||||
, options_(options) {
|
||||
if (options.init_workers) {
|
||||
wg_ =
|
||||
worker_group("blkcache", std::max(options.num_workers > 0
|
||||
? options.num_workers
|
||||
: folly::hardware_concurrency(),
|
||||
static_cast<size_t>(1)));
|
||||
wg_ = worker_group(lgr, "blkcache",
|
||||
std::max(options.num_workers > 0
|
||||
? options.num_workers
|
||||
: folly::hardware_concurrency(),
|
||||
static_cast<size_t>(1)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,7 +247,7 @@ class block_cache_ final : public block_cache::impl {
|
||||
wg_.stop();
|
||||
}
|
||||
|
||||
wg_ = worker_group("blkcache", num);
|
||||
wg_ = worker_group(LOG_GET_LOGGER, "blkcache", num);
|
||||
}
|
||||
|
||||
void set_tidy_config(cache_tidy_config const& cfg) override {
|
||||
|
@ -235,7 +235,7 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
|
||||
|
||||
::archive_entry* spare = nullptr;
|
||||
|
||||
worker_group archiver("archiver", 1);
|
||||
worker_group archiver(LOG_GET_LOGGER, "archiver", 1);
|
||||
cache_semaphore sem;
|
||||
|
||||
LOG_DEBUG << "extractor semaphore size: " << opts.max_queued_bytes
|
||||
|
@ -736,7 +736,7 @@ int filesystem_<LoggerPolicy>::check(filesystem_check_level level,
|
||||
size_t num_threads) const {
|
||||
filesystem_parser parser(mm_, image_offset_);
|
||||
|
||||
worker_group wg("fscheck", num_threads);
|
||||
worker_group wg(LOG_GET_LOGGER, "fscheck", num_threads);
|
||||
std::vector<std::future<fs_section>> sections;
|
||||
|
||||
while (auto sp = parser.next_section()) {
|
||||
|
@ -692,8 +692,8 @@ void scanner_<LoggerPolicy>::scan(
|
||||
|
||||
{
|
||||
size_t const num_threads = options_.num_segmenter_workers;
|
||||
worker_group wg_ordering("ordering", num_threads);
|
||||
worker_group wg_blockify("blockify", num_threads);
|
||||
worker_group wg_ordering(LOG_GET_LOGGER, "ordering", num_threads);
|
||||
worker_group wg_blockify(LOG_GET_LOGGER, "blockify", num_threads);
|
||||
|
||||
fsw.configure(frag_info.categories, num_threads);
|
||||
|
||||
|
@ -41,6 +41,7 @@
|
||||
#include <folly/system/ThreadName.h>
|
||||
|
||||
#include "dwarfs/error.h"
|
||||
#include "dwarfs/logger.h"
|
||||
#include "dwarfs/util.h"
|
||||
#include "dwarfs/worker_group.h"
|
||||
|
||||
@ -91,16 +92,15 @@ double get_thread_cpu_time(std::thread const& t) {
|
||||
|
||||
#endif
|
||||
|
||||
} // namespace
|
||||
|
||||
template <typename Policy>
|
||||
template <typename LoggerPolicy, typename Policy>
|
||||
class basic_worker_group final : public worker_group::impl, private Policy {
|
||||
public:
|
||||
template <typename... Args>
|
||||
basic_worker_group(const char* group_name, size_t num_workers,
|
||||
basic_worker_group(logger& lgr, const char* group_name, size_t num_workers,
|
||||
size_t max_queue_len, int niceness [[maybe_unused]],
|
||||
Args&&... args)
|
||||
: Policy(std::forward<Args>(args)...)
|
||||
, LOG_PROXY_INIT(lgr)
|
||||
, running_(true)
|
||||
, pending_(0)
|
||||
, max_queue_len_(max_queue_len) {
|
||||
@ -331,9 +331,8 @@ class basic_worker_group final : public worker_group::impl, private Policy {
|
||||
try {
|
||||
job();
|
||||
} catch (...) {
|
||||
std::cerr << "FATAL ERROR: exception thrown in worker thread: "
|
||||
<< folly::exceptionStr(std::current_exception()) << '\n';
|
||||
std::abort();
|
||||
LOG_FATAL << "exception thrown in worker thread: "
|
||||
<< folly::exceptionStr(std::current_exception());
|
||||
}
|
||||
#ifdef _WIN32
|
||||
if (is_background) {
|
||||
@ -352,6 +351,7 @@ class basic_worker_group final : public worker_group::impl, private Policy {
|
||||
}
|
||||
}
|
||||
|
||||
LOG_PROXY_DECL(LoggerPolicy);
|
||||
std::vector<std::thread> workers_;
|
||||
jobs_t jobs_;
|
||||
std::condition_variable cond_;
|
||||
@ -371,9 +371,16 @@ class no_policy {
|
||||
};
|
||||
};
|
||||
|
||||
worker_group::worker_group(const char* group_name, size_t num_workers,
|
||||
size_t max_queue_len, int niceness)
|
||||
: impl_{std::make_unique<basic_worker_group<no_policy>>(
|
||||
group_name, num_workers, max_queue_len, niceness)} {}
|
||||
template <typename LoggerPolicy>
|
||||
using default_worker_group = basic_worker_group<LoggerPolicy, no_policy>;
|
||||
|
||||
} // namespace
|
||||
|
||||
worker_group::worker_group(logger& lgr, const char* group_name,
|
||||
size_t num_workers, size_t max_queue_len,
|
||||
int niceness)
|
||||
: impl_{make_unique_logging_object<impl, default_worker_group,
|
||||
logger_policies>(
|
||||
lgr, group_name, num_workers, max_queue_len, niceness)} {}
|
||||
|
||||
} // namespace dwarfs
|
||||
|
@ -104,7 +104,7 @@ int dwarfsbench_main(int argc, sys_char** argv, iolayer const& iol) {
|
||||
dwarfs::filesystem_v2 fs(lgr, std::make_shared<dwarfs::mmap>(filesystem),
|
||||
fsopts);
|
||||
|
||||
worker_group wg("reader", num_readers);
|
||||
worker_group wg(lgr, "reader", num_readers);
|
||||
|
||||
fs.walk([&](auto entry) {
|
||||
auto inode_data = entry.inode();
|
||||
|
@ -865,10 +865,6 @@ int mkdwarfs_main(int argc, sys_char** argv, iolayer const& iol) {
|
||||
num_segmenter_workers = num_workers;
|
||||
}
|
||||
|
||||
worker_group wg_compress("compress", num_workers,
|
||||
std::numeric_limits<size_t>::max(),
|
||||
compress_niceness);
|
||||
|
||||
options.num_segmenter_workers = num_segmenter_workers;
|
||||
|
||||
if (vm.count("debug-filter")) {
|
||||
@ -1243,6 +1239,10 @@ int mkdwarfs_main(int argc, sys_char** argv, iolayer const& iol) {
|
||||
block_compressor metadata_bc(metadata_compression);
|
||||
block_compressor history_bc(history_compression);
|
||||
|
||||
worker_group wg_compress(lgr, "compress", num_workers,
|
||||
std::numeric_limits<size_t>::max(),
|
||||
compress_niceness);
|
||||
|
||||
std::unique_ptr<filesystem_writer> fsw;
|
||||
|
||||
try {
|
||||
@ -1316,7 +1316,7 @@ int mkdwarfs_main(int argc, sys_char** argv, iolayer const& iol) {
|
||||
auto sf = std::make_shared<segmenter_factory>(
|
||||
lgr, prog, options.inode.categorizer_mgr, sf_config);
|
||||
|
||||
worker_group wg_scanner("scanner", num_scanner_workers);
|
||||
worker_group wg_scanner(lgr, "scanner", num_scanner_workers);
|
||||
|
||||
scanner s(lgr, wg_scanner, std::move(sf), entry_factory::create(), iol.os,
|
||||
std::move(script), options);
|
||||
|
@ -1109,7 +1109,7 @@ TEST_P(rewrite, filesystem_rewrite) {
|
||||
opts.recompress_block = recompress_block;
|
||||
opts.recompress_metadata = recompress_metadata;
|
||||
|
||||
worker_group wg("rewriter", 2);
|
||||
worker_group wg(lgr, "rewriter", 2);
|
||||
block_compressor bc("null");
|
||||
progress prog([](const progress&, bool) {}, 1000);
|
||||
std::ostringstream rewritten, idss;
|
||||
|
@ -115,11 +115,11 @@ std::string make_filesystem(::benchmark::State const& state) {
|
||||
options.plain_names_table = state.range(1);
|
||||
options.plain_symlinks_table = state.range(1);
|
||||
|
||||
worker_group wg("writer", 4);
|
||||
progress prog([](const progress&, bool) {}, 1000);
|
||||
|
||||
test::test_logger lgr;
|
||||
|
||||
worker_group wg(lgr, "writer", 4);
|
||||
progress prog([](const progress&, bool) {}, 1000);
|
||||
|
||||
auto sf = std::make_shared<segmenter_factory>(lgr, prog, cfg);
|
||||
|
||||
scanner s(lgr, wg, sf, entry_factory::create(),
|
||||
|
@ -74,7 +74,7 @@ build_dwarfs(logger& lgr, std::shared_ptr<test::os_access_mock> input,
|
||||
std::optional<std::span<std::filesystem::path const>> input_list =
|
||||
std::nullopt) {
|
||||
// force multithreading
|
||||
worker_group wg("worker", 4);
|
||||
worker_group wg(lgr, "worker", 4);
|
||||
|
||||
std::unique_ptr<progress> local_prog;
|
||||
if (!prog) {
|
||||
@ -908,7 +908,7 @@ class filter_test
|
||||
};
|
||||
|
||||
progress prog([](const progress&, bool) {}, 1000);
|
||||
worker_group wg("worker", 1);
|
||||
worker_group wg(lgr, "worker", 1);
|
||||
auto sf = std::make_shared<segmenter_factory>(lgr, prog,
|
||||
segmenter_factory::config{});
|
||||
scanner s(lgr, wg, sf, entry_factory::create(), input, scr, options);
|
||||
|
Loading…
x
Reference in New Issue
Block a user