Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 72 additions & 36 deletions src/fdb5/toc/TocHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -720,12 +720,43 @@ class SubtocPreloader {
}
AutoFDCloser& operator=(const AutoFDCloser&) = delete;
~AutoFDCloser() {
if (fd_ > 0) {
if (fd_ >= 0) {
::close(fd_); // n.b. ignore return value
}
}
};

/// Scope guard that cancels and reaps every aio request still referenced by a
/// vector of control-block pointers when it goes out of scope, so that no
/// request is left outstanding during stack unwinding.
///
/// The guard only holds a reference to the vector; it inspects the entries at
/// destruction time. Null entries are skipped, so an entry that has already
/// been reaped elsewhere must be set to nullptr before the guard is destroyed.
///
/// Copying is disabled: a copy would refer to the same vector and would
/// cancel/reap the same requests a second time.
struct AioCanceller {
    std::vector<aiocb*>& ptrs_;

    explicit AioCanceller(std::vector<aiocb*>& ptrs) : ptrs_(ptrs) {}

    AioCanceller(const AioCanceller&)            = delete;
    AioCanceller& operator=(const AioCanceller&) = delete;

    ~AioCanceller() {
        // Pass 1: request cancellation of everything up front, so the waits
        // in pass 2 are as short as possible.
        for (auto* p : ptrs_) {
            if (p) {
                ::aio_cancel(p->aio_fildes, p);  // n.b. ignore return value
            }
        }

        // Pass 2: wait for each request to reach a final state and reap it.
        for (auto* p : ptrs_) {
            if (!p) {
                continue;
            }

            int err = ::aio_error(p);
            // Not a valid outstanding request. Depending on the platform this is
            // reported either as a returned EINVAL or as -1 with errno set.
            if (err == EINVAL || err == -1) {
                continue;
            }

            // Cancellation may not have been possible; block until the request
            // has actually finished. An interrupted wait simply re-checks.
            while (err == EINPROGRESS) {
                ::aio_suspend(&p, 1, nullptr);
                err = ::aio_error(p);
            }

            // err is now 0 (completed), ECANCELED, or another errno.
            // Must call aio_return exactly once to reap.
            ::aio_return(p);
        }
    }
};
const Key& parentKey_;

mutable std::map<eckit::LocalPathName, std::unique_ptr<TocHandler>> subTocReadCache_;
Expand All @@ -748,20 +779,17 @@ class SubtocPreloader {
#endif

std::vector<aiocb> aiocbs(paths_.size());
std::vector<Buffer> buffers(paths_.size());
std::vector<aiocb*> aiocbPtrs(aiocbs.size(), nullptr);
std::vector<std::vector<char>> buffers(paths_.size());
std::vector<AutoFDCloser> closers;
std::vector<char> done(paths_.size());
::memset(done.data(), 0, done.size() * sizeof(char));
::memset(aiocbs.data(), 0, sizeof(aiocb) * aiocbs.size());
AioCanceller ac(aiocbPtrs);

{
eckit::Timer sstime("subtocs.statsubmit", Log::debug<LibFdb5>());
for (int i = 0; i < aiocbs.size(); ++i) {
for (size_t i = 0; i < aiocbs.size(); ++i) {

const eckit::LocalPathName& path = paths_[i];

int fd;
SYSCALL2((fd = ::open(path.localPath(), iomode)), path);
const int fd = SYSCALL2(::open(path.localPath(), iomode), path);
closers.emplace_back(AutoFDCloser{fd});
eckit::Length tocSize = path.size();

Expand All @@ -777,64 +805,72 @@ class SubtocPreloader {
aio.aio_buf = buffers[i].data();

SYSCALL(::aio_read(&aio));
aiocbPtrs[i] = &aio;
}
}

std::vector<aiocb*> aiocbPtrs(aiocbs.size());
for (int i = 0; i < aiocbs.size(); ++i) {
aiocbPtrs[i] = &aiocbs[i];
}

int doneCount = 0;

{
eckit::Timer sstime("subtocs.collect", Log::debug<LibFdb5>());

while (doneCount < aiocbs.size()) {
while (std::any_of(std::begin(aiocbPtrs), std::end(aiocbPtrs),
[](const auto ptr) { return ptr != nullptr; })) {

// Now wait until data is ready from at least one read

struct timespec timeout{};
timeout.tv_sec = 1;
errno = 0;
while (::aio_suspend(aiocbPtrs.data(), aiocbs.size(), nullptr) < 0) {
if (errno != EINTR) {
throw FailedSystemCall("aio_suspend", Here(), errno);
while (::aio_suspend(aiocbPtrs.data(), aiocbPtrs.size(), &timeout) < 0) {
switch (errno) {
Comment thread
Ozaq marked this conversation as resolved.
case EINTR:
// got interrupted, continue sleeping
continue;
case EAGAIN:
// timeout reached

LOG_DEBUG_LIB(LibFdb5) << "Reading subtocs - timeout(1s) reached for:\n";
for (size_t index = 0; index < aiocbPtrs.size(); ++index) {
if (aiocbPtrs[index] == nullptr) {
continue;
}
LOG_DEBUG_LIB(LibFdb5) << paths_[index].localPath() << "\n";
}
LOG_DEBUG_LIB(LibFdb5) << "retrying." << std::endl;
continue;
default:
throw FailedSystemCall("aio_suspend", Here(), errno);
}
}

// Find which one(s) are ready

for (int n = 0; n < aiocbs.size(); ++n) {

if (done[n]) {
for (size_t n = 0; n < aiocbPtrs.size(); ++n) {
if (aiocbPtrs[n] == nullptr) {
// already completed, skip
continue;
}

int e = ::aio_error(&aiocbs[n]);
const int e = ::aio_error(aiocbPtrs[n]);
if (e == EINPROGRESS) {
continue;
}

if (e == 0) {

ssize_t len = ::aio_return(&aiocbs[n]);
auto aioptr = aiocbPtrs[n];
aiocbPtrs[n] = nullptr;
const auto len = SYSCALL(::aio_return(aioptr));
if (len != buffers[n].size()) {
aiocbs[n].aio_nbytes = len;
// File has been truncated since stat has been called
throw eckit::ShortFile(paths_[n], Here());
}

bool grow = true;
auto cachedToc = std::make_unique<eckit::MemoryHandle>(buffers[n].size(), grow);

{
cachedToc->openForWrite(aiocbs[n].aio_nbytes);
cachedToc->openForWrite(buffers[n].size());
AutoClose closer(*cachedToc);
ASSERT(cachedToc->write(buffers[n].data(), aiocbs[n].aio_nbytes) == aiocbs[n].aio_nbytes);
ASSERT(cachedToc->write(buffers[n].data(), buffers[n].size()) == buffers[n].size());
}
ASSERT(subTocReadCache_.find(paths_[n]) == subTocReadCache_.end());
subTocReadCache_.emplace(
paths_[n], std::make_unique<TocHandler>(paths_[n], parentKey_, cachedToc.release()));

done[n] = true;
doneCount++;
}
else {
throw FailedSystemCall("aio_error", Here(), e);
Expand Down
Loading