diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..92cc6cce --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.vscode/ +bazel-bin +bazel-out +bazel-riegeli +bazel-testlogs +.bazelversion +configure.bazelrc +io_uring_test \ No newline at end of file diff --git a/WORKSPACE b/WORKSPACE index 3f8f0e98..45ab1118 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -14,6 +14,13 @@ http_archive( ], ) +http_archive( + name = "com_google_googletest", + sha256 = "5cf189eb6847b4f8fc603a3ffff3b0771c08eec7dd4bd961bfd45477dd13eb73", + strip_prefix = "googletest-609281088cfefc76f9d0ce82e1ff6c30cc3591e5", + urls = ["https://github.com/google/googletest/archive/609281088cfefc76f9d0ce82e1ff6c30cc3591e5.zip"], +) + http_archive( name = "org_brotli", sha256 = "6e69be238ff61cef589a3fa88da11b649c7ff7a5932cb12d1e6251c8c2e17a2f", @@ -68,6 +75,16 @@ http_archive( ], ) +http_archive( + name = "liburing", + build_file = "//third_party:liburing.BUILD", + sha256 = "ca069ecc4aa1baf1031bd772e4e97f7e26dfb6bb733d79f70159589b22ab4dc0", + strip_prefix = "liburing-liburing-2.0", + urls = [ + "https://github.com/axboe/liburing/archive/refs/tags/liburing-2.0.tar.gz", + ], +) + http_archive( name = "highwayhash", build_file = "//third_party:highwayhash.BUILD", diff --git a/io_uring_test/BUILD b/io_uring_test/BUILD new file mode 100644 index 00000000..d5dcf9a0 --- /dev/null +++ b/io_uring_test/BUILD @@ -0,0 +1,21 @@ +cc_test( + name = "io_uring_write_test", + srcs = ["io_uring_write_test.cc"], + deps = [ + "//riegeli/bytes:fd_io_uring_writer", + "//riegeli/records:record_writer", + "//riegeli/bytes:fd_reader", + "//riegeli/records:record_reader", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "io_uring_test", + srcs = ["io_uring_test.cc"], + deps = [ + "//riegeli/iouring:fd_async_io_uring", + "//riegeli/iouring:fd_sync_io_uring", + "@com_google_googletest//:gtest_main", + ], +) \ No newline at end of file diff --git a/io_uring_test/io_uring_test.cc b/io_uring_test/io_uring_test.cc new file mode 100644 index 00000000..3c70f158 --- /dev/null +++ b/io_uring_test/io_uring_test.cc @@ -0,0 +1,177 @@ +#include "riegeli/iouring/fd_async_io_uring.h" +#include "riegeli/iouring/fd_sync_io_uring.h" + +#include +#include +#include + +#include "gtest/gtest.h" + +namespace iouringtest { +using IoUringPtr = std::unique_ptr; + +TEST(IoUringTest, CreateAsynIoUring) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(true); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> Mode(), riegeli::FdIoUring::IoUringMode::ASYNCIOURING); +} + +TEST(IoUringTest, CreateSynIoUring) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(false); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> Mode(), riegeli::FdIoUring::IoUringMode::SYNCIOURING); +} + +TEST(IoUringTest, SyncUnRegisterFd) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(false); + options.set_fd_register(false); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> fd_register(), false); + EXPECT_EQ(IoUring -> fd(), -1); +} + +TEST(IoUringTest, AsyncUnRegisterFd) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(true); + options.set_fd_register(false); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> fd_register(), false); + EXPECT_EQ(IoUring -> fd(), -1); +} + +TEST(IoUringTest, SyncRegisterAndUnregisterFd) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(false); + options.set_fd_register(true); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> fd_register(), true); + EXPECT_EQ(IoUring -> fd(), 0); + + IoUring -> UnRegisterFd(); + EXPECT_EQ(IoUring -> fd_register(), false); + EXPECT_EQ(IoUring -> fd(), -1); + +} + +TEST(IoUringTest, AsyncUnRegisterAndUnregisterFd) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(true); + options.set_fd_register(true); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> fd_register(), true); + EXPECT_EQ(IoUring -> fd(), 0); + + IoUring -> UnRegisterFd(); + EXPECT_EQ(IoUring -> fd_register(), false); + EXPECT_EQ(IoUring -> fd(), -1); +} + +TEST(IoUringTest, SyncRegisterAndUpdateFd) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(false); + options.set_fd_register(true); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> fd_register(), true); + EXPECT_EQ(IoUring -> fd(), 0); + + std::string path = std::string(getenv("TEST_TMPDIR")) + "/io_uring_test_file"; + int fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC); + IoUring -> RegisterFd(fd); + EXPECT_EQ(IoUring -> fd_register(), true); + EXPECT_EQ(IoUring -> fd(), fd); + +} + +TEST(IoUringTest, AsyncUnRegisterAndUpdateFd) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(true); + options.set_fd_register(true); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> fd_register(), true); + EXPECT_EQ(IoUring -> fd(), 0); + + std::string path = std::string(getenv("TEST_TMPDIR")) + "/io_uring_test_file"; + int fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC); + IoUring -> RegisterFd(fd); + EXPECT_EQ(IoUring -> fd_register(), true); + EXPECT_EQ(IoUring -> fd(), fd); +} + +TEST(IoUringTest, SyncDefaultSize) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(false); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> size(), 512); +} + +TEST(IoUringTest, AsyncDefaultSize) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(true); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> size(), 512); +} + +TEST(IoUringTest, SyncSize) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(false); + options.set_size(10); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> size(), 16); +} + +TEST(IoUringTest, AsyncSize) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(true); + options.set_size(10); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> size(), 16); +} + +TEST(IoUringTest, SyncMaxSize) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(false); + options.set_size(4098); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> size(), 4096); +} + +TEST(IoUringTest, AsyncMaxSize) { + IoUringPtr IoUring; + riegeli::FdIoUringOptions options; + options.set_async(true); + options.set_size(4098); + IoUring = std::make_unique(options, 0); + + EXPECT_EQ(IoUring -> size(), 4096); +} + +} \ No newline at end of file diff --git a/io_uring_test/io_uring_write_test.cc b/io_uring_test/io_uring_write_test.cc new file mode 100644 index 00000000..5896a659 --- /dev/null +++ b/io_uring_test/io_uring_write_test.cc @@ -0,0 +1,217 @@ +#include + +#include "riegeli/bytes/fd_io_uring_writer.h" +#include "riegeli/records/record_writer.h" +#include "riegeli/bytes/fd_reader.h" +#include "riegeli/records/record_reader.h" + +#include "gtest/gtest.h" + +namespace iouringtest { +using WritePtr = std::unique_ptr>>; +using ReadPtr = std::unique_ptr>>; + +void Sync(WritePtr& writer, const std::string &file) { + riegeli::FdIoUringOptions fd_io_uring_options; + fd_io_uring_options.set_async(false); + fd_io_uring_options.set_fd_register(false); + + riegeli::FdIoUringWriterBase::Options fd_w_options; + fd_w_options.set_io_uring_option(fd_io_uring_options); + + riegeli::RecordWriterBase::Options w_options; + + riegeli::FdIoUringWriter<> fd_writer(file, O_WRONLY | O_CREAT | O_TRUNC, + fd_w_options); + + writer = std::make_unique>>( + std::move(fd_writer), std::move(w_options)); +} + +void SyncFd(WritePtr& writer, const std::string &file) { + riegeli::FdIoUringOptions fd_io_uring_options; + fd_io_uring_options.set_async(false); + fd_io_uring_options.set_fd_register(true); + + riegeli::FdIoUringWriterBase::Options fd_w_options; + fd_w_options.set_io_uring_option(fd_io_uring_options); + + riegeli::RecordWriterBase::Options w_options; + + riegeli::FdIoUringWriter<> fd_writer(file, O_WRONLY | O_CREAT | O_TRUNC, + fd_w_options); + + writer = std::make_unique>>( + std::move(fd_writer), std::move(w_options)); +} + +void Async(WritePtr& writer, const std::string &file) { + riegeli::FdIoUringOptions fd_io_uring_options; + fd_io_uring_options.set_async(true); + fd_io_uring_options.set_fd_register(false); + + riegeli::FdIoUringWriterBase::Options fd_w_options; + fd_w_options.set_io_uring_option(fd_io_uring_options); + + riegeli::RecordWriterBase::Options w_options; + + riegeli::FdIoUringWriter<> fd_writer(file, O_WRONLY | O_CREAT | O_TRUNC, + fd_w_options); + + writer = std::make_unique>>( + std::move(fd_writer), std::move(w_options)); +} + +void AsyncFd(WritePtr& writer, const std::string &file) { + riegeli::FdIoUringOptions fd_io_uring_options; + fd_io_uring_options.set_async(true); + fd_io_uring_options.set_fd_register(true); + + riegeli::FdIoUringWriterBase::Options fd_w_options; + fd_w_options.set_io_uring_option(fd_io_uring_options); + + riegeli::RecordWriterBase::Options w_options; + + riegeli::FdIoUringWriter<> fd_writer(file, O_WRONLY | O_CREAT | O_TRUNC, + fd_w_options); + + writer = std::make_unique>>( + std::move(fd_writer), std::move(w_options)); +} + +void WriteData(WritePtr &writer) { + for(int i = 1; i <= 10000000; ++i) { + std::string temp = std::to_string(i); + writer -> WriteRecord(temp); + } + writer -> Close(); +} + +void CheckData(const std::string &file) { + riegeli::RecordReaderBase::Options r_options; + riegeli::FdReaderBase::Options fd_r_options; + riegeli::FdReader<> fd_reader(file, O_RDONLY, + fd_r_options); + ReadPtr reader = std::make_unique>>( + std::move(fd_reader), std::move(r_options)); + + std::string record; + int num = 1; + while(reader -> ReadRecord(record)) { + EXPECT_EQ(record, std::to_string(num)); + ++num; + } + EXPECT_EQ(num, 10000001); + reader -> Close(); +} + +void WriteLargeData(WritePtr &writer) { + for(int i = 1; i <= 10000; ++i) { + std::string element = std::to_string(i); + std::string temp = element; + for(int j = 0; j < 100000; ++j) { + temp += element; + } + writer -> WriteRecord(temp); + } + writer -> Close(); +} + +void CheckLargeData(const std::string &file) { + riegeli::RecordReaderBase::Options r_options; + riegeli::FdReaderBase::Options fd_r_options; + riegeli::FdReader<> fd_reader(file, O_RDONLY, + fd_r_options); + ReadPtr reader = std::make_unique>>( + std::move(fd_reader), std::move(r_options)); + + std::string record; + int num = 1; + while(reader -> ReadRecord(record)) { + std::string element = std::to_string(num); + std::string temp = element; + for(int j = 0; j < 100000; ++j) { + temp += element; + } + EXPECT_EQ(record, temp); + ++num; + } + EXPECT_EQ(num, 10001); + reader -> Close(); +} + +TEST(IoUringTest, SynWrite) { + std::string file = std::string(getenv("TEST_TMPDIR")) + "/write_test_file"; + WritePtr writer; + Sync(writer, file); + + WriteData(writer); + CheckData(file); +} + +TEST(IoUringTest, SynFdWrite) { + std::string file = std::string(getenv("TEST_TMPDIR")) + "/write_test_file"; + WritePtr writer; + SyncFd(writer, file); + + WriteData(writer); + CheckData(file); +} + + +TEST(IoUringTest, SynWriteLargeData) { + std::string file = std::string(getenv("TEST_TMPDIR")) + "/write_test_file"; + WritePtr writer; + Sync(writer, file); + + WriteLargeData(writer); + CheckLargeData(file); +} + +TEST(IoUringTest, SynFdWriteLargeData) { + std::string file = std::string(getenv("TEST_TMPDIR")) + "/write_test_file"; + WritePtr writer; + SyncFd(writer, file); + + WriteLargeData(writer); + CheckLargeData(file); +} + +TEST(IoUringTest, AsynWrite) { + std::string file = std::string(getenv("TEST_TMPDIR")) + "/write_test_file"; + WritePtr writer; + Sync(writer, file); + + WriteData(writer); + CheckData(file); +} + +TEST(IoUringTest, AsynFdWrite) { + std::string file = std::string(getenv("TEST_TMPDIR")) + "/write_test_file"; + WritePtr writer; + SyncFd(writer, file); + + WriteData(writer); + CheckData(file); +} + + +TEST(IoUringTest, AsynWriteLargeData) { + std::string file = std::string(getenv("TEST_TMPDIR")) + "/write_test_file"; + WritePtr writer; + Async(writer, file); + + WriteLargeData(writer); + CheckLargeData(file); +} + +TEST(IoUringTest, AsynFdWriteLargeData) { + std::string file = std::string(getenv("TEST_TMPDIR")) + "/write_test_file"; + WritePtr writer; + AsyncFd(writer, file); + + WriteLargeData(writer); + CheckLargeData(file); +} + +} \ No newline at end of file diff --git a/riegeli/bytes/BUILD b/riegeli/bytes/BUILD index 3fae4a58..aad577e8 100644 --- a/riegeli/bytes/BUILD +++ b/riegeli/bytes/BUILD @@ -471,6 +471,26 @@ cc_library( ], ) +cc_library( + name = "fd_io_uring_writer", + srcs = [ + "fd_dependency.h", + "fd_io_uring_writer.cc", + ], + hdrs = ["fd_io_uring_writer.h"], + deps = [ + ":buffered_writer", + "//riegeli/base", + "//riegeli/base:status", + "//riegeli/iouring:fd_sync_io_uring", + "//riegeli/iouring:fd_async_io_uring", + "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/status", + "@com_google_absl//absl/strings", + "@com_google_absl//absl/types:optional", + ], +) + cc_library( name = "fd_reader", srcs = [ diff --git a/riegeli/bytes/fd_io_uring_writer.cc b/riegeli/bytes/fd_io_uring_writer.cc new file mode 100644 index 00000000..a2a52ef6 --- /dev/null +++ b/riegeli/bytes/fd_io_uring_writer.cc @@ -0,0 +1,309 @@ +// Make `pwrite()` and `ftruncate()` available. +#if !defined(_XOPEN_SOURCE) || _XOPEN_SOURCE < 500 +#undef _XOPEN_SOURCE +#define _XOPEN_SOURCE 500 +#endif + +// Make `off_t` 64-bit even on 32-bit systems. +#undef _FILE_OFFSET_BITS +#define _FILE_OFFSET_BITS 64 + +#include "riegeli/bytes/fd_io_uring_writer.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "absl/base/optimization.h" +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "riegeli/base/base.h" +#include "riegeli/base/errno_mapping.h" +#include "riegeli/base/status.h" +#include "riegeli/bytes/buffered_writer.h" + +namespace riegeli { + +void FdIoUringWriterBase::Initialize(int dest, absl::optional assumed_pos, + absl::optional independent_pos, FdIoUringOptions io_uring_option) { + RIEGELI_ASSERT_GE(dest, 0) + << "Failed precondition of FdIoUringWriter: negative file descriptor"; + SetFilename(dest); + InitializeFdIoUring(io_uring_option, dest); + InitializePos(dest, assumed_pos, independent_pos); +} + +inline void FdIoUringWriterBase::SetFilename(int dest) { + if (dest == 1) { + filename_ = "/dev/stdout"; + } else if (dest == 2) { + filename_ = "/dev/stderr"; + } else { + filename_ = absl::StrCat("/proc/self/fd/", dest); + } +} + +int FdIoUringWriterBase::OpenFd(absl::string_view filename, int flags, + mode_t permissions) { + RIEGELI_ASSERT((flags & O_ACCMODE) == O_WRONLY || + (flags & O_ACCMODE) == O_RDWR) + << "Failed precondition of FdIoUringWriter: " + "flags must include either O_WRONLY or O_RDWR"; + // TODO: When `absl::string_view` becomes C++17 `std::string_view`: + // `filename_ = filename` + filename_.assign(filename.data(), filename.size()); +again: + const int dest = open(filename_.c_str(), flags, permissions); + if (ABSL_PREDICT_FALSE(dest < 0)) { + if (errno == EINTR) goto again; + FailOperation("open()"); + return -1; + } + return dest; +} + +inline void FdIoUringWriterBase::InitializePos( + int dest, absl::optional assumed_pos, + absl::optional independent_pos) { + int flags = 0; + if (assumed_pos == absl::nullopt && independent_pos == absl::nullopt) { + // Flags are needed only if `assumed_pos == absl::nullopt` and + // `independent_pos == absl::nullopt`. Avoid `fcntl()` otherwise. + flags = fcntl(dest, F_GETFL); + if (ABSL_PREDICT_FALSE(flags < 0)) { + FailOperation("fcntl()"); + return; + } + } + return InitializePos(dest, flags, assumed_pos, independent_pos); +} + +void FdIoUringWriterBase::InitializePos(int dest, int flags, + absl::optional assumed_pos, + absl::optional independent_pos) { + RIEGELI_ASSERT(assumed_pos == absl::nullopt || + independent_pos == absl::nullopt) + << "Failed precondition of FdIoUringWriterBase: " + "Options::assumed_pos() and Options::independent_pos() are both set"; + RIEGELI_ASSERT(!supports_random_access_) + << "Failed precondition of FdIoUringWriterBase::InitializePos(): " + "supports_random_access_ not reset"; + RIEGELI_ASSERT(!has_independent_pos_) + << "Failed precondition of FdIoUringWriterBase::InitializePos(): " + "has_independent_pos_ not reset"; + if (assumed_pos != absl::nullopt) { + if (ABSL_PREDICT_FALSE(*assumed_pos > + Position{std::numeric_limits::max()})) { + FailOverflow(); + return; + } + set_start_pos(*assumed_pos); + } else if (independent_pos != absl::nullopt) { + supports_random_access_ = true; + has_independent_pos_ = true; + if (ABSL_PREDICT_FALSE(*independent_pos > + Position{std::numeric_limits::max()})) { + FailOverflow(); + return; + } + set_start_pos(*independent_pos); + } else { + const off_t file_pos = + lseek(dest, 0, (flags & O_APPEND) != 0 ? SEEK_END : SEEK_CUR); + if (file_pos < 0) { + if (errno == ESPIPE) { + // Random access is not supported. Assume the current position as 0. + } else { + FailOperation("lseek()"); + } + return; + } + set_start_pos(IntCast(file_pos)); + supports_random_access_ = true; + } +} + +void FdIoUringWriterBase::InitializeFdIoUring(FdIoUringOptions options, int fd) { + async_ = options.async(); + + if(async_) { + fd_io_uring_ = std::make_unique(options, fd); + } else { + fd_io_uring_ = std::make_unique(options, fd); + } + + if(ABSL_PREDICT_FALSE(!fd_io_uring_ -> healthy())) { + Fail(*fd_io_uring_); + } +} + +bool FdIoUringWriterBase::FailOperation(absl::string_view operation) { + const int error_number = errno; + RIEGELI_ASSERT_NE(error_number, 0) + << "Failed precondition of FdIoUringWriterBase::FailOperation(): " + "zero errno"; + return Fail( + ErrnoToCanonicalStatus(error_number, absl::StrCat(operation, " failed"))); +} + +bool FdIoUringWriterBase::IoUringFailOperation(const int error_number, absl::string_view operation) { + RIEGELI_ASSERT_NE(error_number, 0) + << "Failed precondition of FdIoUringWriterBase::FailOperation(): " + "zero errno"; + return Fail( + ErrnoToCanonicalStatus(error_number, absl::StrCat(operation, " failed"))); +} + +void FdIoUringWriterBase::AnnotateFailure(absl::Status& status) { + RIEGELI_ASSERT(!status.ok()) + << "Failed precondition of Object::AnnotateFailure(): status not failed"; + status = Annotate(status, absl::StrCat("writing ", filename_)); + BufferedWriter::AnnotateFailure(status); +} + +bool FdIoUringWriterBase::WriteInternal(absl::string_view src) { + RIEGELI_ASSERT(!src.empty()) + << "Failed precondition of BufferedWriter::WriteInternal(): " + "nothing to write"; + RIEGELI_ASSERT(healthy()) + << "Failed precondition of BufferedWriter::WriteInternal(): " << status(); + + const int dest = dest_fd(); + if(ABSL_PREDICT_FALSE(!fd_io_uring_ -> healthy())) { + return Fail(*fd_io_uring_); + } + + if (ABSL_PREDICT_FALSE(src.size() > + Position{std::numeric_limits::max()} - + start_pos())) { + return FailOverflow(); + } + + do { + again: + const ssize_t length_written = fd_io_uring_ -> pwrite(dest, src.data(), + UnsignedMin(src.size(), + size_t{std::numeric_limits::max()}), + IntCast(start_pos())); + if (ABSL_PREDICT_FALSE(length_written < 0)) { + if (length_written == -EINTR || length_written == -EAGAIN) goto again; + return IoUringFailOperation(-length_written, "pwrite()"); + } + RIEGELI_ASSERT_GT(length_written, 0) + << "pwrite()" << " returned 0"; + RIEGELI_ASSERT_LE(IntCast(length_written), src.size()) + << "pwrite()" + << " wrote more than requested"; + move_start_pos(IntCast(length_written)); + src.remove_prefix(IntCast(length_written)); + } while (!src.empty()); + return true; +} + +bool FdIoUringWriterBase::FlushImpl(FlushType flush_type) { + if (ABSL_PREDICT_FALSE(!BufferedWriter::FlushImpl(flush_type))) return false; + switch (flush_type) { + case FlushType::kFromObject: + case FlushType::kFromProcess: + return true; + case FlushType::kFromMachine: { + if(ABSL_PREDICT_FALSE(!fd_io_uring_ -> healthy())) { + return Fail(*fd_io_uring_); + } + const int dest = dest_fd(); + const int fsync_res = fd_io_uring_ -> fsync(dest); + if (ABSL_PREDICT_FALSE(fsync_res < 0)) { + return IoUringFailOperation(-fsync_res, "fsync()"); + } + return true; + } + } + RIEGELI_ASSERT_UNREACHABLE() + << "Unknown flush type: " << static_cast(flush_type); +} + +inline bool FdIoUringWriterBase::SeekInternal(int dest, Position new_pos) { + RIEGELI_ASSERT_EQ(buffer_size(), 0u) + << "Failed precondition of FdIoUringWriterBase::SeekInternal(): " + "buffer not empty"; + if (!has_independent_pos_) { + if (ABSL_PREDICT_FALSE(lseek(dest, IntCast(new_pos), SEEK_SET) < + 0)) { + return FailOperation("lseek()"); + } + } + set_start_pos(new_pos); + return true; +} + +bool FdIoUringWriterBase::SeekBehindBuffer(Position new_pos) { + RIEGELI_ASSERT_EQ(buffer_size(), 0u) + << "Failed precondition of BufferedWriter::SeekBehindBuffer(): " + "buffer not empty"; + if (ABSL_PREDICT_FALSE(!healthy())) return false; + const int dest = dest_fd(); + if (new_pos >= start_pos()) { + // Seeking forwards. + struct stat stat_info; + if (ABSL_PREDICT_FALSE(fstat(dest, &stat_info) < 0)) { + return FailOperation("fstat()"); + } + if (ABSL_PREDICT_FALSE(new_pos > IntCast(stat_info.st_size))) { + // File ends. + SeekInternal(dest, IntCast(stat_info.st_size)); + return false; + } + } + return SeekInternal(dest, new_pos); +} + +absl::optional FdIoUringWriterBase::SizeBehindBuffer() { + RIEGELI_ASSERT_EQ(buffer_size(), 0u) + << "Failed precondition of BufferedWriter::SizeBehindBuffer(): " + "buffer not empty"; + if (ABSL_PREDICT_FALSE(!healthy())) return absl::nullopt; + const int dest = dest_fd(); + struct stat stat_info; + if (ABSL_PREDICT_FALSE(fstat(dest, &stat_info) < 0)) { + FailOperation("fstat()"); + return absl::nullopt; + } + return IntCast(stat_info.st_size); +} + +bool FdIoUringWriterBase::TruncateBehindBuffer(Position new_size) { + RIEGELI_ASSERT_EQ(buffer_size(), 0u) + << "Failed precondition of BufferedWriter::TruncateBehindBuffer(): " + "buffer not empty"; + if (ABSL_PREDICT_FALSE(!healthy())) return false; + const int dest = dest_fd(); + if (new_size >= start_pos()) { + // Seeking forwards. + struct stat stat_info; + if (ABSL_PREDICT_FALSE(fstat(dest, &stat_info) < 0)) { + return FailOperation("fstat()"); + } + if (ABSL_PREDICT_FALSE(new_size > IntCast(stat_info.st_size))) { + // File ends. + SeekInternal(dest, IntCast(stat_info.st_size)); + return false; + } + } +again: + if (ABSL_PREDICT_FALSE(ftruncate(dest, IntCast(new_size)) < 0)) { + if (errno == EINTR) goto again; + return FailOperation("ftruncate()"); + } + return SeekInternal(dest, new_size); +} + +} // namespace riegeli diff --git a/riegeli/bytes/fd_io_uring_writer.h b/riegeli/bytes/fd_io_uring_writer.h new file mode 100644 index 00000000..19dc88c7 --- /dev/null +++ b/riegeli/bytes/fd_io_uring_writer.h @@ -0,0 +1,448 @@ +#ifndef RIEGELI_BYTES_FD_IO_URING_WRITER_H_ +#define RIEGELI_BYTES_FD_IO_URING_WRITER_H_ + +#include +#include +#include + +#include +#include +#include +#include + +#include "absl/base/attributes.h" +#include "absl/base/optimization.h" +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "riegeli/base/base.h" +#include "riegeli/base/dependency.h" +#include "riegeli/bytes/buffered_writer.h" +#include "riegeli/bytes/fd_dependency.h" + +#include "riegeli/iouring/fd_sync_io_uring.h" +#include "riegeli/iouring/fd_async_io_uring.h" + +namespace riegeli { + +// Template parameter independent part of `FdIoUringWriter`. +class FdIoUringWriterBase : public BufferedWriter { + public: + class Options { + public: + Options() noexcept {} + + // Permissions to use in case a new file is created (9 bits). The effective + // permissions are modified by the process's umask. + // + // Default: `0666`. + Options& set_permissions(mode_t permissions) & { + permissions_ = permissions; + return *this; + } + Options&& set_permissions(mode_t permissions) && { + return std::move(set_permissions(permissions)); + } + mode_t permissions() const { return permissions_; } + + // If `absl::nullopt`, the current position reported by `pos()` corresponds + // to the current fd position if possible, otherwise 0 is assumed as the + // initial position. Random access is supported if the fd supports random + // access. + // + // If not `absl::nullopt`, this position is assumed initially, to be + // reported by `pos()`. It does not need to correspond to the current fd + // position. Random access is not supported. + // + // `assumed_pos()` and `independent_pos()` must not be both set. + // + // Default: `absl::nullopt`. + Options& set_assumed_pos(absl::optional assumed_pos) & { + assumed_pos_ = assumed_pos; + return *this; + } + Options&& set_assumed_pos(absl::optional assumed_pos) && { + return std::move(set_assumed_pos(assumed_pos)); + } + absl::optional assumed_pos() const { return assumed_pos_; } + + // If `absl::nullopt`, `FdIoUringWriter` writes at the current fd position. + // + // If not `absl::nullopt`, `FdIoUringWriter` writes starting from this position, + // without disturbing the current fd position. This is useful for multiple + // writers concurrently writing to disjoint regions of the same file. The fd + // must support `pwrite()`. + // + // `assumed_pos()` and `independent_pos()` must not be both set. + // + // Default: `absl::nullopt`. + Options& set_independent_pos(absl::optional independent_pos) & { + independent_pos_ = independent_pos; + return *this; + } + Options&& set_independent_pos(absl::optional independent_pos) && { + return std::move(set_independent_pos(independent_pos)); + } + absl::optional independent_pos() const { + return independent_pos_; + } + + // Tunes how much data is buffered before writing to the file. + // + // Default: `kDefaultBufferSize` (64K). + Options& set_buffer_size(size_t buffer_size) & { + RIEGELI_ASSERT_GT(buffer_size, 0u) + << "Failed precondition of FdIoUringWriterBase::Options::set_buffer_size(): " + "zero buffer size"; + buffer_size_ = buffer_size; + return *this; + } + Options&& set_buffer_size(size_t buffer_size) && { + return std::move(set_buffer_size(buffer_size)); + } + size_t buffer_size() const { return buffer_size_; } + + // The option for Io_Uring operation. + // + // The option includes: async or sync, fd_register or not, and size. + Options& set_io_uring_option(FdIoUringOptions io_uring_option) & { + io_uring_option_ = io_uring_option; + return *this; + } + + Options&& set_io_uring_option(FdIoUringOptions io_uring_option) && { + return std::move(set_io_uring_option(io_uring_option)); + } + + FdIoUringOptions io_uring_option() const { + return io_uring_option_; + } + + private: + mode_t permissions_ = 0666; + absl::optional assumed_pos_; + absl::optional independent_pos_; + size_t buffer_size_ = kDefaultBufferSize; + FdIoUringOptions io_uring_option_; + }; + + // Returns the fd being written to. If the fd is owned then changed to -1 by + // `Close()`, otherwise unchanged. + virtual int dest_fd() const = 0; + + // Returns the original name of the file being written to (or "/dev/stdout", + // "/dev/stderr", or "/proc/self/fd/" if fd was given). Unchanged by + // `Close()`. + const std::string& filename() const { return filename_; } + + bool SupportsRandomAccess() override { return supports_random_access_; } + + protected: + FdIoUringWriterBase() noexcept {} + + explicit FdIoUringWriterBase(size_t buffer_size); + + FdIoUringWriterBase(FdIoUringWriterBase&& that) noexcept; + FdIoUringWriterBase& operator=(FdIoUringWriterBase&& that) noexcept; + + void Reset(); + void Reset(size_t buffer_size); + void Initialize(int dest, absl::optional assumed_pos, + absl::optional independent_pos, FdIoUringOptions io_uring_option); + int OpenFd(absl::string_view filename, int flags, mode_t permissions); + void InitializePos(int dest, absl::optional assumed_pos, + absl::optional independent_pos); + void InitializePos(int dest, int flags, absl::optional assumed_pos, + absl::optional independent_pos); + void InitializeFdIoUring(FdIoUringOptions options, int fd); + ABSL_ATTRIBUTE_COLD bool FailOperation(absl::string_view operation); + ABSL_ATTRIBUTE_COLD bool IoUringFailOperation(const int error_number, absl::string_view operation); + + void AnnotateFailure(absl::Status& status) override; + bool WriteInternal(absl::string_view src) override; + bool FlushImpl(FlushType flush_type) override; + bool SeekBehindBuffer(Position new_pos) override; + absl::optional SizeBehindBuffer() override; + bool TruncateBehindBuffer(Position new_size) override; + + private: + void SetFilename(int dest); + bool SeekInternal(int dest, Position new_pos); + + std::string filename_; + bool supports_random_access_ = false; + bool has_independent_pos_ = false; + bool async_ = false; + + protected: + std::unique_ptr fd_io_uring_; + // Invariant: `start_pos() <= std::numeric_limits::max()` +}; + +// A `Writer` which writes to a file descriptor. +// +// The fd must support: +// * `fcntl()` - for the constructor from fd, +// if `Options::assumed_pos() == absl::nullopt` +// and `Options::independent_pos() == absl::nullopt` +// * `close()` - if the fd is owned +// * `write()` - if `Options::independent_pos() == absl::nullopt` +// * `pwrite()` - if `Options::independent_pos() != absl::nullopt` +// * `lseek()` - for `Seek()`, `Size()`, or `Truncate()` +// if `Options::independent_pos() == absl::nullopt` +// * `fstat()` - for `Seek()`, `Size()`, or `Truncate()` +// * `fsync()` - for `Flush(FlushType::kFromMachine)` +// * `ftruncate()` - for `Truncate()` +// +// `FdIoUringWriter` supports random access if +// `Options::assumed_pos() == absl::nullopt` and the fd supports random access +// (this is assumed if `Options::independent_pos() != absl::nullopt`, otherwise +// this is checked by calling `lseek()`). +// +// The `Dest` template parameter specifies the type of the object providing and +// possibly owning the fd being written to. `Dest` must support +// `Dependency`, e.g. `OwnedFd` (owned, default), `UnownedFd` +// (not owned). +// +// By relying on CTAD the template argument can be deduced as `OwnedFd` if the +// first constructor argument is a filename or an `int`, otherwise as the value +// type of the first constructor argument. This requires C++17. +// +// Until the `FdIoUringWriter` is closed or no longer used, the fd must not be closed; +// additionally, if `Options::independent_pos() == absl::nullopt`, the fd should +// not have its position changed, except that if random access is not used, +// careful interleaving of multiple writers is possible: `Flush()` is needed +// before switching to another writer, and `pos()` does not take other writers +// into account. +template +class FdIoUringWriter : public FdIoUringWriterBase { + public: + // Creates a closed `FdIoUringWriter`. + FdIoUringWriter() noexcept {} + + // Will write to the fd provided by `dest`. + explicit FdIoUringWriter(const Dest& dest, Options options = Options()); + explicit FdIoUringWriter(Dest&& dest, Options options = Options()); + + // Will write to the fd provided by a `Dest` constructed from elements of + // `dest_args`. This avoids constructing a temporary `Dest` and moving from + // it. + template + explicit FdIoUringWriter(std::tuple dest_args, + Options options = Options()); + + // Opens a file for writing. + // + // `flags` is the second argument of `open()`, typically one of: + // * `O_WRONLY | O_CREAT | O_TRUNC` + // * `O_WRONLY | O_CREAT | O_APPEND` + // + // `flags` must include either `O_WRONLY` or `O_RDWR`. + // + // If opening the file fails, `FdIoUringWriter` will be failed and closed. + explicit FdIoUringWriter(absl::string_view filename, int flags, + Options options = Options()); + + FdIoUringWriter(FdIoUringWriter&& that) noexcept; + FdIoUringWriter& operator=(FdIoUringWriter&& that) noexcept; + + // Makes `*this` equivalent to a newly constructed `FdIoUringWriter`. This avoids + // constructing a temporary `FdIoUringWriter` and moving from it. + void Reset(); + void Reset(const Dest& dest, Options options = Options()); + void Reset(Dest&& dest, Options options = Options()); + template + void Reset(std::tuple dest_args, Options options = Options()); + void Reset(absl::string_view filename, int flags, + Options options = Options()); + + // Returns the object providing and possibly owning the fd being written to. + // If the fd is owned then changed to -1 by `Close()`, otherwise unchanged. + Dest& dest() { return dest_.manager(); } + const Dest& dest() const { return dest_.manager(); } + int dest_fd() const override { return dest_.get(); } + + protected: + using FdIoUringWriterBase::Initialize; + void Initialize(absl::string_view filename, int flags, Options&& options); + + void Done() override; + + private: + // The object providing and possibly owning the fd being written to. + Dependency dest_; +}; + +// Support CTAD. +#if __cpp_deduction_guides +FdIoUringWriter()->FdIoUringWriter>; +template +explicit FdIoUringWriter(const Dest& dest, + FdIoUringWriterBase::Options options = FdIoUringWriterBase::Options()) + -> FdIoUringWriter::value, + OwnedFd, std::decay_t>>; +template +explicit FdIoUringWriter(Dest&& dest, + FdIoUringWriterBase::Options options = FdIoUringWriterBase::Options()) + -> FdIoUringWriter::value, + OwnedFd, std::decay_t>>; +template +explicit FdIoUringWriter(std::tuple dest_args, + FdIoUringWriterBase::Options options = FdIoUringWriterBase::Options()) + -> FdIoUringWriter>>; +explicit FdIoUringWriter(absl::string_view filename, int flags, + FdIoUringWriterBase::Options options = FdIoUringWriterBase::Options()) + ->FdIoUringWriter<>; +#endif + +// Implementation details follow. + +inline FdIoUringWriterBase::FdIoUringWriterBase(size_t buffer_size) + : BufferedWriter(buffer_size) {} + +inline FdIoUringWriterBase::FdIoUringWriterBase(FdIoUringWriterBase&& that) noexcept + : BufferedWriter(std::move(that)), + // Using `that` after it was moved is correct because only the base class + // part was moved. + filename_(std::move(that.filename_)), + supports_random_access_(that.supports_random_access_), + has_independent_pos_(that.has_independent_pos_), async_(that.async_), fd_io_uring_(std::move(that.fd_io_uring_)) {} + +inline FdIoUringWriterBase& FdIoUringWriterBase::operator=(FdIoUringWriterBase&& that) noexcept { + BufferedWriter::operator=(std::move(that)); + // Using `that` after it was moved is correct because only the base class part + // was moved. + filename_ = std::move(that.filename_); + supports_random_access_ = that.supports_random_access_; + has_independent_pos_ = that.has_independent_pos_; + async_ = that.async_; + fd_io_uring_ = std::move(that.fd_io_uring_); + return *this; +} + +inline void FdIoUringWriterBase::Reset() { + BufferedWriter::Reset(); + filename_.clear(); + supports_random_access_ = false; + has_independent_pos_ = false; + async_ = false; + fd_io_uring_.reset(); +} + +inline void FdIoUringWriterBase::Reset(size_t buffer_size) { + BufferedWriter::Reset(buffer_size); + // `filename_` was set by `OpenFd()` or will be set by `Initialize()`. + supports_random_access_ = false; + has_independent_pos_ = false; + async_ = false; + fd_io_uring_.reset(); +} + +template +inline FdIoUringWriter::FdIoUringWriter(const Dest& dest, Options options) + : FdIoUringWriterBase(options.buffer_size()), dest_(dest) { + Initialize(dest_.get(), options.assumed_pos(), options.independent_pos(), options.io_uring_option()); +} + +template +inline FdIoUringWriter::FdIoUringWriter(Dest&& dest, Options options) + : FdIoUringWriterBase(options.buffer_size()), dest_(std::move(dest)) { + Initialize(dest_.get(), options.assumed_pos(), options.independent_pos(), options.io_uring_option()); +} + +template +template +inline FdIoUringWriter::FdIoUringWriter(std::tuple dest_args, + Options options) + : FdIoUringWriterBase(options.buffer_size()), dest_(std::move(dest_args)) { + Initialize(dest_.get(), options.assumed_pos(), options.independent_pos(), options.io_uring_option()); +} + +template +inline FdIoUringWriter::FdIoUringWriter(absl::string_view filename, int flags, + Options options) { + Initialize(filename, flags, std::move(options)); +} + +template +inline FdIoUringWriter::FdIoUringWriter(FdIoUringWriter&& that) noexcept + : FdIoUringWriterBase(std::move(that)), + // Using `that` after it was moved is correct because only the base class + // part was moved. + dest_(std::move(that.dest_)) {} + +template +inline FdIoUringWriter& FdIoUringWriter::operator=(FdIoUringWriter&& that) noexcept { + FdIoUringWriterBase::operator=(std::move(that)); + // Using `that` after it was moved is correct because only the base class part + // was moved. + dest_ = std::move(that.dest_); + return *this; +} + +template +inline void FdIoUringWriter::Reset() { + FdIoUringWriterBase::Reset(); + dest_.Reset(); +} + +template +inline void FdIoUringWriter::Reset(const Dest& dest, Options options) { + FdIoUringWriterBase::Reset(options.buffer_size()); + dest_.Reset(dest); + Initialize(dest_.get(), options.assumed_pos(), options.independent_pos(), options.io_uring_option()); +} + +template +inline void FdIoUringWriter::Reset(Dest&& dest, Options options) { + FdIoUringWriterBase::Reset(options.buffer_size()); + dest_.Reset(std::move(dest)); + Initialize(dest_.get(), options.assumed_pos(), options.independent_pos(), options.io_uring_option()); +} + +template +template +inline void FdIoUringWriter::Reset(std::tuple dest_args, + Options options) { + FdIoUringWriterBase::Reset(options.buffer_size()); + dest_.Reset(std::move(dest_args)); + Initialize(dest_.get(), options.assumed_pos(), options.independent_pos(), options.io_uring_option()); +} + +template +inline void FdIoUringWriter::Reset(absl::string_view filename, int flags, + Options options) { + Reset(); + Initialize(filename, flags, std::move(options)); +} + +template +void FdIoUringWriter::Initialize(absl::string_view filename, int flags, + Options&& options) { + const int dest = OpenFd(filename, flags, options.permissions()); + if (ABSL_PREDICT_FALSE(dest < 0)) return; + FdIoUringWriterBase::Reset(options.buffer_size()); + dest_.Reset(std::forward_as_tuple(dest)); + int fd = dest_.get(); + InitializeFdIoUring(options.io_uring_option(), fd); + InitializePos(dest_.get(), flags, options.assumed_pos(), + options.independent_pos()); +} + +template +void FdIoUringWriter::Done() { + FdIoUringWriterBase::Done(); + fd_io_uring_.reset(); + + if (dest_.is_owning()) { + const int dest = dest_.Release(); + if (ABSL_PREDICT_FALSE(internal::CloseFd(dest) < 0) && + ABSL_PREDICT_TRUE(healthy())) { + FailOperation(internal::kCloseFunctionName); + } + } +} + +} // namespace riegeli + +#endif // RIEGELI_BYTES_FD_IO_URING_WRITER_H_ \ No newline at end of file diff --git a/riegeli/iouring/BUILD b/riegeli/iouring/BUILD new file mode 100644 index 00000000..e6ff0356 --- /dev/null +++ b/riegeli/iouring/BUILD @@ -0,0 +1,46 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +cc_library( + name = "fd_io_uring_options", + srcs = ["fd_io_uring_options.cc"], + hdrs = ["fd_io_uring_options.h"], +) + +cc_library( + name = "fd_io_uring", + srcs = ["fd_io_uring.cc"], + hdrs = ["fd_io_uring.h"], + deps = [ + "//riegeli/base", + "//riegeli/base:status", + ":fd_io_uring_options", + "@liburing", + "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/strings", + ] +) + +cc_library( + name = "fd_sync_io_uring", + srcs = [ + "fd_sync_io_uring.cc", + ], + hdrs = ["fd_sync_io_uring.h"], + deps = [ + ":fd_io_uring", + ], +) + +cc_library( + name = "fd_async_io_uring", + srcs = [ + "fd_async_io_uring.cc", + ], + hdrs = ["fd_async_io_uring.h"], + deps = [ + ":fd_io_uring", + "@com_google_absl//absl/functional:bind_front", + ], +) \ No newline at end of file diff --git a/riegeli/iouring/fd_async_io_uring.cc b/riegeli/iouring/fd_async_io_uring.cc new file mode 100644 index 00000000..42ca1daa --- /dev/null +++ b/riegeli/iouring/fd_async_io_uring.cc @@ -0,0 +1,182 @@ +#include "riegeli/iouring/fd_async_io_uring.h" + +#include "absl/functional/bind_front.h" + +namespace riegeli { + +FdAsyncIoUring::FdAsyncIoUring(FdIoUringOptions options, int fd) +:FdIoUring(options, fd), exit_(false), process_num_(0) { + reap_thread_ = std::thread([this]() { this->Reap(); }); +} + +FdAsyncIoUring::~FdAsyncIoUring() { + exit_.store(true); + + if(reap_thread_.joinable()) { + reap_thread_.join(); + } +} + +ssize_t FdAsyncIoUring::pread(int fd, void *buf, size_t count, off_t offset) { + if(fd_register_) { + RIEGELI_ASSERT_EQ(fd_, fd) << "The fd is not epual to the registered fd."; + return preadInternel(0, buf, count, offset); + } + + return preadInternel(fd, buf, count, offset); +} + +ssize_t FdAsyncIoUring::pwrite(int fd, const void *buf, size_t count, off_t offset) { + void* src = operator new(count); + + if(ABSL_PREDICT_FALSE(!src)) { + FailOperation(12, "FdAsyncIoUring pwrite() Create Space"); + return -12; + } + + std::memcpy(src, buf, count); + + if(fd_register_) { + RIEGELI_ASSERT_EQ(fd_, fd) << "The fd is not epual to the registered fd."; + return pwriteInternel(0, src, count, offset); + } + + return pwriteInternel(fd, src, count, offset); +} + +int FdAsyncIoUring::fsync(int fd) { + if(fd_register_) { + RIEGELI_ASSERT_EQ(fd_, fd) << "The fd is not epual to the registered fd."; + return fsyncInternel(0); + } + + return fsyncInternel(fd); +} + +ssize_t FdAsyncIoUring::preadInternel(int fd, void *buf, size_t count, off_t offset) { + std::lock_guard lock_g(sq_mutex_); + struct io_uring_sqe *sqe = GetSqe(); + io_uring_prep_read(sqe, fd, buf, count, offset); + if(fd_register_) { + sqe -> flags |= IOSQE_FIXED_FILE; + } + FdAsyncIoUringOp::CallBackFunc cb = absl::bind_front(&FdAsyncIoUring::preadCallBack, this); + FdAsyncIoUringOp* op = new FdAsyncIoUringOp(sqe, cb); + io_uring_sqe_set_data(sqe, op); + SubmitSqe(); + return count; +} + +ssize_t FdAsyncIoUring::pwriteInternel(int fd, const void* buf, size_t count, off_t offset) { + std::lock_guard lock_g(sq_mutex_); + struct io_uring_sqe *sqe = GetSqe(); + io_uring_prep_write(sqe, fd, buf, count, offset); + if(fd_register_) { + sqe -> flags |= IOSQE_FIXED_FILE; + } + FdAsyncIoUringOp::CallBackFunc cb = absl::bind_front(&FdAsyncIoUring::pwriteCallBack, this); + FdAsyncIoUringOp* op = new FdAsyncIoUringOp(sqe, cb); + io_uring_sqe_set_data(sqe, op); + SubmitSqe(); + return count; +} + +int FdAsyncIoUring::fsyncInternel(int fd) { + std::lock_guard lock_g(sq_mutex_); + struct io_uring_sqe *sqe = GetSqe(); + io_uring_prep_fsync(sqe, fd, 0); + if(fd_register_) { + sqe -> flags |= IOSQE_FIXED_FILE; + } + FdAsyncIoUringOp::CallBackFunc cb = absl::bind_front(&FdAsyncIoUring::fsyncCallBack, this); + FdAsyncIoUringOp* op = new FdAsyncIoUringOp(sqe, cb); + io_uring_sqe_set_data(sqe, op); + SubmitSqe(); + return 0; +} + +void FdAsyncIoUring::preadCallBack(FdAsyncIoUringOp *op, const ssize_t res) { + int fd = op -> GetSqe().fd; + void* buf = reinterpret_cast(op -> GetSqe().addr); + size_t offset = op -> GetSqe().off; + size_t count = op -> GetSqe().len; + delete op; + + if(ABSL_PREDICT_FALSE(res < 0)) { + if(res == -EINTR || res == -EAGAIN) { + preadInternel(fd, buf, count, offset); + } else { + FailOperation(-res, "pread()"); + } + } + RIEGELI_ASSERT_GT(res, 0) << "pread() return 0."; +} + +void FdAsyncIoUring::pwriteCallBack(FdAsyncIoUringOp *op, const ssize_t res) { + int fd = op -> GetSqe().fd; + void* buf = reinterpret_cast(op -> GetSqe().addr); + size_t offset = op -> GetSqe().off; + size_t count = op -> GetSqe().len; + delete op; + + if(ABSL_PREDICT_FALSE(res < 0)) { + if(res == -EINTR || res == -EAGAIN) { + pwriteInternel(fd, buf, count, offset); + } else { + operator delete(buf, count); + FailOperation(-res, "pwrite()"); + } + return; + } + RIEGELI_ASSERT_GT(res, 0) << "pwrite() return 0."; + RIEGELI_ASSERT_LE((size_t) res, count) << "pwrite() wrote more than requested."; + + char* newBuf = static_cast(buf); + operator delete(buf, res); + if(ABSL_PREDICT_FALSE(count - res > 0)) { + newBuf += res; + offset += res; + count -= res; + pwriteInternel(fd, newBuf, count, offset); + } +} + +void FdAsyncIoUring::fsyncCallBack(FdAsyncIoUringOp *op, const ssize_t res) { + delete op; + + if(ABSL_PREDICT_FALSE(res < 0)) { + FailOperation(-res, "fsync()"); + } +} + +inline struct io_uring_sqe* FdAsyncIoUring::GetSqe() { + struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_); + RIEGELI_ASSERT(!!sqe) << "Failed get a sqe."; + return sqe; +} + +inline void FdAsyncIoUring::SubmitSqe() { + const int submit_res = io_uring_submit(&ring_); + RIEGELI_ASSERT_GT(submit_res, 0) << "Failed to submit the sqe."; + process_num_ += submit_res; +} + +void FdAsyncIoUring::Reap() { + while(!exit_.load() || process_num_ != 0) { + if(process_num_ != 0) { + struct io_uring_cqe* cqe = NULL; + if(io_uring_wait_cqe(&ring_, &cqe) == 0) { + --process_num_; + const ssize_t res = cqe -> res; + FdAsyncIoUringOp *op = static_cast(io_uring_cqe_get_data(cqe)); + FdAsyncIoUringOp::CallBackFunc cb = op -> GetCallBackFunc(); + if(cb != NULL) { + cb(op, res); + } + io_uring_cqe_seen(&ring_, cqe); + } + } + } +} + +} // namespace riegeli diff --git a/riegeli/iouring/fd_async_io_uring.h b/riegeli/iouring/fd_async_io_uring.h new file mode 100644 index 00000000..7f91a964 --- /dev/null +++ b/riegeli/iouring/fd_async_io_uring.h @@ -0,0 +1,109 @@ +#ifndef RIEGELI_IOURING_FD_ASYNC_IO_URING_H_ +#define RIEGELI_IOURING_FD_ASYNC_IO_URING_H_ + +#include +#include +#include +#include + +#include "riegeli/iouring/fd_io_uring.h" + +namespace riegeli { + +// Class maintains data and call back function for Io_Uring operation. +class FdAsyncIoUringOp { + public: + // Call back function. + using CallBackFunc = std::function; + + // Consturctor and destructor for FdAsyncIoUringOp. + explicit FdAsyncIoUringOp() : cb_(NULL) {} + + explicit FdAsyncIoUringOp(struct io_uring_sqe *sqe) : sqe_(*sqe), cb_(NULL) {} + + explicit FdAsyncIoUringOp(struct io_uring_sqe *sqe, CallBackFunc cb) : sqe_(*sqe), cb_(cb) {} + + FdAsyncIoUringOp(const FdAsyncIoUringOp&) = delete; + FdAsyncIoUringOp& operator=(const FdAsyncIoUringOp&) = delete; + + void SetCallBackFunc(CallBackFunc cb) { + cb_ = cb; + } + + CallBackFunc GetCallBackFunc() const { + return cb_; + } + + void SetSqe(const struct io_uring_sqe* sqe) { + sqe_ = *sqe; + } + + const struct io_uring_sqe& GetSqe() const { + return sqe_; + } + + private: + struct io_uring_sqe sqe_; + CallBackFunc cb_; +}; + +// Perform Io_Uring asynchronously. +class FdAsyncIoUring : public FdIoUring { + public: + // Constructor and destructor for this class. + FdAsyncIoUring() = delete; + FdAsyncIoUring(const FdAsyncIoUring&) = delete; + FdAsyncIoUring& operator=(const FdAsyncIoUring&) = delete; + + explicit FdAsyncIoUring(FdIoUringOptions options, int fd = -1); + + ~FdAsyncIoUring() override; + + // Override the file operation interface for Io_Uring. ToDo: complete pread / preadv / pwritev function. + ssize_t pread(int fd, void *buf, size_t count, off_t offset) override; + + ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset) override; + + int fsync(int fd) override; + + // Get the mode of Io_Uring. + IoUringMode Mode() override { + return IoUringMode::ASYNCIOURING; + } + + private: + // Reap handler function. + void Reap(); + + // Get sqe. + struct io_uring_sqe* GetSqe(); + + // Submit sqe to kernel. + void SubmitSqe(); + + // Internel part and call back function of file operation. + ssize_t pwriteInternel(int fd, const void *buf, size_t count, off_t offset); + ssize_t preadInternel(int fd, void *buf, size_t count, off_t offset); + int fsyncInternel(int fd); + + void preadCallBack(FdAsyncIoUringOp *op, const ssize_t res); + void pwriteCallBack(FdAsyncIoUringOp *op, const ssize_t res); + void fsyncCallBack(FdAsyncIoUringOp *op, const ssize_t res); + + private: + // Joinable thread flag. + std::atomic_bool exit_; + + // Amount of processing number. + std::atomic_int process_num_; + + // Reaping thread. + std::thread reap_thread_; + + // Submission queue mutex. + std::mutex sq_mutex_; +}; + +} // namespace riegeli + +#endif // RIEGELI_IOURING_FD_ASYNC_IO_URING_H_ \ No newline at end of file diff --git a/riegeli/iouring/fd_io_uring.cc b/riegeli/iouring/fd_io_uring.cc new file mode 100644 index 00000000..20197bea --- /dev/null +++ b/riegeli/iouring/fd_io_uring.cc @@ -0,0 +1,80 @@ +#include "riegeli/iouring/fd_io_uring.h" + +#include "riegeli/base/errno_mapping.h" + +#include "absl/base/optimization.h" +#include "absl/strings/str_cat.h" + +namespace riegeli { + +namespace ioUring { + +bool IsIoUringAvailable() { + struct io_uring test_ring; + bool available = false; + if(io_uring_queue_init(4, &test_ring, 0) == 0) { + available = true; + io_uring_queue_exit(&test_ring); + } + return available; +} + +} + +FdIoUring::FdIoUring(FdIoUringOptions options, int fd) : Object(kInitiallyOpen), size_(options.size()) { + Init(options.fd_register(), fd); +} + +bool FdIoUring::Init(bool fd_register, int fd) { + memset(&ring_, 0, sizeof(ring_)); + memset(¶ms_, 0, sizeof(params_)); + + const int init_res = io_uring_queue_init_params(size_, &ring_, ¶ms_); + if(ABSL_PREDICT_FALSE(init_res < 0)) { + return FailOperation(-init_res, "Init Io_Uring"); + } + + if(fd_register) { + RegisterFd(fd); + } + + return true; +} + +FdIoUring::~FdIoUring() { + io_uring_queue_exit(&ring_); +} + +void FdIoUring::RegisterFd(int fd) { + fd_ = fd; + + if(fd_register_ == false) { + const int register_res = io_uring_register_files(&ring_, &fd_, 1); + RIEGELI_ASSERT_EQ(register_res, 0) << "Failed fd register."; + fd_register_ = true; + } else { + UpdateFd(); + } +} + +void FdIoUring::UnRegisterFd() { + fd_ = -1; + const int unregister_res = io_uring_unregister_files(&ring_); + RIEGELI_ASSERT_EQ(unregister_res, 0) << "Failed fd unregister."; + fd_register_ = false; +} + +void FdIoUring::UpdateFd() { + const int update_res = io_uring_register_files_update(&ring_, 0, &fd_, 1); + RIEGELI_ASSERT_EQ(update_res, 1) << "Failed fd update."; +} + +bool FdIoUring::FailOperation(const int error_number, absl::string_view operation) { + RIEGELI_ASSERT_NE(error_number, 0) + << "Failed precondition of FdWriterBase::FailOperation(): " + "zero errno"; + return Fail( + ErrnoToCanonicalStatus(error_number, absl::StrCat(operation, " failed"))); +} + +} // namespace riegeli \ No newline at end of file diff --git a/riegeli/iouring/fd_io_uring.h b/riegeli/iouring/fd_io_uring.h new file mode 100644 index 00000000..635109be --- /dev/null +++ b/riegeli/iouring/fd_io_uring.h @@ -0,0 +1,100 @@ +#ifndef RIEGELI_IOURING_FD_IO_URING_H_ +#define RIEGELI_IOURING_FD_IO_URING_H_ + +#include +#include +#include + +#include +#include + +#include "liburing.h" +#include "syscall.h" + +#include "absl/base/attributes.h" +#include "absl/strings/string_view.h" + +#include "riegeli/base/base.h" +#include "riegeli/base/object.h" +#include "riegeli/iouring/fd_io_uring_options.h" + +namespace riegeli { + +namespace ioUring { + +bool IsIoUringAvailable(); + +} + +// The base class for sync or async Io_Uring. +class FdIoUring : public Object { +public: + enum class IoUringMode { + ASYNCIOURING, + SYNCIOURING, + }; + +public: + FdIoUring() = delete; + FdIoUring(const FdIoUring&) = delete; + FdIoUring& operator=(const FdIoUring&) = delete; + +protected: + FdIoUring(FdIoUringOptions options, int fd); + +public: + virtual ~FdIoUring(); + + // The interface for file operation for Io_Uring. + virtual ssize_t pread(int fd, void *buf, size_t count, off_t offset) = 0; + + virtual ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset) = 0; + + virtual int fsync(int fd) = 0; + + // Get the mode of Io_Uring. + virtual IoUringMode Mode() = 0; + + // Pre-register or unregister file descriptor for Io_Uring. + void RegisterFd(int fd); + + void UnRegisterFd(); + + // Get Io_Uring settings. + bool fd_register() { + return fd_register_; + } + + uint32_t size() { + return size_; + } + + int fd() { + return fd_; + } + +private: + // Init. + bool Init(bool fd_register, int fd); + + // Update registered fd. + void UpdateFd(); + +protected: + // Fail Operation + ABSL_ATTRIBUTE_COLD bool FailOperation(const int error_number, absl::string_view operation); + +protected: + // Io_Uring entrance and set up params. + struct io_uring_params params_; + struct io_uring ring_; + + // Io_Uring settings. + bool fd_register_ = false; + uint32_t size_ = 0; + int fd_ = -1; +}; + +} // namespace riegeli + +#endif // RIEGELI_IOURING_FD_IO_URING_H_ \ No newline at end of file diff --git a/riegeli/iouring/fd_io_uring_options.cc b/riegeli/iouring/fd_io_uring_options.cc new file mode 100644 index 00000000..a19421aa --- /dev/null +++ b/riegeli/iouring/fd_io_uring_options.cc @@ -0,0 +1,22 @@ +#include "riegeli/iouring/fd_io_uring_options.h" + +namespace riegeli { + +uint32_t FdIoUringOptions::RoundUpToNextPowerTwo(uint32_t size) { + if(size == 0) { + return size; + } + + --size; + size |= size >> 1; + size |= size >> 2; + size |= size >> 4; + size |= size >> 8; + size |= size >> 16; + if(size + 1 > 4096) { + return 4096; + } + return size + 1; +} + +} // namespace riegeli diff --git a/riegeli/iouring/fd_io_uring_options.h b/riegeli/iouring/fd_io_uring_options.h new file mode 100644 index 00000000..892ebde4 --- /dev/null +++ b/riegeli/iouring/fd_io_uring_options.h @@ -0,0 +1,90 @@ +#ifndef RIEGELI_IOURING_FD_IO_URING_OPTIONS_H_ +#define RIEGELI_IOURING_FD_IO_URING_OPTIONS_H_ + +#include +#include +#include + +#include + +namespace riegeli { + +// The base interface class for sync or async Io_Uring. +class FdIoUringOptions { + public: + FdIoUringOptions() noexcept {}; + + // Tunes the Io_Uring mode (sync or async). + // + // If "true", we will return the function immediately. + // A reap thread will process the result of operations later. + // + // If "false", we will wait for the result of operations. + // + // Default: "true" + FdIoUringOptions& set_async(bool async) & { + async_ = async; + return *this; + } + + FdIoUringOptions&& set_async(bool async) && { + return std::move(set_async(async)); + } + + bool async() const { + return async_; + } + + // Tunes the size of Io_Uring instance. + // + // The size must be a power of 2. + // + // Default: 8192. + FdIoUringOptions& set_size(uint32_t size) & { + size = RoundUpToNextPowerTwo(size); + size_ = size; + return *this; + } + + FdIoUringOptions&& set_size(uint32_t size) && { + return std::move(set_size(size)); + } + + uint32_t size() const { + return size_; + } + + // If "true", the Io_Uring instance will pre-register a file-set. + // + // If "false", the Io_Uring instance will not pre-register. + // + // This can save overhead in kernel when you know the file in advance. + // The kernel will not retrieve a reference of the file in this case. + // + // Default: "false". + FdIoUringOptions& set_fd_register(bool fd_register) & { + fd_register_ = fd_register; + return *this; + } + + FdIoUringOptions&& set_fd_register(bool fd_register) && { + return std::move(set_fd_register(fd_register)); + } + + bool fd_register() const { + return fd_register_; + } + + private: + // Tunes the value of size. + // Get the next power of two. + uint32_t RoundUpToNextPowerTwo(uint32_t size); + + bool async_ = true; + uint32_t size_ = 512; + bool fd_register_ = false; +}; + +} // namespace riegeli + +#endif // RIEGELI_IOURING_FD_IO_URING_OPTIONS_H_ \ No newline at end of file diff --git a/riegeli/iouring/fd_sync_io_uring.cc b/riegeli/iouring/fd_sync_io_uring.cc new file mode 100644 index 00000000..68efcd43 --- /dev/null +++ b/riegeli/iouring/fd_sync_io_uring.cc @@ -0,0 +1,77 @@ +#include "riegeli/iouring/fd_sync_io_uring.h" + +namespace riegeli { + +FdSyncIoUring::FdSyncIoUring(FdIoUringOptions options, int fd) +: FdIoUring(options, fd) {} + +ssize_t FdSyncIoUring::pread(int fd, void *buf, size_t count, off_t offset) { + struct io_uring_sqe *sqe = GetSqe(); + if(fd_register_) { + RIEGELI_ASSERT_EQ(fd_, fd) << "The fd is not epual to the registered fd."; + io_uring_prep_read(sqe, 0, buf, count, offset); + sqe -> flags |= IOSQE_FIXED_FILE; + } else { + io_uring_prep_read(sqe, fd, buf, count, offset); + } + const ssize_t res = SubmitAndGetResult(); + if(ABSL_PREDICT_FALSE(res < 0)) { + if(res != -EINTR && res != -EAGAIN) { + FailOperation(-res, "pread()"); + } + } + return res; +} + +ssize_t FdSyncIoUring::pwrite(int fd, const void *buf, size_t count, off_t offset) { + struct io_uring_sqe *sqe = GetSqe(); + if(fd_register_) { + RIEGELI_ASSERT_EQ(fd_, fd) << "The fd is not epual to the registered fd."; + io_uring_prep_write(sqe, 0, buf, count, offset); + sqe -> flags |= IOSQE_FIXED_FILE; + } else { + io_uring_prep_write(sqe, fd, buf, count, offset); + } + const ssize_t res = SubmitAndGetResult(); + if(ABSL_PREDICT_FALSE(res < 0)) { + if(res != -EINTR && res != -EAGAIN) { + FailOperation(-res, "pwrite()"); + } + } + return res; +} + +int FdSyncIoUring::fsync(int fd) { + struct io_uring_sqe *sqe = GetSqe(); + if(fd_register_) { + RIEGELI_ASSERT_EQ(fd_, fd) << "The fd is not epual to the registered fd."; + io_uring_prep_fsync(sqe, 0, 0); + sqe -> flags |= IOSQE_FIXED_FILE; + } else { + io_uring_prep_fsync(sqe, fd, 0); + } + const ssize_t res = SubmitAndGetResult(); + if(ABSL_PREDICT_FALSE(res < 0)) { + FailOperation(-res, "fsync()"); + } + return res; +} + +inline struct io_uring_sqe* FdSyncIoUring::GetSqe() { + struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_); + RIEGELI_ASSERT(!!sqe) << "Failed get a sqe."; + return sqe; +} + +inline ssize_t FdSyncIoUring::SubmitAndGetResult() { + const int submit_res = io_uring_submit(&ring_); + RIEGELI_ASSERT_GT(submit_res, 0) << "Failed to submit the sqe."; + struct io_uring_cqe* cqe = NULL; + const int wait_res = io_uring_wait_cqe(&ring_, &cqe); + RIEGELI_ASSERT_EQ(wait_res, 0) << "Failed to get a cqe"; + ssize_t res = cqe -> res; + io_uring_cqe_seen(&ring_, cqe); + return res; +} + +} // namespace riegeli diff --git a/riegeli/iouring/fd_sync_io_uring.h b/riegeli/iouring/fd_sync_io_uring.h new file mode 100644 index 00000000..5a07f305 --- /dev/null +++ b/riegeli/iouring/fd_sync_io_uring.h @@ -0,0 +1,41 @@ +#ifndef RIEGELI_IOURING_FD_SYNC_IO_URING_H_ +#define RIEGELI_IOURING_FD_SYNC_IO_URING_H_ + +#include "riegeli/iouring/fd_io_uring.h" + +namespace riegeli { +// Perform Io_Uring synchronously. +class FdSyncIoUring : public FdIoUring { + public: + // Constructor and destructor for this class. + FdSyncIoUring() = delete; + FdSyncIoUring(const FdSyncIoUring&) = delete; + FdSyncIoUring& operator=(const FdSyncIoUring&) = delete; + + explicit FdSyncIoUring(FdIoUringOptions options, int fd = -1); + + ~FdSyncIoUring() override {} + + // Override the file operation interface for Io_Uring. + ssize_t pread(int fd, void *buf, size_t count, off_t offset) override; + + ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset) override; + + int fsync(int fd) override; + + // Get the mode of Io_Uring. + IoUringMode Mode() override { + return IoUringMode::SYNCIOURING; + } + + private: + // Get sqe. + struct io_uring_sqe* GetSqe(); + + // Submit sqe to kernel. + ssize_t SubmitAndGetResult(); +}; + +} // namespace riegeli + +#endif // RIEGELI_IOURING_FD_SYNC_IO_URING_H_S \ No newline at end of file diff --git a/third_party/liburing.BUILD b/third_party/liburing.BUILD new file mode 100644 index 00000000..ac89eb73 --- /dev/null +++ b/third_party/liburing.BUILD @@ -0,0 +1,38 @@ +licenses(["notice"]) + +genrule( + name = "liburingconfigure", + tools = [ + "configure", + ], + outs = [ + "config-host.h", + "config-host.mak", + "config.log", + "src/include/liburing/compat.h", + ], + cmd = "tempdir=$(@D)/tmp.XXXXX; rm -rf $$tempdir; mkdir -p $$tempdir; cp -r external/liburing/* $$tempdir/; pushd $$tempdir; ./configure; popd; cp $$tempdir/config-host.h $$tempdir/config-host.mak $$tempdir/config.log $(@D); cp $$tempdir/src/include/liburing/compat.h $(@D)/src/include/liburing; rm -rf $$tempdir;", + local = 1, +) + +genrule( + name = "foo", + outs = ["foo.h"], + cmd = "ls -al", +) + +cc_library( + name = "liburing", + visibility = [ + "//visibility:public" + ], + hdrs = glob([ + "src/*.h", + "src/include/*.h", + "src/include/liburing/*.h", + ]) + [":liburingconfigure"], + srcs = glob([ + "src/*.c", + ]), + includes = ["src/include"] +) \ No newline at end of file