diff --git a/env/env_test.cc b/env/env_test.cc
index c341230b0ad248e2e1354f9d10195d1bd563cf26..e8fdd31bc4f44ffe9b0f8ba393dd6014097b0f38 100644
--- a/env/env_test.cc
+++ b/env/env_test.cc
@@ -1256,7 +1256,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
     // Random Read
     Random rnd(301 + attempt);
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-        "PosixRandomAccessFile::MultiRead:io_uring_result", [&](void* arg) {
+        "UpdateResults:io_uring_result", [&](void* arg) {
           if (attempt > 0) {
             // No failure in the first attempt.
             size_t& bytes_read = *static_cast<size_t*>(arg);
@@ -1326,7 +1326,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
     const int num_reads = rnd.Uniform(512) + 1;
 
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-        "PosixRandomAccessFile::MultiRead:io_uring_result", [&](void* arg) {
+        "UpdateResults:io_uring_result", [&](void* arg) {
           if (attempt > 5) {
             // Improve partial result rates in second half of the run to
             // cover the case of repeated partial results.
@@ -3203,10 +3203,11 @@ IOStatus ReadAsyncRandomAccessFile::ReadAsync(
 
   // Submit read request asynchronously.
   std::function<void(FSReadRequest)> submit_request =
-      [&opts, cb, cb_arg, io_handle, del_fn, dbg, create_io_error,
-       this](FSReadRequest _req) {
+      [&opts, cb, cb_arg, dbg, create_io_error, this](FSReadRequest _req) {
         if (!create_io_error) {
-          target()->ReadAsync(_req, opts, cb, cb_arg, io_handle, del_fn, dbg);
+          _req.status = target()->Read(_req.offset, _req.len, opts,
+                                       &(_req.result), _req.scratch, dbg);
+          cb(_req, cb_arg);
         }
       };
 
diff --git a/env/fs_posix.cc b/env/fs_posix.cc
index db5319615a1ac4759c44f3bebf57afb312e0138a..23ef89784a669ec28e2d936c5bb19e20d7a25fde 100644
--- a/env/fs_posix.cc
+++ b/env/fs_posix.cc
@@ -1043,6 +1043,80 @@ class PosixFileSystem : public FileSystem {
   }
 #endif  // ROCKSDB_IOURING_PRESENT
 
+  // EXPERIMENTAL
+  //
+  // Waits until every request in io_handles (Posix_IOHandle objects created
+  // by PosixRandomAccessFile::ReadAsync) has completed. The callback of each
+  // completion reaped along the way is invoked before Poll returns.
+  //
+  // TODO akankshamahajan: Update Poll API to take into account min_completions
+  // and return once the number of completed handles in io_handles (in any
+  // order) is at least min_completions.
+  virtual IOStatus Poll(std::vector<void*>& io_handles,
+                        size_t /*min_completions*/) override {
+#if defined(ROCKSDB_IOURING_PRESENT)
+    // io_uring_queue_init.
+    struct io_uring* iu = nullptr;
+    if (thread_local_io_urings_) {
+      iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+    }
+
+    // Init failed, platform doesn't support io_uring.
+    if (iu == nullptr) {
+      return IOStatus::NotSupported("Poll");
+    }
+
+    for (size_t i = 0; i < io_handles.size(); i++) {
+      // The request has been completed in earlier runs.
+      if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
+        continue;
+      }
+      // Loop until IO for io_handles[i] is completed.
+      while (true) {
+        // io_uring_wait_cqe.
+        struct io_uring_cqe* cqe = nullptr;
+        ssize_t ret = io_uring_wait_cqe(iu, &cqe);
+        if (ret) {
+          // abort as it shouldn't be in indeterminate state and there is no
+          // good way currently to handle this error.
+          abort();
+        }
+
+        // Step 3: Populate the request.
+        assert(cqe != nullptr);
+        Posix_IOHandle* posix_handle =
+            static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
+        assert(posix_handle->iu == iu);
+        if (posix_handle->iu != iu) {
+          return IOStatus::IOError("Poll: cqe belongs to another io_uring");
+        }
+        // Reset cqe data to catch any stray reuse of it
+        static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
+
+        FSReadRequest req;
+        req.scratch = posix_handle->scratch;
+        req.offset = posix_handle->offset;
+        req.len = posix_handle->len;
+        size_t finished_len = 0;
+        UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
+                     true /*async_read*/, finished_len, &req);
+        posix_handle->is_finished = true;
+        io_uring_cqe_seen(iu, cqe);
+        posix_handle->cb(req, posix_handle->cb_arg);
+        (void)finished_len;
+
+        if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
+          break;
+        }
+      }
+    }
+    return IOStatus::OK();
+#else
+    (void)io_handles;
+    return IOStatus::NotSupported("Poll");
+#endif
+  }
+
 #if defined(ROCKSDB_IOURING_PRESENT)
   // io_uring instance
   std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;
diff --git a/env/io_posix.cc b/env/io_posix.cc
index 56494d5651de82adeb4395c1df7d2b7c4fd2b5f5..be85a7c6d793d924a5e3116b654e8fe8a75aee7a 100644
--- a/env/io_posix.cc
+++ b/env/io_posix.cc
@@ -744,47 +744,35 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
       wrap_cache.erase(wrap_check);
 
       FSReadRequest* req = req_wrap->req;
-      if (cqe->res < 0) {
-        req->result = Slice(req->scratch, 0);
-        req->status = IOError("Req failed", filename_, cqe->res);
-      } else {
-        size_t bytes_read = static_cast<size_t>(cqe->res);
-        TEST_SYNC_POINT_CALLBACK(
-            "PosixRandomAccessFile::MultiRead:io_uring_result", &bytes_read);
-        if (bytes_read == req_wrap->iov.iov_len) {
-          req->result = Slice(req->scratch, req->len);
+      // Branch on the byte count UpdateResult acted on (tests may override it
+      // through the sync point) rather than on the raw cqe->res.
+      size_t bytes_read = 0;
+      UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
+                   false /*async_read*/, req_wrap->finished_len, req,
+                   &bytes_read);
+      int32_t res = cqe->res;
+      if (res >= 0 && bytes_read == 0) {
+        // Zero bytes read can mean EOF, or can mean partial results. See
+        // comment
+        // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
+        // Fall back to pread in this case.
+        if (use_direct_io() && !IsSectorAligned(req_wrap->finished_len,
+                                                GetRequiredBufferAlignment())) {
+          // Bytes reads don't fill sectors. Should only happen at the end
+          // of the file.
+          req->result = Slice(req->scratch, req_wrap->finished_len);
           req->status = IOStatus::OK();
-        } else if (bytes_read == 0) {
-          // cqe->res == 0 can means EOF, or can mean partial results. See
-          // comment
-          // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
-          // Fall back to pread in this case.
-          if (use_direct_io() &&
-              !IsSectorAligned(req_wrap->finished_len,
-                               GetRequiredBufferAlignment())) {
-            // Bytes reads don't fill sectors. Should only happen at the end
-            // of the file.
-            req->result = Slice(req->scratch, req_wrap->finished_len);
-            req->status = IOStatus::OK();
-          } else {
-            Slice tmp_slice;
-            req->status =
-                Read(req->offset + req_wrap->finished_len,
-                     req->len - req_wrap->finished_len, options, &tmp_slice,
-                     req->scratch + req_wrap->finished_len, dbg);
-            req->result =
-                Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
-          }
-        } else if (bytes_read < req_wrap->iov.iov_len) {
-          assert(bytes_read > 0);
-          assert(bytes_read + req_wrap->finished_len < req->len);
-          req_wrap->finished_len += bytes_read;
-          incomplete_rq_list.push_back(req_wrap);
         } else {
-          req->result = Slice(req->scratch, 0);
-          req->status = IOError("Req returned more bytes than requested",
-                                filename_, cqe->res);
+          Slice tmp_slice;
+          req->status =
+              Read(req->offset + req_wrap->finished_len,
+                   req->len - req_wrap->finished_len, options, &tmp_slice,
+                   req->scratch + req_wrap->finished_len, dbg);
+          req->result =
+              Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
         }
+      } else if (res > 0 && bytes_read < req_wrap->iov.iov_len) {
+        incomplete_rq_list.push_back(req_wrap);
       }
       io_uring_cqe_seen(iu, cqe);
     }
@@ -872,6 +860,87 @@ IOStatus PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
 #endif
 }
 
+// Submits `req` to io_uring as a single readv and returns without waiting for
+// the result. *io_handle receives a heap-allocated Posix_IOHandle describing
+// the request and *del_fn the function that frees it. `cb` is stored in the
+// handle and is invoked with `cb_arg` by PosixFileSystem::Poll once the
+// completion is reaped. Returns NotSupported when io_uring is unavailable.
+IOStatus PosixRandomAccessFile::ReadAsync(
+    FSReadRequest& req, const IOOptions& /*opts*/,
+    std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
+    void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
+  if (use_direct_io()) {
+    assert(IsSectorAligned(req.offset, GetRequiredBufferAlignment()));
+    assert(IsSectorAligned(req.len, GetRequiredBufferAlignment()));
+    assert(IsSectorAligned(req.scratch, GetRequiredBufferAlignment()));
+  }
+
+#if defined(ROCKSDB_IOURING_PRESENT)
+  // io_uring_queue_init.
+  struct io_uring* iu = nullptr;
+  if (thread_local_io_urings_) {
+    iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+    if (iu == nullptr) {
+      iu = CreateIOUring();
+      if (iu != nullptr) {
+        thread_local_io_urings_->Reset(iu);
+      }
+    }
+  }
+
+  // Init failed, platform doesn't support io_uring.
+  if (iu == nullptr) {
+    return IOStatus::NotSupported("ReadAsync");
+  }
+
+  // Allocate io_handle.
+  IOHandleDeleter deletefn = [](void* args) -> void {
+    delete (static_cast<Posix_IOHandle*>(args));
+    args = nullptr;
+  };
+
+  Posix_IOHandle* posix_handle = new Posix_IOHandle();
+  *io_handle = static_cast<void*>(posix_handle);
+  *del_fn = deletefn;
+
+  // Initialize Posix_IOHandle. The request fields must be stored before the
+  // iovec is derived from them; otherwise the iovec is built from fields
+  // that have not been set yet.
+  posix_handle->iu = iu;
+  posix_handle->cb = cb;
+  posix_handle->cb_arg = cb_arg;
+  posix_handle->offset = req.offset;
+  posix_handle->len = req.len;
+  posix_handle->scratch = req.scratch;
+  posix_handle->iov.iov_base = posix_handle->scratch;
+  posix_handle->iov.iov_len = posix_handle->len;
+
+  // Step 3: io_uring_sqe_set_data
+  struct io_uring_sqe* sqe;
+  sqe = io_uring_get_sqe(iu);
+
+  io_uring_prep_readv(sqe, fd_, &posix_handle->iov, 1, posix_handle->offset);
+
+  io_uring_sqe_set_data(sqe, posix_handle);
+
+  // Step 4: io_uring_submit
+  ssize_t ret = io_uring_submit(iu);
+  if (ret < 0) {
+    fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
+    return IOStatus::IOError("io_uring_submit() requested but returned " +
+                             ToString(ret));
+  }
+  return IOStatus::OK();
+#else
+  (void)req;
+  (void)cb;
+  (void)cb_arg;
+  (void)io_handle;
+  (void)del_fn;
+  return IOStatus::NotSupported("ReadAsync");
+#endif
+}
+
 /*
  * PosixMmapReadableFile
  *
diff --git a/env/io_posix.h b/env/io_posix.h
index 94f579c500f78191b57b57807969a56f833226c7..606e932bb57661014e3c6d97f8d88af0b49e6ebb 100644
--- a/env/io_posix.h
+++ b/env/io_posix.h
@@ -13,14 +13,17 @@
 #include <sys/uio.h>
 #endif
 #include <unistd.h>
+
 #include <atomic>
 #include <functional>
 #include <map>
 #include <string>
+
 #include "port/port.h"
 #include "rocksdb/env.h"
 #include "rocksdb/file_system.h"
 #include "rocksdb/io_status.h"
+#include "test_util/sync_point.h"
 #include "util/mutexlock.h"
 #include "util/thread_local.h"
 
@@ -49,6 +52,72 @@ class PosixHelper {
                                                size_t* size);
 };
 
+#if defined(ROCKSDB_IOURING_PRESENT)
+// Per-request bookkeeping for an asynchronous read submitted through io_uring.
+// Allocated by PosixRandomAccessFile::ReadAsync and handed to the caller as
+// the opaque io_handle; PosixFileSystem::Poll reads it back from the cqe.
+struct Posix_IOHandle {
+  struct iovec iov;
+  struct io_uring* iu;
+  std::function<void(const FSReadRequest&, void*)> cb;
+  void* cb_arg;
+  uint64_t offset;
+  size_t len;
+  char* scratch;
+  bool is_finished = false;
+};
+
+// Converts the io_uring completion `cqe` into req->status / req->result.
+// `iov_len` is the number of bytes that was requested for this submission.
+//  - cqe->res < 0: IOError with an empty result.
+//  - all iov_len bytes read: OK, result covers req->len bytes.
+//  - fewer bytes read: an async read reports the partial (possibly empty)
+//    result as OK; otherwise `finished_len` is advanced by the bytes read and
+//    `req` is left for the caller to complete.
+//  - more bytes than requested: IOError with an empty result.
+// If bytes_read_out is non-null and cqe->res >= 0, it receives the byte count
+// the branches above used (i.e. after the test sync point has run).
+inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
+                         size_t len, size_t iov_len, bool async_read,
+                         size_t& finished_len, FSReadRequest* req,
+                         size_t* bytes_read_out = nullptr) {
+  (void)len;  // only read by assert(); avoids an unused-parameter warning
+  if (cqe->res < 0) {
+    req->result = Slice(req->scratch, 0);
+    req->status = IOError("Req failed", file_name, cqe->res);
+  } else {
+    size_t bytes_read = static_cast<size_t>(cqe->res);
+    TEST_SYNC_POINT_CALLBACK("UpdateResults:io_uring_result", &bytes_read);
+    if (bytes_read_out != nullptr) {
+      *bytes_read_out = bytes_read;
+    }
+    if (bytes_read == iov_len) {
+      req->result = Slice(req->scratch, req->len);
+      req->status = IOStatus::OK();
+    } else if (bytes_read == 0) {
+      if (async_read) {
+        // No bytes read. It can mean EOF.
+        req->result = Slice(req->scratch, 0);
+        req->status = IOStatus::OK();
+      }
+    } else if (bytes_read < iov_len) {
+      assert(bytes_read > 0);
+      if (async_read) {
+        req->result = Slice(req->scratch, bytes_read);
+        req->status = IOStatus::OK();
+      } else {
+        assert(bytes_read + finished_len < len);
+        finished_len += bytes_read;
+      }
+    } else {
+      req->result = Slice(req->scratch, 0);
+      req->status = IOError("Req returned more bytes than requested", file_name,
+                            cqe->res);
+    }
+  }
+}
+#endif
+
 #ifdef OS_LINUX
 // Files under a specific directory have the same logical block size.
 // This class caches the logical block size for the specified directories to
@@ -210,6 +279,11 @@ class PosixRandomAccessFile : public FSRandomAccessFile {
   virtual size_t GetRequiredBufferAlignment() const override {
     return logical_sector_size_;
   }
+  // EXPERIMENTAL
+  virtual IOStatus ReadAsync(
+      FSReadRequest& req, const IOOptions& opts,
+      std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
+      void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) override;
 };
 
 class PosixWritableFile : public FSWritableFile {
diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h
index 201cee95a0f6d41aee0df28e3632b7bbcc0683a1..1dec4cf85b6ffdffb2e552d4b0f25b0c5cc3e27b 100644
--- a/include/rocksdb/file_system.h
+++ b/include/rocksdb/file_system.h
@@ -651,7 +651,8 @@ class FileSystem : public Customizable {
   // Underlying FS is required to support Poll API. Poll implementation should
   // ensure that the callback gets called at IO completion, and return only
   // after the callback has been called.
-  //
+  // If Poll returns partial results for any read, it is the caller's
+  // responsibility to call Read or ReadAsync to get the remaining bytes.
   //
   // Default implementation is to return IOStatus::OK.