mirror of
https://github.com/mhx/dwarfs.git
synced 2025-09-14 14:59:52 -04:00
Improve error handling & resilience
This commit is contained in:
parent
460b932c95
commit
1d39c4b50f
@ -81,9 +81,13 @@ class cached_block {
|
|||||||
|
|
||||||
const uint8_t* data() const { return data_.data(); }
|
const uint8_t* data() const { return data_.data(); }
|
||||||
|
|
||||||
|
size_t size() const { return data_.size(); }
|
||||||
|
|
||||||
void decompress_until(size_t end) {
|
void decompress_until(size_t end) {
|
||||||
while (data_.size() < end) {
|
while (data_.size() < end) {
|
||||||
assert(decompressor_);
|
if (!decompressor_) {
|
||||||
|
DWARFS_THROW(runtime_error, "no decompressor for block");
|
||||||
|
}
|
||||||
|
|
||||||
if (decompressor_->decompress_frame()) {
|
if (decompressor_->decompress_frame()) {
|
||||||
// We're done, free the memory
|
// We're done, free the memory
|
||||||
@ -128,7 +132,7 @@ class block_request {
|
|||||||
: begin_(begin)
|
: begin_(begin)
|
||||||
, end_(end)
|
, end_(end)
|
||||||
, promise_(std::move(promise)) {
|
, promise_(std::move(promise)) {
|
||||||
assert(begin_ < end_);
|
DWARFS_ASSERT(begin_ < end_, "invalid block_request");
|
||||||
}
|
}
|
||||||
|
|
||||||
block_request(block_request&&) = default;
|
block_request(block_request&&) = default;
|
||||||
@ -401,22 +405,31 @@ class block_cache_ : public block_cache::impl {
|
|||||||
|
|
||||||
// Bummer. We don't know anything about the block.
|
// Bummer. We don't know anything about the block.
|
||||||
|
|
||||||
LOG_DEBUG << "block " << block_no << " not found";
|
try {
|
||||||
|
if (block_no >= block_.size()) {
|
||||||
|
DWARFS_THROW(runtime_error,
|
||||||
|
fmt::format("block number out of range {0} >= {1}",
|
||||||
|
block_no, block_.size()));
|
||||||
|
}
|
||||||
|
|
||||||
assert(block_no < block_.size());
|
LOG_DEBUG << "block " << block_no << " not found";
|
||||||
|
|
||||||
auto block = std::make_shared<cached_block>(
|
auto block = std::make_shared<cached_block>(
|
||||||
LOG_GET_LOGGER, block_[block_no], mm_, options_.mm_release);
|
LOG_GET_LOGGER, DWARFS_NOTHROW(block_.at(block_no)), mm_,
|
||||||
++blocks_created_;
|
options_.mm_release);
|
||||||
|
++blocks_created_;
|
||||||
|
|
||||||
// Make a new set for the block
|
// Make a new set for the block
|
||||||
brs = std::make_shared<block_request_set>(std::move(block), block_no);
|
brs = std::make_shared<block_request_set>(std::move(block), block_no);
|
||||||
|
|
||||||
// Promise will be fulfilled asynchronously
|
// Promise will be fulfilled asynchronously
|
||||||
brs->add(offset, range_end, std::move(promise));
|
brs->add(offset, range_end, std::move(promise));
|
||||||
|
|
||||||
active_[block_no].emplace_back(brs);
|
active_[block_no].emplace_back(brs);
|
||||||
enqueue_job(std::move(brs));
|
enqueue_job(std::move(brs));
|
||||||
|
} catch (...) {
|
||||||
|
promise.set_exception(std::current_exception());
|
||||||
|
}
|
||||||
|
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
@ -561,6 +574,15 @@ block_range::block_range(std::shared_ptr<cached_block const> block,
|
|||||||
size_t offset, size_t size)
|
size_t offset, size_t size)
|
||||||
: begin_(block->data() + offset)
|
: begin_(block->data() + offset)
|
||||||
, end_(begin_ + size)
|
, end_(begin_ + size)
|
||||||
, block_(std::move(block)) {}
|
, block_(std::move(block)) {
|
||||||
|
if (!block_->data()) {
|
||||||
|
DWARFS_THROW(runtime_error, "block_range: block data is null");
|
||||||
|
}
|
||||||
|
if (size > block_->size()) {
|
||||||
|
DWARFS_THROW(runtime_error,
|
||||||
|
fmt::format("block_range: size out of range ({0} > {1})", size,
|
||||||
|
block_->size()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace dwarfs
|
} // namespace dwarfs
|
||||||
|
@ -493,7 +493,15 @@ class null_block_decompressor : public block_decompressor::impl {
|
|||||||
: decompressed_(target)
|
: decompressed_(target)
|
||||||
, data_(data)
|
, data_(data)
|
||||||
, uncompressed_size_(size) {
|
, uncompressed_size_(size) {
|
||||||
decompressed_.reserve(uncompressed_size_);
|
// TODO: we shouldn't have to copy this to memory at all...
|
||||||
|
try {
|
||||||
|
decompressed_.reserve(uncompressed_size_);
|
||||||
|
} catch (std::bad_alloc const&) {
|
||||||
|
DWARFS_THROW(
|
||||||
|
runtime_error,
|
||||||
|
fmt::format("could not reserve {} bytes for decompressed block",
|
||||||
|
uncompressed_size_));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
compression_type type() const override { return compression_type::NONE; }
|
compression_type type() const override { return compression_type::NONE; }
|
||||||
@ -536,7 +544,14 @@ class lzma_block_decompressor : public block_decompressor::impl {
|
|||||||
LZMA_OK) {
|
LZMA_OK) {
|
||||||
DWARFS_THROW(runtime_error, "lzma_stream_decoder");
|
DWARFS_THROW(runtime_error, "lzma_stream_decoder");
|
||||||
}
|
}
|
||||||
decompressed_.reserve(uncompressed_size_);
|
try {
|
||||||
|
decompressed_.reserve(uncompressed_size_);
|
||||||
|
} catch (std::bad_alloc const&) {
|
||||||
|
DWARFS_THROW(
|
||||||
|
runtime_error,
|
||||||
|
fmt::format("could not reserve {} bytes for decompressed block",
|
||||||
|
uncompressed_size_));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
~lzma_block_decompressor() { lzma_end(&stream_); }
|
~lzma_block_decompressor() { lzma_end(&stream_); }
|
||||||
@ -605,7 +620,14 @@ class lz4_block_decompressor : public block_decompressor::impl {
|
|||||||
, data_(data + sizeof(uint32_t))
|
, data_(data + sizeof(uint32_t))
|
||||||
, input_size_(size - sizeof(uint32_t))
|
, input_size_(size - sizeof(uint32_t))
|
||||||
, uncompressed_size_(get_uncompressed_size(data)) {
|
, uncompressed_size_(get_uncompressed_size(data)) {
|
||||||
decompressed_.reserve(uncompressed_size_);
|
try {
|
||||||
|
decompressed_.reserve(uncompressed_size_);
|
||||||
|
} catch (std::bad_alloc const&) {
|
||||||
|
DWARFS_THROW(
|
||||||
|
runtime_error,
|
||||||
|
fmt::format("could not reserve {} bytes for decompressed block",
|
||||||
|
uncompressed_size_));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
compression_type type() const override { return compression_type::LZ4; }
|
compression_type type() const override { return compression_type::LZ4; }
|
||||||
@ -656,7 +678,14 @@ class zstd_block_decompressor : public block_decompressor::impl {
|
|||||||
, data_(data)
|
, data_(data)
|
||||||
, size_(size)
|
, size_(size)
|
||||||
, uncompressed_size_(ZSTD_getDecompressedSize(data, size)) {
|
, uncompressed_size_(ZSTD_getDecompressedSize(data, size)) {
|
||||||
decompressed_.reserve(uncompressed_size_);
|
try {
|
||||||
|
decompressed_.reserve(uncompressed_size_);
|
||||||
|
} catch (std::bad_alloc const&) {
|
||||||
|
DWARFS_THROW(
|
||||||
|
runtime_error,
|
||||||
|
fmt::format("could not reserve {} bytes for decompressed block",
|
||||||
|
uncompressed_size_));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
compression_type type() const override { return compression_type::ZSTD; }
|
compression_type type() const override { return compression_type::ZSTD; }
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#include <folly/String.h>
|
||||||
#include <folly/container/Enumerate.h>
|
#include <folly/container/Enumerate.h>
|
||||||
#include <folly/stats/Histogram.h>
|
#include <folly/stats/Histogram.h>
|
||||||
|
|
||||||
@ -127,6 +128,11 @@ inode_reader_<LoggerPolicy>::read(size_t size, off_t offset, chunk_range chunks,
|
|||||||
size_t chunksize = it->size() - offset;
|
size_t chunksize = it->size() - offset;
|
||||||
size_t chunkoff = it->offset() + offset;
|
size_t chunkoff = it->offset() + offset;
|
||||||
|
|
||||||
|
if (chunksize == 0) {
|
||||||
|
LOG_ERROR << "invalid zero-sized chunk";
|
||||||
|
return -EIO;
|
||||||
|
}
|
||||||
|
|
||||||
if (num_read + chunksize > size) {
|
if (num_read + chunksize > size) {
|
||||||
chunksize = size - num_read;
|
chunksize = size - num_read;
|
||||||
}
|
}
|
||||||
@ -137,15 +143,22 @@ inode_reader_<LoggerPolicy>::read(size_t size, off_t offset, chunk_range chunks,
|
|||||||
offset = 0;
|
offset = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// now fill the buffer
|
try {
|
||||||
size_t num_read = 0;
|
// now fill the buffer
|
||||||
for (auto& r : ranges) {
|
size_t num_read = 0;
|
||||||
auto br = r.get();
|
for (auto& r : ranges) {
|
||||||
store(num_read, br);
|
auto br = r.get();
|
||||||
num_read += br.size();
|
store(num_read, br);
|
||||||
|
num_read += br.size();
|
||||||
|
}
|
||||||
|
return num_read;
|
||||||
|
} catch (runtime_error const& e) {
|
||||||
|
LOG_ERROR << e.what();
|
||||||
|
} catch (...) {
|
||||||
|
LOG_ERROR << folly::exceptionStr(std::current_exception());
|
||||||
}
|
}
|
||||||
|
|
||||||
return num_read;
|
return -EIO;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename LoggerPolicy>
|
template <typename LoggerPolicy>
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
#include <climits>
|
#include <climits>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <ostream>
|
#include <ostream>
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
@ -304,6 +305,15 @@ class metadata_ : public metadata_v2::impl {
|
|||||||
auto inode = entry.inode() - chunk_index_offset_;
|
auto inode = entry.inode() - chunk_index_offset_;
|
||||||
uint32_t cur = meta_.chunk_index()[inode];
|
uint32_t cur = meta_.chunk_index()[inode];
|
||||||
uint32_t end = meta_.chunk_index()[inode + 1];
|
uint32_t end = meta_.chunk_index()[inode + 1];
|
||||||
|
if (cur > end) {
|
||||||
|
DWARFS_THROW(runtime_error,
|
||||||
|
fmt::format("invalid chunk range: [{0}..{1}]", cur, end));
|
||||||
|
}
|
||||||
|
if (end > meta_.chunks().size()) {
|
||||||
|
DWARFS_THROW(runtime_error,
|
||||||
|
fmt::format("chunk index out of range: {0} > {1}", end,
|
||||||
|
meta_.chunks().size()));
|
||||||
|
}
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
while (cur < end) {
|
while (cur < end) {
|
||||||
size += meta_.chunks()[cur++].size();
|
size += meta_.chunks()[cur++].size();
|
||||||
@ -321,8 +331,8 @@ class metadata_ : public metadata_v2::impl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void walk(entry_view entry, std::unordered_set<int>& seen,
|
||||||
walk(entry_view entry, std::function<void(entry_view)> const& func) const;
|
std::function<void(entry_view)> const& func) const;
|
||||||
|
|
||||||
std::optional<entry_view> get_entry(int inode) const {
|
std::optional<entry_view> get_entry(int inode) const {
|
||||||
inode -= inode_offset_;
|
inode -= inode_offset_;
|
||||||
@ -563,20 +573,27 @@ std::string metadata_<LoggerPolicy>::modestring(uint16_t mode) const {
|
|||||||
|
|
||||||
template <typename LoggerPolicy>
|
template <typename LoggerPolicy>
|
||||||
void metadata_<LoggerPolicy>::walk(
|
void metadata_<LoggerPolicy>::walk(
|
||||||
entry_view entry, std::function<void(entry_view)> const& func) const {
|
entry_view entry, std::unordered_set<int>& seen,
|
||||||
|
std::function<void(entry_view)> const& func) const {
|
||||||
func(entry);
|
func(entry);
|
||||||
if (S_ISDIR(entry.mode())) {
|
if (S_ISDIR(entry.mode())) {
|
||||||
|
auto inode = entry.inode();
|
||||||
|
if (!seen.emplace(inode).second) {
|
||||||
|
DWARFS_THROW(runtime_error, "cycle detected during directory walk");
|
||||||
|
}
|
||||||
auto dir = make_directory_view(entry);
|
auto dir = make_directory_view(entry);
|
||||||
for (auto cur : dir.entry_range()) {
|
for (auto cur : dir.entry_range()) {
|
||||||
walk(make_entry_view(cur), func);
|
walk(make_entry_view(cur), seen, func);
|
||||||
}
|
}
|
||||||
|
seen.erase(inode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename LoggerPolicy>
|
template <typename LoggerPolicy>
|
||||||
void metadata_<LoggerPolicy>::walk(
|
void metadata_<LoggerPolicy>::walk(
|
||||||
std::function<void(entry_view)> const& func) const {
|
std::function<void(entry_view)> const& func) const {
|
||||||
walk(root_, func);
|
std::unordered_set<int> seen;
|
||||||
|
walk(root_, seen, func);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename LoggerPolicy>
|
template <typename LoggerPolicy>
|
||||||
|
@ -19,6 +19,8 @@
|
|||||||
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
|
* along with dwarfs. If not, see <https://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <exception>
|
||||||
|
|
||||||
#include <boost/program_options.hpp>
|
#include <boost/program_options.hpp>
|
||||||
|
|
||||||
#include <folly/Conv.h>
|
#include <folly/Conv.h>
|
||||||
@ -74,8 +76,13 @@ int dwarfsbench(int argc, char** argv) {
|
|||||||
|
|
||||||
po::variables_map vm;
|
po::variables_map vm;
|
||||||
|
|
||||||
po::store(po::parse_command_line(argc, argv, opts), vm);
|
try {
|
||||||
po::notify(vm);
|
po::store(po::parse_command_line(argc, argv, opts), vm);
|
||||||
|
po::notify(vm);
|
||||||
|
} catch (po::error const& e) {
|
||||||
|
std::cerr << "error: " << e.what() << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
if (vm.count("help") or !vm.count("filesystem")) {
|
if (vm.count("help") or !vm.count("filesystem")) {
|
||||||
std::cout << "dwarfsbench (" << DWARFS_VERSION << ")\n\n"
|
std::cout << "dwarfsbench (" << DWARFS_VERSION << ")\n\n"
|
||||||
@ -83,43 +90,54 @@ int dwarfsbench(int argc, char** argv) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
stream_logger lgr(std::cerr, logger::parse_level(log_level));
|
try {
|
||||||
filesystem_options fsopts;
|
stream_logger lgr(std::cerr, logger::parse_level(log_level));
|
||||||
|
filesystem_options fsopts;
|
||||||
|
|
||||||
fsopts.lock_mode = parse_mlock_mode(lock_mode_str);
|
fsopts.lock_mode = parse_mlock_mode(lock_mode_str);
|
||||||
fsopts.block_cache.max_bytes = parse_size_with_unit(cache_size_str);
|
fsopts.block_cache.max_bytes = parse_size_with_unit(cache_size_str);
|
||||||
fsopts.block_cache.num_workers = num_workers;
|
fsopts.block_cache.num_workers = num_workers;
|
||||||
fsopts.block_cache.decompress_ratio = folly::to<double>(decompress_ratio_str);
|
fsopts.block_cache.decompress_ratio =
|
||||||
|
folly::to<double>(decompress_ratio_str);
|
||||||
|
|
||||||
dwarfs::filesystem_v2 fs(lgr, std::make_shared<dwarfs::mmap>(filesystem),
|
dwarfs::filesystem_v2 fs(lgr, std::make_shared<dwarfs::mmap>(filesystem),
|
||||||
fsopts);
|
fsopts);
|
||||||
|
|
||||||
worker_group wg("reader", num_readers);
|
worker_group wg("reader", num_readers);
|
||||||
|
|
||||||
fs.walk([&](auto entry) {
|
fs.walk([&](auto entry) {
|
||||||
if (S_ISREG(entry.mode())) {
|
if (S_ISREG(entry.mode())) {
|
||||||
wg.add_job([&fs, entry] {
|
wg.add_job([&fs, entry] {
|
||||||
struct ::stat stbuf;
|
try {
|
||||||
if (fs.getattr(entry, &stbuf) == 0) {
|
struct ::stat stbuf;
|
||||||
std::vector<char> buf(stbuf.st_size);
|
if (fs.getattr(entry, &stbuf) == 0) {
|
||||||
int fh = fs.open(entry);
|
std::vector<char> buf(stbuf.st_size);
|
||||||
fs.read(fh, buf.data(), buf.size());
|
int fh = fs.open(entry);
|
||||||
}
|
fs.read(fh, buf.data(), buf.size());
|
||||||
});
|
}
|
||||||
}
|
} catch (runtime_error const& e) {
|
||||||
});
|
std::cerr << "error: " << e.what() << std::endl;
|
||||||
|
} catch (...) {
|
||||||
|
std::cerr << "error: "
|
||||||
|
<< folly::exceptionStr(std::current_exception())
|
||||||
|
<< std::endl;
|
||||||
|
dump_exceptions();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
wg.wait();
|
wg.wait();
|
||||||
|
} catch (runtime_error const& e) {
|
||||||
|
std::cerr << "error: " << e.what() << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
try {
|
return dwarfs::safe_main([&] { return dwarfsbench(argc, argv); });
|
||||||
return dwarfsbench(argc, argv);
|
|
||||||
} catch (std::exception const& e) {
|
|
||||||
std::cerr << "ERROR: " << folly::exceptionStr(e) << std::endl;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user