From 8a70ec3e7f95a7509bcbfff3864e5b936632d75b Mon Sep 17 00:00:00 2001
From: PyXiion
Date: Sun, 25 Jan 2026 19:13:43 +0300
Subject: [PATCH 1/4] read/write API for tcp client

---
 CMakeLists.txt                  |   3 +-
 include/coro/net/io_status.hpp  |  46 +++++++++
 include/coro/net/tcp/client.hpp | 159 +++++++++++++++++++++++++++++---
 src/net/io_status.cpp           | 120 ++++++++++++++++++++++++
 test/net/test_tcp_server.cpp    |  41 +++-----
 5 files changed, 324 insertions(+), 45 deletions(-)
 create mode 100644 include/coro/net/io_status.hpp
 create mode 100644 src/net/io_status.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index ac74c24a..3a8852cb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -170,8 +170,7 @@ if(LIBCORO_FEATURE_NETWORKING)
     include/coro/net/connect.hpp src/net/connect.cpp
     include/coro/net/hostname.hpp
     include/coro/net/ip_address.hpp src/net/ip_address.cpp
-    include/coro/net/recv_status.hpp src/net/recv_status.cpp
-    include/coro/net/send_status.hpp src/net/send_status.cpp
+    include/coro/net/io_status.hpp src/net/io_status.cpp
     include/coro/net/socket.hpp src/net/socket.cpp
     include/coro/net/tcp/client.hpp src/net/tcp/client.cpp
     include/coro/net/tcp/server.hpp src/net/tcp/server.cpp
diff --git a/include/coro/net/io_status.hpp b/include/coro/net/io_status.hpp
new file mode 100644
index 00000000..a5c6bfa6
--- /dev/null
+++ b/include/coro/net/io_status.hpp
@@ -0,0 +1,46 @@
+#pragma once
+
+#include "coro/poll.hpp"
+#include <cstdint>
+#include <string>
+
+namespace coro::net
+{
+
+struct io_status
+{
+    enum class kind
+    {
+        ok,
+        closed,
+        connection_reset,
+        connection_refused,
+        timeout,
+
+        try_again,
+        polling_error,
+        cancelled,
+
+        native
+    };
+
+    kind type{};
+    int  native_code{};
+
+    [[nodiscard]] auto is_ok() const -> bool { return type == kind::ok; }
+    [[nodiscard]] auto is_timeout() const -> bool { return type == kind::timeout; }
+    [[nodiscard]] auto is_closed() const -> bool { return type == kind::closed; }
+
+    [[nodiscard]] auto is_native() const -> bool { return type == kind::native; }
+
+    explicit operator bool() const { return is_ok(); }
+
+    /**
+     * Returns a human-readable description of the error.
+     */
+    [[nodiscard]] auto message() const -> std::string;
+};
+
+auto make_io_status_from_native(int native_code) -> io_status;
+auto make_io_status_poll_status(coro::poll_status status) -> io_status;
+} // namespace coro::net
\ No newline at end of file
diff --git a/include/coro/net/tcp/client.hpp b/include/coro/net/tcp/client.hpp
index 84e3a511..58c68689 100644
--- a/include/coro/net/tcp/client.hpp
+++ b/include/coro/net/tcp/client.hpp
@@ -3,9 +3,8 @@
 #include "coro/concepts/buffer.hpp"
 #include "coro/io_scheduler.hpp"
 #include "coro/net/connect.hpp"
+#include "coro/net/io_status.hpp"
 #include "coro/net/ip_address.hpp"
-#include "coro/net/recv_status.hpp"
-#include "coro/net/send_status.hpp"
 #include "coro/net/socket.hpp"
 #include "coro/poll.hpp"
 #include "coro/task.hpp"
@@ -38,9 +37,9 @@ class client
      */
     explicit client(
         std::unique_ptr<io_scheduler>& scheduler,
-        options                        opts = options{
-                                   .address = {net::ip_address::from_string("127.0.0.1")},
-                                   .port    = 8080,
+        options opts = options{
+            .address = {net::ip_address::from_string("127.0.0.1")},
+            .port    = 8080,
         });
     client(const client& other);
     client(client&& other) noexcept;
@@ -78,6 +77,64 @@ class client
         return m_io_scheduler->poll(m_socket, op, timeout);
     }
 
+    /**
+     * Polls for read readiness (bounded by the given timeout, 0 = wait forever) and then
+     * performs a single recv(), returning the operation status and the bytes that were read.
+     */
+    auto read_some(std::span<std::byte> buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::pair<io_status, std::span<std::byte>>>
+    {
+        auto pstatus = co_await poll(poll_op::read, timeout);
+        if (pstatus != poll_status::read)
+        {
+            co_return std::pair{make_io_status_poll_status(pstatus), std::span<std::byte>{}};
+        }
+        co_return recv(buffer);
+    }
+    template<concepts::mutable_buffer buffer_type>
+    auto read_some(buffer_type&& buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::pair<io_status, std::span<std::byte>>>
+    {
+        return read_some(std::as_writable_bytes(std::span{buffer}), timeout);
+    }
+
+    /**
+     * Repeatedly calls read_some() until the buffer has been filled completely, an error
+     * occurs, or the timeout expires. The returned span is the prefix that was actually read.
+     */
+    auto read_exact(std::span<std::byte> buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::pair<io_status, std::span<std::byte>>>
+    {
+        const auto           start_time = std::chrono::steady_clock::now();
+        std::span<std::byte> remaining  = buffer;
+
+        while (!remaining.empty())
+        {
+            std::chrono::milliseconds remaining_timeout{0};
+            if (timeout.count() > 0)
+            {
+                auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+                    std::chrono::steady_clock::now() - start_time);
+
+                if (elapsed >= timeout)
+                {
+                    // Return the prefix of the span that was read before the timeout expired.
+                    co_return {
+                        io_status{io_status::kind::timeout}, buffer.subspan(0, buffer.size() - remaining.size())};
+                }
+                remaining_timeout = timeout - elapsed;
+            }
+
+            auto [status, read_span] = co_await read_some(remaining, remaining_timeout);
+            remaining                = remaining.subspan(read_span.size());
+
+            if (!status.is_ok())
+            {
+                co_return {status, buffer.subspan(0, buffer.size() - remaining.size())};
+            }
+        }
+
+        co_return {io_status{io_status::kind::ok}, buffer};
+    }
+
+    template<concepts::mutable_buffer buffer_type>
+    auto read_exact(buffer_type&& buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::pair<io_status, std::span<std::byte>>>
+    {
+        return read_exact(std::as_writable_bytes(std::span{buffer}), timeout);
+    }
     /**
      * Receives incoming data into the given buffer. By default, since all tcp client sockets are set
      * to non-blocking use co_await poll() to determine when data is ready to be received.
@@ -88,29 +145,101 @@ class client
     template<
         concepts::mutable_buffer buffer_type,
         typename element_type = typename concepts::mutable_buffer_traits<buffer_type>::element_type>
-    auto recv(buffer_type&& buffer) -> std::pair<recv_status, std::span<element_type>>
+    auto recv(buffer_type&& buffer) -> std::pair<io_status, std::span<element_type>>
     {
         // If the user requested zero bytes, just return.
         if (buffer.empty())
         {
-            return {recv_status::ok, std::span<element_type>{}};
+            return {io_status{io_status::kind::ok}, std::span<element_type>{}};
         }
 
         auto bytes_recv = ::recv(m_socket.native_handle(), buffer.data(), buffer.size(), 0);
         if (bytes_recv > 0)
         {
             // Ok, we've received some data.
-            return {recv_status::ok, std::span{buffer.data(), static_cast<std::size_t>(bytes_recv)}};
+            return {
+                io_status{io_status::kind::ok},
+                std::span{buffer.data(), static_cast<std::size_t>(bytes_recv)}};
         }
         if (bytes_recv == 0)
         {
             // On TCP stream sockets 0 indicates the connection has been closed by the peer.
-            return {recv_status::closed, std::span<element_type>{}};
+            return {io_status{io_status::kind::closed}, std::span<element_type>{}};
         }
 
         // Report the error to the user.
-        return {static_cast<recv_status>(errno), std::span<element_type>{}};
+        return {make_io_status_from_native(errno), std::span<element_type>{}};
     }
 
+    /**
+     * Polls for write readiness (bounded by the given timeout, 0 = wait forever) and then
+     * performs a single send(). Returns the operation status and the span of bytes that
+     * remain unsent.
+     */
+    auto write_some(
+        std::span<const std::byte> buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::pair<io_status, std::span<const std::byte>>>
+    {
+        auto pstatus = co_await poll(poll_op::write, timeout);
+        if (pstatus != poll_status::write)
+        {
+            // Nothing was sent, so the entire buffer is still outstanding.
+            co_return std::pair{make_io_status_poll_status(pstatus), buffer};
+        }
+        co_return send(buffer);
+    }
+    template<concepts::const_buffer buffer_type>
+    auto write_some(const buffer_type& buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::pair<io_status, std::span<const std::byte>>>
+    {
+        return write_some(std::as_bytes(std::span{buffer}), timeout);
+    }
+
+    /**
+     * Repeatedly calls write_some() until the entire buffer has been sent, an error occurs,
+     * or the timeout expires. The returned span holds whatever bytes remain unsent.
+     */
+    auto write_all(
+        std::span<const std::byte> buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::pair<io_status, std::span<const std::byte>>>
+    {
+        const auto                 start_time = std::chrono::steady_clock::now();
+        std::span<const std::byte> remaining  = buffer;
+
+        // Trying to send something without polling
+        // {
+        //     auto [status, unsent] = send(buffer);
+        //     remaining = unsent;
+        //
+        //     if (!status.is_ok() && status.type != io_status::kind::try_again)
+        //     {
+        //         co_return {status, remaining};
+        //     }
+        // }
+
+        while (!remaining.empty())
+        {
+            std::chrono::milliseconds remaining_timeout{0};
+            if (timeout.count() > 0)
+            {
+                auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+                    std::chrono::steady_clock::now() - start_time);
+
+                if (elapsed >= timeout)
+                {
+                    co_return {io_status{io_status::kind::timeout}, remaining};
+                }
+                remaining_timeout = timeout - elapsed;
+            }
+
+            auto [status, unsent_span] = co_await write_some(remaining, remaining_timeout);
+            remaining                  = unsent_span;
+
+            if (!status.is_ok())
+            {
+                co_return {status, remaining};
+            }
+        }
+
+        co_return {io_status{io_status::kind::ok}, {}};
+    }
+
+    template<concepts::const_buffer buffer_type>
+    auto write_all(const buffer_type& buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::pair<io_status, std::span<const std::byte>>>
+    {
+        return write_all(std::as_bytes(std::span{buffer}), timeout);
+    }
 
     /**
@@ -125,23 +254,25 @@ class client
     template<
         concepts::const_buffer buffer_type,
         typename element_type = typename concepts::const_buffer_traits<buffer_type>::element_type>
-    auto send(const buffer_type& buffer) -> std::pair<send_status, std::span<element_type>>
+    auto send(const buffer_type& buffer) -> std::pair<io_status, std::span<element_type>>
     {
         // If the user requested zero bytes, just return.
         if (buffer.empty())
         {
-            return {send_status::ok, std::span{buffer.data(), buffer.size()}};
+            return {io_status{io_status::kind::ok}, std::span{buffer.data(), buffer.size()}};
         }
 
         auto bytes_sent = ::send(m_socket.native_handle(), buffer.data(), buffer.size(), 0);
         if (bytes_sent >= 0)
        {
             // Some or all of the bytes were written.
-            return {send_status::ok, std::span{buffer.data() + bytes_sent, buffer.size() - bytes_sent}};
+            return {
+                io_status{io_status::kind::ok},
+                std::span{buffer.data() + bytes_sent, buffer.size() - bytes_sent}};
         }
 
         // Due to the error none of the bytes were written.
-        return {static_cast<send_status>(errno), std::span{buffer.data(), buffer.size()}};
+        return {make_io_status_from_native(errno), std::span{buffer.data(), buffer.size()}};
     }
 
 private:
diff --git a/src/net/io_status.cpp b/src/net/io_status.cpp
new file mode 100644
index 00000000..9d40068e
--- /dev/null
+++ b/src/net/io_status.cpp
@@ -0,0 +1,120 @@
+#include "coro/net/io_status.hpp"
+#include <cerrno>
+#include <system_error>
+
+#if defined(_WIN32)
+    #ifndef WIN32_LEAN_AND_MEAN
+        #define WIN32_LEAN_AND_MEAN
+    #endif
+    #include <winsock2.h>
+    #include <windows.h>
+#endif
+
+std::string coro::net::io_status::message() const
+{
+    if (not is_native())
+    {
+        switch (type)
+        {
+            case kind::ok:
+                return "Success";
+            case kind::closed:
+                return "Connection closed by peer";
+            case kind::connection_reset:
+                return "Connection reset by peer";
+            case kind::connection_refused:
+                return "Connection refused by target host";
+            case kind::timeout:
+                return "Operation timed out";
+            case kind::try_again:
+                return "Resource temporarily unavailable, try again";
+            case kind::polling_error:
+                return "Polling failed on the socket";
+            case kind::cancelled:
+                return "Operation cancelled";
+            case kind::native:
+                return "Native error";
+        }
+    }
+
+    if (native_code == 0)
+    {
+        return "Success";
+    }
+
+#if defined(_WIN32)
+    char*  buffer = nullptr;
+    size_t size   = FormatMessageA(
+        FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
+        nullptr,
+        static_cast<DWORD>(native_code),
+        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+        reinterpret_cast<LPSTR>(&buffer),
+        0,
+        nullptr);
+
+    if (size > 0 && buffer)
+    {
+        std::string msg(buffer, size);
+        LocalFree(buffer);
+        return msg;
+    }
+    return "Unknown system error (" + std::to_string(native_code) + ")";
+#else // Linux/BSD
+    try
+    {
+        return std::system_category().message(native_code);
+    }
+    catch (...)
+    {
+        return "Unknown system error (" + std::to_string(native_code) + ")";
+    }
+#endif
+}
+
+auto coro::net::make_io_status_from_native(int native_code) -> coro::net::io_status
+{
+#if defined(_WIN32)
+    #error "TODO: WIN32"
+#else // Linux/BSD
+    using kind = io_status::kind;
+    kind type;
+    switch (native_code)
+    {
+        case 0:
+            type = kind::ok;
+            break;
+        case EPIPE: // A write on a connection that the peer has already closed.
+            type = kind::closed;
+            break;
+        case ECONNREFUSED:
+            type = kind::connection_refused;
+            break;
+        case ECONNRESET:
+            type = kind::connection_reset;
+            break;
+        case EAGAIN:
+            type = kind::try_again;
+            break;
+        default:
+            type = kind::native;
+            break;
+    }
+    return coro::net::io_status{.type = type, .native_code = native_code};
+#endif
+}
+auto coro::net::make_io_status_poll_status(coro::poll_status status) -> coro::net::io_status
+{
+    switch (status)
+    {
+        case poll_status::read:
+        case poll_status::write:
+            return io_status{io_status::kind::ok};
+        case poll_status::timeout:
+            return io_status{io_status::kind::timeout};
+        case poll_status::error:
+            return io_status{io_status::kind::polling_error};
+        case poll_status::closed:
+            return io_status{io_status::kind::closed};
+        case poll_status::cancelled:
+            return io_status{io_status::kind::cancelled};
+    }
+    // Unreachable while every poll_status enumerator is handled above.
+    return io_status{io_status::kind::polling_error};
+}
diff --git a/test/net/test_tcp_server.cpp b/test/net/test_tcp_server.cpp
index c8c9e7c2..8c5fc317 100644
--- a/test/net/test_tcp_server.cpp
+++ b/test/net/test_tcp_server.cpp
@@ -32,28 +32,15 @@ TEST_CASE("tcp_server ping server", "[tcp_server]")
         auto cstatus = co_await client.connect();
         REQUIRE(cstatus == coro::net::connect_status::connected);
 
-        // Skip polling for write, should really only poll if the write is partial, shouldn't be
-        // required for this test.
-        std::cerr << "client send()\n";
-        auto [sstatus, remaining] = client.send(client_msg);
-        REQUIRE(sstatus == coro::net::send_status::ok);
+        std::cerr << "client write_all()\n";
+        auto [sstatus, remaining] = co_await client.write_all(client_msg);
+        REQUIRE(sstatus.is_ok());
         REQUIRE(remaining.empty());
 
-        // Poll for the server's response.
-        std::cerr << "client poll(read)\n";
-        auto pstatus = co_await client.poll(coro::poll_op::read);
-        if (pstatus != coro::poll_status::read)
-        {
-            REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::closed);
-            // the socket has been closed
-            co_return;
-        }
-        REQUIRE(pstatus == coro::poll_status::read);
-
         std::string buffer(256, '\0');
-        std::cerr << "client recv()\n";
-        auto [rstatus, rspan] = client.recv(buffer);
-        REQUIRE(rstatus == coro::net::recv_status::ok);
+        std::cerr << "client read_some()\n";
+        auto [rstatus, rspan] = co_await client.read_some(buffer);
+        REQUIRE(rstatus.is_ok());
         REQUIRE(rspan.size() == server_msg.length());
         buffer.resize(rspan.size());
         REQUIRE(buffer == server_msg);
@@ -77,23 +64,19 @@ TEST_CASE("tcp_server ping server", "[tcp_server]")
         auto client = server.accept();
         REQUIRE(client.socket().is_valid());
 
-        // Poll for client request.
-        std::cerr << "server poll(read)\n";
-        pstatus = co_await client.poll(coro::poll_op::read);
-        REQUIRE(pstatus == coro::poll_status::read);
-
         std::string buffer(256, '\0');
-        std::cerr << "server recv()\n";
-        auto [rstatus, rspan] = client.recv(buffer);
-        REQUIRE(rstatus == coro::net::recv_status::ok);
+        std::cerr << "server read_some()\n";
+        auto [rstatus, rspan] = co_await client.read_some(buffer);
+        REQUIRE(rstatus.is_ok());
         REQUIRE(rspan.size() == client_msg.size());
         buffer.resize(rspan.size());
         REQUIRE(buffer == client_msg);
 
         // Respond to client.
-        std::cerr << "server send()\n";
-        auto [sstatus, remaining] = client.send(server_msg);
-        REQUIRE(sstatus == coro::net::send_status::ok);
+        std::cerr << "server write_some()\n";
+        auto [wstatus, remaining] = co_await client.write_some(server_msg);
+        REQUIRE(wstatus.is_ok());
         REQUIRE(remaining.empty());
 
         std::cerr << "server return\n";
@@ -148,7 +131,7 @@ TEST_CASE("tcp_server concurrent polling on the same socket", "[tcp_server]")
             auto poll_status = co_await write_client.poll(coro::poll_op::write);
             REQUIRE(poll_status == coro::poll_status::write);
             auto [send_status, r] = write_client.send(remaining);
-            REQUIRE(send_status == coro::net::send_status::ok);
+            REQUIRE(send_status.is_ok());
 
             if (r.empty())
             {

From c788bdc6b0cff19a7de015a4e8159cee46c0b804 Mon Sep 17 00:00:00 2001
From: PyXiion
Date: Sun, 25 Jan 2026 21:32:06 +0300
Subject: [PATCH 2/4] Update README tcp example, tests & add docs

---
 README.md                         | 424 ++++++++++++++++++++----------
 examples/coro_tcp_echo_server.cpp |  19 +-
 include/coro/net/tcp/client.hpp   | 164 ++++++++----
 include/coro/net/tcp/server.hpp   |  31 ++-
 src/net/tcp/server.cpp            |   2 +-
 test/bench.cpp                    |  92 +++----
 test/net/test_tcp_server.cpp      |  52 ++--
 7 files changed, 477 insertions(+), 307 deletions(-)

diff --git a/README.md b/README.md
index 26a41a6b..77d26b5a 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,7 @@
**libcoro** is meant to provide low level coroutine constructs for building larger applications.

## Overview

* C++20 coroutines!
* Modern Safe C++20 API
* Higher level coroutine constructs
@@ -40,7 +41,7 @@
    - Uses libc-ares
  - [coro::net::tcp::client](#io_scheduler)
  - [coro::net::tcp::server](#io_scheduler)
  * [Example TCP/HTTP Echo Server](#tcp_echo_server)
  - coro::net::tls::client (OpenSSL)
  - coro::net::tls::server (OpenSSL)
  - coro::net::udp::peer
@@ -54,17 +55,30 @@
## Usage

### A note on co_await and threads

It's important to note with coroutines that _any_ `co_await` has the potential to switch the underlying thread that is
executing the currently executing coroutine if the scheduler used has more than 1 thread. In general this shouldn't
affect the way any user of the library would write code except for `thread_local`. Usage of `thread_local` should be
extremely careful and _never_ used across any `co_await` boundary due to thread switching and work stealing on libcoro's
schedulers. The only way this is safe is by using a `coro::thread_pool` with 1 thread or an inline `io_scheduler` which
also only has 1 thread.

### A note on lambda captures

[C++ Core Guidelines - CP.51: Do not use capturing lambdas that are coroutines](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rcoro-capture)

The recommendation is to not use lambda captures and instead pass any data into the coroutine via its function arguments
by value to guarantee the argument lifetimes.
Lambda captures will be destroyed at the coroutines first suspension point so if they are used past that point it will result in a use after free bug. +The recommendation is to not use lambda captures and instead pass any data into the coroutine via its function arguments +by value to guarantee the argument lifetimes. Lambda captures will be destroyed at the coroutines first suspension point +so if they are used past that point it will result in a use after free bug. -If you must use lambda captures with your coroutines then libcoro offers [coro::invoke](#invoke) to create a stable coroutine frame to hold the captures for the duration of the user's coroutine. +If you must use lambda captures with your coroutines then libcoro offers [coro::invoke](#invoke) to create a stable +coroutine frame to hold the captures for the duration of the user's coroutine. ### sync_wait -The `sync_wait` construct is meant to be used outside a coroutine context to block the calling thread until the coroutine has completed. The coroutine can be executed on the calling thread or scheduled on one of libcoro's schedulers. + +The `sync_wait` construct is meant to be used outside a coroutine context to block the calling thread until the +coroutine has completed. The coroutine can be executed on the calling thread or scheduled on one of libcoro's +schedulers. ```C++ #include @@ -102,6 +116,7 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_sync_wait Inline Result = 10 @@ -109,7 +124,11 @@ Offload Result = 20 ``` ### when_all -The `when_all` construct can be used within coroutines to await a set of tasks, or it can be used outside coroutine context in conjunction with `sync_wait` to await multiple tasks. Each task passed into `when_all` will initially be executed serially by the calling thread so it is recommended to offload the tasks onto an executor like `coro::thread_pool` or `coro::io_scheduler` so they can execute in parallel. + +The `when_all` construct can be used within coroutines to await a set of tasks, or it can be used outside coroutine +context in conjunction with `sync_wait` to await multiple tasks. Each task passed into `when_all` will initially be +executed serially by the calling thread so it is recommended to offload the tasks onto an executor like +`coro::thread_pool` or `coro::io_scheduler` so they can execute in parallel. ```C++ #include @@ -167,6 +186,7 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_when_all 2 @@ -178,7 +198,12 @@ first: 1.21 second: 20 ``` ### when_any -The `when_any` construct can be used within coroutines to await a set of tasks and only return the result of the first task that completes. This can also be used outside of a coroutine context in conjunction with `sync_wait` to await the first result. Each task passed into `when_any` will initially be executed serially by the calling thread so it is recommended to offload the tasks onto an executor like `coro::thread_pool` or `coro::io_scheduler` so they can execute in parallel. + +The `when_any` construct can be used within coroutines to await a set of tasks and only return the result of the first +task that completes. This can also be used outside of a coroutine context in conjunction with `sync_wait` to await the +first result. Each task passed into `when_any` will initially be executed serially by the calling thread so it is +recommended to offload the tasks onto an executor like `coro::thread_pool` or `coro::io_scheduler` so they can execute +in parallel. 
```C++ #include @@ -232,16 +257,18 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_when_any result = 1 result = -1 ``` - ### task -The `coro::task` is the main coroutine building block within `libcoro`. Use task to create your coroutines and `co_await` or `co_yield` tasks within tasks to perform asynchronous operations, lazily evaluation or even spreading work out across a `coro::thread_pool`. Tasks are lightweight and only begin execution upon awaiting them. +The `coro::task` is the main coroutine building block within `libcoro`. Use task to create your coroutines and +`co_await` or `co_yield` tasks within tasks to perform asynchronous operations, lazily evaluation or even spreading work +out across a `coro::thread_pool`. Tasks are lightweight and only begin execution upon awaiting them. ```C++ #include @@ -325,6 +352,7 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_task Task1 output = 9 @@ -336,6 +364,7 @@ Answer to everything = 42 ``` ### generator + The `coro::generator` construct is a coroutine which can generate one or more values. ```C++ @@ -376,13 +405,18 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_generator 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, ``` ### event -The `coro::event` is a thread safe async tool to have 1 or more waiters suspend for an event to be set before proceeding. The implementation of event currently will resume execution of all waiters on the thread that sets the event. If the event is already set when a waiter goes to wait on the thread they will simply continue executing with no suspend or wait time incurred. + +The `coro::event` is a thread safe async tool to have 1 or more waiters suspend for an event to be set before +proceeding. The implementation of event currently will resume execution of all waiters on the thread that sets the +event. If the event is already set when a waiter goes to wait on the thread they will simply continue executing with no +suspend or wait time incurred. ```C++ #include @@ -416,6 +450,7 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_event task 1 is waiting on the event... @@ -428,7 +463,9 @@ task 1 event triggered, now resuming. ``` ### latch -The `coro::latch` is a thread safe async tool to have 1 waiter suspend until all outstanding events have completed before proceeding. + +The `coro::latch` is a thread safe async tool to have 1 waiter suspend until all outstanding events have completed +before proceeding. ```C++ #include @@ -491,6 +528,7 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_latch latch task is now waiting on all children tasks... @@ -508,11 +546,26 @@ latch task dependency tasks completed, resuming. ``` ### mutex -The `coro::mutex` is a thread safe async tool to protect critical sections and only allow a single thread to execute the critical section at any given time. Mutexes that are uncontended are a simple CAS operation with a memory fence 'acquire' to behave similar to `std::mutex`. If the lock is contended then the thread will add itself to a LIFO queue of waiters and yield excution to allow another coroutine to process on that thread while it waits to acquire the lock. 
It's important to note that upon releasing the mutex, the thread unlocking it will immediately start processing the
next waiter in line for the `coro::mutex` (if there are any waiters); the mutex is only unlocked/released once all
waiters have been processed. This guarantees fair execution in a reasonably FIFO manner, but it also means all
coroutines that stack in the waiter queue will end up shifting to the single thread that is executing all waiting
coroutines. It is possible to manually reschedule after the critical section onto a thread pool to re-distribute the
work if this is a concern in your use case.

The suspend waiter queue is LIFO, however the worker that currently holds the mutex will periodically 'acquire' the
current LIFO waiter list to process those waiters when its internal list becomes empty. This effectively resets the
suspended waiter list to empty and the worker holding the mutex will work through the newly acquired LIFO queue of
waiters. It would be possible to reverse this list to be as fair as possible, however not reversing the list should
result in better throughput at possibly the cost of some latency for the first suspended waiters on the 'current' LIFO
queue. Reversing the list, however, would introduce latency for all queue waiters since it's done every time the LIFO
queue is swapped.
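The rescheduling mentioned above needs no special machinery; once the lock has been released the coroutine can simply
hop back onto the pool. A minimal sketch, assuming a `coro::thread_pool tp` and a `coro::mutex m` are in scope and
eliding the actual lock acquisition:

```C++
auto make_task = [](coro::thread_pool& tp, coro::mutex& m) -> coro::task<void>
{
    co_await tp.schedule();
    {
        // ... acquire m and run the critical section here ...
    } // the lock is released at the end of this scope
    // This coroutine may now be running on whichever thread unlocked the mutex, so
    // hop back onto the pool to spread the remaining work across all workers again.
    co_await tp.schedule();
    // ... non-critical work ...
    co_return;
};
```

The complete example below shows basic `coro::mutex` usage: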
```C++ #include @@ -559,18 +612,25 @@ int main() Expected output, note that the output will vary from run to run based on how the thread pool workers are scheduled and in what order they acquire the mutex lock: + ```bash $ ./examples/coro_mutex 1, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 37, 36, 35, 40, 39, 38, 41, 42, 43, 44, 46, 47, 48, 45, 49, 50, 51, 52, 53, 54, 55, 57, 56, 59, 58, 61, 60, 62, 63, 65, 64, 67, 66, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 83, 82, 84, 85, 86, 87, 88, 89, 91, 90, 92, 93, 94, 95, 96, 97, 98, 99, 100, ``` -Its very easy to see the LIFO 'atomic' queue in action in the beginning where 22->2 are immediately suspended waiting to acquire the mutex. +Its very easy to see the LIFO 'atomic' queue in action in the beginning where 22->2 are immediately suspended waiting to +acquire the mutex. ### shared_mutex -The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive access. The lock is acquired strictly in a FIFO manner in that if the lock is currenty held by shared users and an exclusive attempts to lock, the exclusive waiter will suspend until all the _current_ shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter. This prevents the exclusive waiter from being starved. -The `coro::shared_mutex` requires a `executor_type` when constructed to be able to resume multiple shared waiters when an exclusive lock is released. This allows for all of the pending shared waiters to be resumed concurrently. +The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive +access. The lock is acquired strictly in a FIFO manner in that if the lock is currenty held by shared users and an +exclusive attempts to lock, the exclusive waiter will suspend until all the _current_ shared users finish using the +lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait +behind the exclusive waiter. This prevents the exclusive waiter from being starved. +The `coro::shared_mutex` requires a `executor_type` when constructed to be able to resume multiple shared waiters when +an exclusive lock is released. This allows for all of the pending shared waiters to be resumed concurrently. ```C++ #include @@ -641,7 +701,9 @@ int main() } ``` -Example output, notice how the (4,5,6) shared tasks attempt to acquire the lock in a shared state but are blocked behind the exclusive waiter until it completes: +Example output, notice how the (4,5,6) shared tasks attempt to acquire the lock in a shared state but are blocked behind +the exclusive waiter until it completes: + ```bash $ ./examples/coro_shared_mutex shared task 1 lock_shared() @@ -669,7 +731,13 @@ shared task 6 unlock_shared() ``` ### semaphore -The `coro::semaphore` is a thread safe async tool to protect a limited number of resources by only allowing so many consumers to acquire the resources a single time. The `coro::semaphore` also has a maximum number of resources denoted by its constructor. This means if a resource is produced or released when the semaphore is at its maximum resource availability then the release operation will await for space to become available. 
This is useful for a ringbuffer type situation where the resources are produced and then consumed, but will have no effect on a semaphores usage if there is a set known quantity of resources to start with and are acquired and then released back. + +The `coro::semaphore` is a thread safe async tool to protect a limited number of resources by only allowing so many +consumers to acquire the resources a single time. The `coro::semaphore` also has a maximum number of resources denoted +by its constructor. This means if a resource is produced or released when the semaphore is at its maximum resource +availability then the release operation will await for space to become available. This is useful for a ringbuffer type +situation where the resources are produced and then consumed, but will have no effect on a semaphores usage if there is +a set known quantity of resources to start with and are acquired and then released back. ```C++ #include @@ -713,13 +781,19 @@ int main() ``` Expected output, note that there is no lock around the `std::cout` so some of the output isn't perfect. + ```bash $ ./examples/coro_semaphore 1, 23, 25, 24, 22, 27, 28, 29, 21, 20, 19, 18, 17, 14, 31, 30, 33, 32, 41, 40, 37, 39, 38, 36, 35, 34, 43, 46, 47, 48, 45, 42, 44, 26, 16, 15, 13, 52, 54, 55, 53, 49, 51, 57, 58, 50, 62, 63, 61, 60, 59, 56, 12, 11, 8, 10, 9, 7, 6, 5, 4, 3, 642, , 66, 67, 6568, , 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, ``` ### ring_buffer -The `coro::ring_buffer` is thread safe async multi-producer multi-consumer statically sized ring buffer. Producers that try to produce a value when the ring buffer is full will suspend until space is available. Consumers that try to consume a value when the ring buffer is empty will suspend until space is available. All waiters on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes available. + +The `coro::ring_buffer` is thread safe async multi-producer multi-consumer statically sized ring +buffer. Producers that try to produce a value when the ring buffer is full will suspend until space is available. +Consumers that try to consume a value when the ring buffer is empty will suspend until space is available. All waiters +on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes +available. 
```C++ #include @@ -796,6 +870,7 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_ring_buffer (id=3, v=1), (id=2, v=2), (id=1, v=3), (id=0, v=4), (id=3, v=5), (id=2, v=6), (id=1, v=7), (id=0, v=8), (id=3, v=9), (id=2, v=10), (id=1, v=11), (id=0, v=12), (id=3, v=13), (id=2, v=14), (id=1, v=15), (id=0, v=16), (id=3, v=17), (id=2, v=18), (id=1, v=19), (id=0, v=20), (id=3, v=21), (id=2, v=22), (id=1, v=23), (id=0, v=24), (id=3, v=25), (id=2, v=26), (id=1, v=27), (id=0, v=28), (id=3, v=29), (id=2, v=30), (id=1, v=31), (id=0, v=32), (id=3, v=33), (id=2, v=34), (id=1, v=35), (id=0, v=36), (id=3, v=37), (id=2, v=38), (id=1, v=39), (id=0, v=40), (id=3, v=41), (id=2, v=42), (id=0, v=44), (id=1, v=43), (id=3, v=45), (id=2, v=46), (id=0, v=47), (id=3, v=48), (id=2, v=49), (id=0, v=50), (id=3, v=51), (id=2, v=52), (id=0, v=53), (id=3, v=54), (id=2, v=55), (id=0, v=56), (id=3, v=57), (id=2, v=58), (id=0, v=59), (id=3, v=60), (id=1, v=61), (id=2, v=62), (id=0, v=63), (id=3, v=64), (id=1, v=65), (id=2, v=66), (id=0, v=67), (id=3, v=68), (id=1, v=69), (id=2, v=70), (id=0, v=71), (id=3, v=72), (id=1, v=73), (id=2, v=74), (id=0, v=75), (id=3, v=76), (id=1, v=77), (id=2, v=78), (id=0, v=79), (id=3, v=80), (id=2, v=81), (id=1, v=82), (id=0, v=83), (id=3, v=84), (id=2, v=85), (id=1, v=86), (id=0, v=87), (id=3, v=88), (id=2, v=89), (id=1, v=90), (id=0, v=91), (id=3, v=92), (id=2, v=93), (id=1, v=94), (id=0, v=95), (id=3, v=96), (id=2, v=97), (id=1, v=98), (id=0, v=99), (id=3, v=100), @@ -807,7 +882,11 @@ consumer 3 shutting down, stop signal received ``` ### queue -The `coro::queue` is thread safe async multi-producer multi-consumer queue. Producing into the queue is not an asynchronous operation, it will either immediately use a consumer that is awaiting on `pop()` to process the element, or if no consumer is available place the element into the queue. All consume waiters on the queue are resumed in a LIFO manner when an element becomes available to consume. + +The `coro::queue` is thread safe async multi-producer multi-consumer queue. Producing into the queue is +not an asynchronous operation, it will either immediately use a consumer that is awaiting on `pop()` to process the +element, or if no consumer is available place the element into the queue. All consume waiters on the queue are resumed +in a LIFO manner when an element becomes available to consume. ```C++ #include @@ -882,6 +961,7 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_queue consumed 0 @@ -912,9 +992,13 @@ consumed 4 ``` ### condition_variable -`coro:condition_variable` allows for tasks to await on the condition until notified with an optional predicate and timeout and stop token. The API for `coro::condition_variable` mostly matches `std::condition_variable`. -NOTE: It is important to *not* hold the `coro::scoped_lock` when calling `notify_one()` or `notify_all()`, this differs from `std::condition_variable` which allows it but doesn't require it. `coro::condition_variable` will deadlock in this scenario based on how coroutines work vs threads. +`coro:condition_variable` allows for tasks to await on the condition until notified with an optional predicate and +timeout and stop token. The API for `coro::condition_variable` mostly matches `std::condition_variable`. + +NOTE: It is important to *not* hold the `coro::scoped_lock` when calling `notify_one()` or `notify_all()`, this differs +from `std::condition_variable` which allows it but doesn't require it. 
`coro::condition_variable` will deadlock in this +scenario based on how coroutines work vs threads. ```C++ #include @@ -1018,6 +1102,7 @@ int main() ``` Expected output: + ```bash $ ./examples/coro_condition_variable 0 predicate condition = 0 # every call to cv.wait() will invoke the predicate to see if they are ready @@ -1050,7 +1135,10 @@ ss.request_stop() # request to stop, wakeup all waiters an ``` ### invoke -`coro::invoke -> awaitable` takes a coroutine functor and its arguments, invokes the functor as a coroutine and awaits its result. This is useful for invoking lambda coroutines that have lambdas that need a stable coroutine frame to last the duration of the invocable coroutine. + +`coro::invoke -> awaitable` takes a coroutine functor and its arguments, invokes the +functor as a coroutine and awaits its result. This is useful for invoking lambda coroutines that have lambdas that need +a stable coroutine frame to last the duration of the invocable coroutine. ```C++ #include @@ -1080,20 +1168,36 @@ int main() ``` Example output: + ```bash $ ./examples/coro_invoke 6 ``` ### thread_pool -`coro::thread_pool` is a statically sized pool of worker threads to execute scheduled coroutines from a FIFO queue. One way to schedule a coroutine on a thread pool is to use the pool's `schedule()` function which should be `co_awaited` inside the coroutine to transfer the execution from the current thread to a thread pool worker thread. Its important to note that scheduling will first place the coroutine into the FIFO queue and will be picked up by the first available thread in the pool, e.g. there could be a delay if there is a lot of work queued up. + +`coro::thread_pool` is a statically sized pool of worker threads to execute scheduled coroutines from a FIFO queue. One +way to schedule a coroutine on a thread pool is to use the pool's `schedule()` function which should be `co_awaited` +inside the coroutine to transfer the execution from the current thread to a thread pool worker thread. Its important to +note that scheduling will first place the coroutine into the FIFO queue and will be picked up by the first available +thread in the pool, e.g. there could be a delay if there is a lot of work queued up. #### Ways to schedule tasks onto a `coro::thread_pool` -* `coro::thread_pool::schedule()` Use `co_await` on this method inside a coroutine to transfer the tasks execution to the `coro::thread_pool`. -* `coro::thread_pool::schedule(coro::task task) -> coro::task` schedules the task on the `coro::thread_pool` and then returns the result in a task that must be awaited. This is useful if you want to schedule work on the `coro::thread_pool` and want to wait for the result inline. -* `coro::thread_pool::spawn_detached(coro::task&& task)` Spawns the task to be detached and owned by the `coro::thread_pool`, use this if you want to fire and forget the task, the `coro::thread_pool` will maintain the task's lifetime. -* `coro::thread_pool::spawn_joinable(coro::task&& task) -> coro::task` Spawns the task to be started immediately but can be joined at later time, use this if you want to start the task immediately but want to join it later. -* `coro::task_group(coro::task&& task | range)` schedules the task(s) on the `coro::thread_pool`. Use this when you want to share a `coro::thread_pool` while monitoring the progress of a group of tasks. + +* `coro::thread_pool::schedule()` Use `co_await` on this method inside a coroutine to transfer the tasks execution to + the `coro::thread_pool`. 
+* `coro::thread_pool::schedule(coro::task task) -> coro::task` schedules the task on the `coro::thread_pool` and + then returns the result in a task that must be awaited. This is useful if you want to schedule work on the + `coro::thread_pool` and want to wait for the result inline. +* `coro::thread_pool::spawn_detached(coro::task&& task)` Spawns the task to be detached and owned by the + `coro::thread_pool`, use this if you want to fire and forget the task, the `coro::thread_pool` will maintain the + task's lifetime. +* `coro::thread_pool::spawn_joinable(coro::task&& task) -> coro::task` Spawns the task to be started + immediately but can be joined at later time, use this if you want to start the task immediately but want to join it + later. +* `coro::task_group(coro::task&& task | range)` schedules the task(s) on the + `coro::thread_pool`. Use this when you want to share a `coro::thread_pool` while monitoring the progress of a group of + tasks. ```C++ #include @@ -1177,6 +1281,7 @@ int main() ``` Example output (will vary based on threads): + ```bash $ ./examples/coro_thread_pool thread pool worker 0 is starting up. @@ -1201,31 +1306,62 @@ thread pool worker 0 is shutting down. ``` ### io_scheduler + `coro::io_scheduler` is a i/o event scheduler execution context that can use two methods of task processing: * A background `coro::thread_pool` * Inline task processing on the `coro::io_scheduler`'s event loop -Using a background `coro::thread_pool` will default to using `(std::thread::hardware_concurrency() - 1)` threads to process tasks. This processing strategy is best for longer tasks that would block the i/o scheduler or for tasks that are latency sensitive. +Using a background `coro::thread_pool` will default to using `(std::thread::hardware_concurrency() - 1)` threads to +process tasks. This processing strategy is best for longer tasks that would block the i/o scheduler or for tasks that +are latency sensitive. -Using the inline processing strategy will have the event loop i/o thread process the tasks inline on that thread when events are received. This processing strategy is best for shorter task that will not block the i/o thread for long or for pure throughput by using thread per core architecture, e.g. spin up an inline i/o scheduler per core and inline process tasks on each scheduler. +Using the inline processing strategy will have the event loop i/o thread process the tasks inline on that thread when +events are received. This processing strategy is best for shorter task that will not block the i/o thread for long or +for pure throughput by using thread per core architecture, e.g. spin up an inline i/o scheduler per core and inline +process tasks on each scheduler. -The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready or it can be maually driven via its `process_events()` function for integration into existing event loops. By default i/o schedulers will spawn a dedicated event thread and use a thread pool to process tasks. +The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready or it can be maually +driven via its `process_events()` function for integration into existing event loops. By default i/o schedulers will +spawn a dedicated event thread and use a thread pool to process tasks. #### Ways to schedule tasks onto a `coro::io_scheduler` -* `coro::io_scheduler::schedule()` Use `co_await` on this method inside a coroutine to transfer the tasks execution to the `coro::io_scheduler`. 
-* `coro::io_scheduler::schedule(coro::task task) -> coro::task` schedules the task on the `coro::io_scheduler` and then returns the result in a task that must be awaited. This is useful if you want to schedule work on the `coro::io_scheduler` and want to wait for the result. -* `coro::io_scheduler::schedule(std::stop_source st, coro::task task, std::chrono::duration timeout) -> coro::expected` schedules the task on the `coro::io_scheduler` and then returns the result in a task that must be awaited. That task will then either return the completed task's value if it completes before the timeout, or a return value denoted the task timed out. If the task times out the `std::stop_source.request_stop()` will be invoked so the task can check for it and stop executing. This must be done by the user, the `coro::io_scheduler` cannot stop the execution of the task but it is able through the `std::stop_source` to signal to the task it should stop executing. -* `coro::io_scheduler::scheduler_after(std::chrono::milliseconds amount)` schedules the current task to be rescheduled after a specified amount of time has passed. -* `coro::io_scheduler::schedule_at(std::chrono::steady_clock::time_point time)` schedules the current task to be rescheduled at the specified timepoint. -* `coro::io_scheduler::yield()` will yield execution of the current task and resume after other tasks have had a chance to execute. This effectively places the task at the back of the queue of waiting tasks. -* `coro::io_scheduler::yield_for(std::chrono::milliseconds amount)` will yield for the given amount of time and then reschedule the task. This is a yield for at least this much time since it is placed in the waiting execution queue and might take additional time to start executing again. -* `coro::io_scheduler::yield_until(std::chrono::steady_clock::time_point time)` will yield execution until the time point. -* `coro::io_scheduler::spawn_detached(coro::task)` Spawns the task to be detached and owned by the `coro::io_scheduler`, use this if you want to fire and forget the task, the `coro::io_scheduler` will maintain the task's lifetime. -* `coro::io_scheduler::spawn_joinable(coro::task&& task) -> coro::task` Spawns the task to be started immediately but can be joined at later time, use this if you want to start the task immediately but want to join it later. -* `coro::task_group(coro::task&& task | range>)` schedules the task(s) on the `coro::io_scheduler`. Use this when you want to share a `coro::io_scheduler` while monitoring the progress of a subset of tasks. - -The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp::server` and a `coro::net::tcp::client` that will connect to each other and then send a request and a response. + +* `coro::io_scheduler::schedule()` Use `co_await` on this method inside a coroutine to transfer the tasks execution to + the `coro::io_scheduler`. +* `coro::io_scheduler::schedule(coro::task task) -> coro::task` schedules the task on the `coro::io_scheduler` and + then returns the result in a task that must be awaited. This is useful if you want to schedule work on the + `coro::io_scheduler` and want to wait for the result. +* +`coro::io_scheduler::schedule(std::stop_source st, coro::task task, std::chrono::duration timeout) -> coro::expected` +schedules the task on the `coro::io_scheduler` and then returns the result in a task that must be awaited. 
That task +will then either return the completed task's value if it completes before the timeout, or a return value denoted the +task timed out. If the task times out the `std::stop_source.request_stop()` will be invoked so the task can check for it +and stop executing. This must be done by the user, the `coro::io_scheduler` cannot stop the execution of the task but it +is able through the `std::stop_source` to signal to the task it should stop executing. +* `coro::io_scheduler::scheduler_after(std::chrono::milliseconds amount)` schedules the current task to be rescheduled + after a specified amount of time has passed. +* `coro::io_scheduler::schedule_at(std::chrono::steady_clock::time_point time)` schedules the current task to be + rescheduled at the specified timepoint. +* `coro::io_scheduler::yield()` will yield execution of the current task and resume after other tasks have had a chance + to execute. This effectively places the task at the back of the queue of waiting tasks. +* `coro::io_scheduler::yield_for(std::chrono::milliseconds amount)` will yield for the given amount of time and then + reschedule the task. This is a yield for at least this much time since it is placed in the waiting execution queue and + might take additional time to start executing again. +* `coro::io_scheduler::yield_until(std::chrono::steady_clock::time_point time)` will yield execution until the time + point. +* `coro::io_scheduler::spawn_detached(coro::task)` Spawns the task to be detached and owned by the + `coro::io_scheduler`, use this if you want to fire and forget the task, the `coro::io_scheduler` will maintain the + task's lifetime. +* `coro::io_scheduler::spawn_joinable(coro::task&& task) -> coro::task` Spawns the task to be started + immediately but can be joined at later time, use this if you want to start the task immediately but want to join it + later. +* `coro::task_group(coro::task&& task | range>)` schedules the task(s) on the + `coro::io_scheduler`. Use this when you want to share a `coro::io_scheduler` while monitoring the progress of a subset + of tasks. + +The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp::server` and a +`coro::net::tcp::client` that will connect to each other and then send a request and a response. ```C++ #include @@ -1269,78 +1405,38 @@ int main() co_await scheduler->schedule(); // Wait for an incoming connection and accept it. - auto poll_status = co_await server.poll(); - if (poll_status != coro::poll_status::read) - { - co_return; // Handle error, see poll_status for detailed error states. + auto client = co_await server.accept(); + if (!client) { + co_return; // Handle error, see io_status for detailed error states. } - // Accept the incoming client connection. - auto client = server.accept(); - // Verify the incoming connection was accepted correctly. - if (!client.socket().is_valid()) + if (!client->socket().is_valid()) { co_return; // Handle error. } - // Now wait for the client message, this message is small enough it should always arrive - // with a single recv() call. - poll_status = co_await client.poll(coro::poll_op::read); - if (poll_status != coro::poll_status::read) - { - co_return; // Handle error. - } - - // Prepare a buffer and recv() the client's message. This function returns the recv() status - // as well as a span that overlaps the given buffer for the bytes that were read. This + // Prepare a buffer and read_some() the client's message. 
This function returns the operation status + // as well as a span that overlaps the given buffer for the bytes that were read. This // can be used to resize the buffer or work with the bytes without modifying the buffer at all. std::string request(256, '\0'); - auto [recv_status, recv_bytes] = client.recv(request); - if (recv_status != coro::net::recv_status::ok) + auto [read_status, read_bytes] = co_await client.read_some(request); + if (!read_status.is_ok()) { - co_return; // Handle error, see net::recv_status for detailed error states. + co_return; // Handle error, see net::io_status for detailed error states. } - request.resize(recv_bytes.size()); + request.resize(read_bytes.size()); std::cout << "server: " << request << "\n"; - // Make sure the client socket can be written to. - poll_status = co_await client.poll(coro::poll_op::write); - if (poll_status != coro::poll_status::write) - { - co_return; // Handle error. - } - // Send the server response to the client. - // This message is small enough that it will be sent in a single send() call, but to demonstrate - // how to use the 'remaining' portion of the send() result this is wrapped in a loop until - // all the bytes are sent. - std::string response = "Hello from server."; - std::span remaining = response; - do - { - // Optimistically send() prior to polling. - auto [send_status, r] = client.send(remaining); - if (send_status != coro::net::send_status::ok) - { - co_return; // Handle error, see net::send_status for detailed error states. - } - - if (r.empty()) - { - break; // The entire message has been sent. - } - - // Re-assign remaining bytes for the next loop iteration and poll for the socket to be - // able to be written to again. - remaining = r; - auto pstatus = co_await client.poll(coro::poll_op::write); - if (pstatus != coro::poll_status::write) - { - co_return; // Handle error. - } - } while (true); + // If the operation was successful unsent data will be empty. + std::string response = "Hello from server."; + auto [write_status, unsent_data] = co_await client.write_all(response); + + if (!write_status.is_ok()) { + co_return; // Handle error + } co_return; }; @@ -1358,19 +1454,15 @@ int main() // verify the number of bytes sent or received. // Connect to the server. - co_await client.connect(); - - // Make sure the client socket can be written to. - co_await client.poll(coro::poll_op::write); + co_await client.connect(/* localhost:8080 by default */); // Send the request data. - client.send(std::string_view{"Hello from client."}); + co_await client.write_all(std::string_view{"Hello from client."}); - // Wait for the response and receive it. - co_await client.poll(coro::poll_op::read); + // Receive the response. std::string response(256, '\0'); - auto [recv_status, recv_bytes] = client.recv(response); - response.resize(recv_bytes.size()); + auto [read_status, read_bytes] = co_await client.read_some(response); + response.resize(read_bytes.size()); std::cout << "client: " << response << "\n"; co_return; @@ -1382,6 +1474,7 @@ int main() ``` Example output: + ```bash $ ./examples/coro_io_scheduler io_scheduler::thread_pool worker 0 starting @@ -1395,7 +1488,9 @@ io_scheduler::process event thread stop ``` ### tcp_echo_server -See [examples/coro_tcp_echo_erver.cpp](./examples/coro_tcp_echo_server.cpp) for a basic TCP echo server implementation. You can use tools like `ab` to benchmark against this echo server. 
+ +See [examples/coro_tcp_echo_erver.cpp](./examples/coro_tcp_echo_server.cpp) for a basic TCP echo server implementation. +You can use tools like `ab` to benchmark against this echo server. Using a `Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz`: @@ -1459,7 +1554,9 @@ Percentage of the requests served within a certain time (ms) ```` ### http_200_ok_server -See [examples/coro_http_200_ok_erver.cpp](./examples/coro_http_200_ok_server.cpp) for a basic HTTP 200 OK response server implementation. You can use tools like `wrk` or `autocannon` to benchmark against this HTTP 200 OK server. + +See [examples/coro_http_200_ok_erver.cpp](./examples/coro_http_200_ok_server.cpp) for a basic HTTP 200 OK response +server implementation. You can use tools like `wrk` or `autocannon` to benchmark against this HTTP 200 OK server. Using a `Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz`: @@ -1477,9 +1574,12 @@ Transfer/sec: 18.33MB ## Android Support -libcoro ships with an Android test harness that builds and runs the library on Android devices and emulators. This is intended to validate coroutine primitives on Android and to sanity-check networking/TLS integration using OpenSSL where available. +libcoro ships with an Android test harness that builds and runs the library on Android devices and emulators. This is +intended to validate coroutine primitives on Android and to sanity-check networking/TLS integration using OpenSSL where +available. ### Status at a glance + - Toolchain: Android Gradle Plugin 8.5.2, Gradle Wrapper, NDK r29 (29.0.13846066), CMake 3.22.1 - Minimum SDK: 24, Target SDK: 34 - ABIs: arm64-v8a, armeabi-v7a, x86, x86_64 (APK contains selected ABIs; CI builds per-ABI) @@ -1488,12 +1588,14 @@ libcoro ships with an Android test harness that builds and runs the library on A - Networking: enabled in Android builds (`LIBCORO_FEATURE_NETWORKING`, `LIBCORO_FEATURE_TLS`) ### Project layout + - `test/android/` — Android application project (Gradle) - `src/main/cpp/CMakeLists.txt` — integrates libcoro and links in the canonical test sources - `src/main/cpp/main.cpp` — JNI host that launches Catch2-based libcoro tests with live log streaming - `scripts/build_openssl.sh` — helper to produce per-ABI OpenSSL static libs under `external/openssl//` ### Building the Android test APK locally + Prerequisites: Android SDK + NDK r29, CMake 3.22.1 (installed via SDK), JDK 17. From repo root: @@ -1511,11 +1613,15 @@ gradle assembleDebug ``` Notes: + - You can override the Gradle build directory via `-PcustomBuildDir=...` (used in CI). - You can restrict to a specific ABI via `-PciAbi=`. ### Running tests on an emulator -Tests are executed by launching the app, which loads a shared library `libcoroTest.so` that embeds the libcoro test suite (Catch2). Output is streamed to Logcat with tag `coroTest` and also mirrored into the app sandbox file `files/libcoro-tests.log`. + +Tests are executed by launching the app, which loads a shared library `libcoroTest.so` that embeds the libcoro test +suite (Catch2). Output is streamed to Logcat with tag `coroTest` and also mirrored into the app sandbox file +`files/libcoro-tests.log`. You can pass test options by pushing a simple properties file to the device: @@ -1524,27 +1630,35 @@ filter=~[benchmark] ~[bench] ~[semaphore] ~[io_scheduler] timeout=600 ``` -Place it at `/data/local/tmp/coro_test_config.properties` or inside the app sandbox at `files/coro_test_config.properties`. The JNI runner falls back to defaults when the sandbox is not writable. 
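For example, with a single device or emulator attached, the configuration can be pushed with `adb` before launching the
app (assuming `adb` is on your `PATH`):

```bash
# Push the test configuration to the location the JNI runner checks first.
adb push coro_test_config.properties /data/local/tmp/coro_test_config.properties
```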
+Place it at `/data/local/tmp/coro_test_config.properties` or inside the app sandbox at +`files/coro_test_config.properties`. The JNI runner falls back to defaults when the sandbox is not writable. -Default exclusions in emulator runs skip slow/fragile suites (benchmarks, some networking/TLS servers, long-running schedulers). See `test/android/src/main/cpp/main.cpp` for the current filter set. +Default exclusions in emulator runs skip slow/fragile suites (benchmarks, some networking/TLS servers, long-running +schedulers). See `test/android/src/main/cpp/main.cpp` for the current filter set. ### CI pipeline + The GitHub Actions workflow `.github/workflows/ci-android.yml` performs: + - Per-ABI matrix builds (arm64-v8a, armeabi-v7a, x86, x86_64) - OpenSSL prebuild per ABI via `scripts/build_openssl.sh` - Emulator provisioning on x86_64 (Android 30), headless launch, storage readiness checks - Pushing test configuration (filter/timeout) and running the app - Collecting `coroTest` Logcat into `emulator.log` and exporting `libcoro-tests.log` -The test filter excludes particularly slow suites to keep runs under a 10-minute global timeout inside the app. Adjust `TEST_FILTER`/`TEST_TIMEOUT` env vars in the workflow as needed. +The test filter excludes particularly slow suites to keep runs under a 10-minute global timeout inside the app. Adjust +`TEST_FILTER`/`TEST_TIMEOUT` env vars in the workflow as needed. ### Known limitations on Android + - Network server tests (e.g., TCP/TLS servers) are skipped in CI to avoid emulator networking flakiness - Some timing-sensitive `condition_variable` cases may be excluded on emulators due to short timeouts - The Android harness is for validation; apps should link `libcoro` as a regular CMake target in their own projects ### Using libcoro in your Android CMake project -Add libcoro as a subdirectory in your native CMake and link it to your library target. Example snippet for your module’s CMakeLists: + +Add libcoro as a subdirectory in your native CMake and link it to your library target. Example snippet for your module’s +CMakeLists: ```cmake add_subdirectory(${CMAKE_SOURCE_DIR}/path/to/libcoro libcoro_build) @@ -1552,9 +1666,11 @@ target_link_libraries(your-lib PRIVATE libcoro log) target_compile_definitions(your-lib PRIVATE LIBCORO_FEATURE_NETWORKING LIBCORO_FEATURE_TLS) ``` -If you require TLS, provide OpenSSL for the target ABI (static or shared) and set `OPENSSL_ROOT_DIR`/`OPENSSL_USE_STATIC_LIBS` accordingly. +If you require TLS, provide OpenSSL for the target ABI (static or shared) and set `OPENSSL_ROOT_DIR`/ +`OPENSSL_USE_STATIC_LIBS` accordingly. ### Requirements + C++20 Compiler with coroutine support g++ [11, 12, 13] clang++ [16, 17] @@ -1574,39 +1690,44 @@ If you require TLS, provide OpenSSL for the target ABI (static or shared) and se #### Tested Operating Systems Full feature supported operating systems: - * ubuntu:22.04, 24.04 - * fedora:37-40 - * MacOS 15 - * openSUSE/leap:15.6 + +* ubuntu:22.04, 24.04 +* fedora:37-40 +* MacOS 15 +* openSUSE/leap:15.6 The following systems currently do not support `LIBCORO_FEATURE_NETWORKING` or `LIBCORO_FEATURE_TLS`: - * Windows 2022 - * Emscripten 3.1.45 + +* Windows 2022 +* Emscripten 3.1.45 #### Cloning the project + This project uses git submodules, to properly checkout this project use: git clone --recurse-submodules This project depends on the following git sub-modules: - * [libc-ares](https://github.com/c-ares/c-ares) For async DNS resolver, this is a git submodule. 
- * [catch2](https://github.com/catchorg/Catch2) For testing, this is embedded in the `test/` directory. + +* [libc-ares](https://github.com/c-ares/c-ares) For async DNS resolver, this is a git submodule. +* [catch2](https://github.com/catchorg/Catch2) For testing, this is embedded in the `test/` directory. #### Building + mkdir Release && cd Release cmake -DCMAKE_BUILD_TYPE=Release .. cmake --build . CMake Options: -| Name | Default | Description | -|:------------------------------|:--------|:---------------------------------------------------------------------------------------------------| -| LIBCORO_EXTERNAL_DEPENDENCIES | OFF | Use CMake find_package to resolve dependencies instead of embedded libraries. | -| LIBCORO_BUILD_TESTS | ON | Should the tests be built? Note this is only default ON if libcoro is the root CMakeLists.txt | -| LIBCORO_CODE_COVERAGE | OFF | Should code coverage be enabled? Requires tests to be enabled. | -| LIBCORO_BUILD_EXAMPLES | ON | Should the examples be built? Note this is only default ON if libcoro is the root CMakeLists.txt | -| LIBCORO_FEATURE_NETWORKING | ON | Include networking features. MSVC not currently supported | -| LIBCORO_FEATURE_TLS | ON | Include TLS features. Requires networking to be enabled. MSVC not currently supported. | +| Name | Default | Description | +|:------------------------------|:--------|:-------------------------------------------------------------------------------------------------| +| LIBCORO_EXTERNAL_DEPENDENCIES | OFF | Use CMake find_package to resolve dependencies instead of embedded libraries. | +| LIBCORO_BUILD_TESTS | ON | Should the tests be built? Note this is only default ON if libcoro is the root CMakeLists.txt | +| LIBCORO_CODE_COVERAGE | OFF | Should code coverage be enabled? Requires tests to be enabled. | +| LIBCORO_BUILD_EXAMPLES | ON | Should the examples be built? Note this is only default ON if libcoro is the root CMakeLists.txt | +| LIBCORO_FEATURE_NETWORKING | ON | Include networking features. MSVC not currently supported | +| LIBCORO_FEATURE_TLS | ON | Include TLS features. Requires networking to be enabled. MSVC not currently supported. | #### Adding to your project @@ -1622,8 +1743,9 @@ target_link_libraries(${PROJECT_NAME} PUBLIC libcoro) ``` ##### FetchContent -CMake can include the project directly by downloading the source, compiling and linking to your project via FetchContent, below is an example on how you might do this within your project. +CMake can include the project directly by downloading the source, compiling and linking to your project via +FetchContent, below is an example on how you might do this within your project. ```cmake cmake_minimum_required(VERSION 3.11) @@ -1631,9 +1753,9 @@ cmake_minimum_required(VERSION 3.11) # Fetch the project and make it available for use. include(FetchContent) FetchContent_Declare( - libcoro - GIT_REPOSITORY https://github.com/jbaldwin/libcoro.git - GIT_TAG + libcoro + GIT_REPOSITORY https://github.com/jbaldwin/libcoro.git + GIT_TAG ) FetchContent_MakeAvailable(libcoro) @@ -1643,14 +1765,19 @@ target_link_libraries(${PROJECT_NAME} PUBLIC libcoro) ``` ##### Package managers + libcoro is available via package managers [Conan](https://conan.io/center/libcoro) and [vcpkg](https://vcpkg.io/). ### Contributing -Contributing is welcome, if you have ideas or bugs please open an issue. If you want to open a PR they are also welcome, if you are adding a bugfix or a feature please include tests to verify the feature or bugfix is working properly. 
If it isn't included I will be asking for you to add some!
+Contributing is welcome; if you have ideas or bugs please open an issue. PRs are also welcome. If you are adding a
+bugfix or a feature, please include tests to verify that the feature or bugfix works properly. If tests aren't
+included I will ask you to add some!
 
 #### Tests
-The tests will automatically be run by github actions on creating a pull request. They can also be ran locally after building from the build directory:
+
+The tests will automatically be run by GitHub Actions on creating a pull request. They can also be run locally after
+building from the build directory:
 
     # Invoke via cmake with all output from the tests displayed to console:
     ctest -VV
@@ -1659,16 +1786,21 @@ The tests will automatically be run by github actions on creating a pull request
 
     # Tests are tagged with their group, below is how to run all of the coro::net::tcp::server tests:
     ./test/libcoro_test "[tcp_server]"
 
-If you open a PR for a bugfix or new feature please include tests to verify that the change is working as intended. If your PR doesn't include tests I will ask you to add them and won't merge until they are added and working properly. Tests are found in the `/test` directory and are organized by object type.
+If you open a PR for a bugfix or new feature, please include tests to verify that the change is working as intended. If
+your PR doesn't include tests I will ask you to add them and won't merge until they are added and working properly.
+Tests are found in the `/test` directory and are organized by object type.
 
 ### Support
 
-File bug reports, feature requests and questions using [GitHub libcoro Issues](https://github.com/jbaldwin/libcoro/issues)
+File bug reports, feature requests and questions
+using [GitHub libcoro Issues](https://github.com/jbaldwin/libcoro/issues)
 
 Copyright © 2020-2025 Josh Baldwin
 
 [badge.language]: https://img.shields.io/badge/language-C%2B%2B20-yellow.svg
+
 [badge.license]: https://img.shields.io/badge/license-Apache--2.0-blue
 
 [language]: https://en.wikipedia.org/wiki/C%2B%2B17
+
 [license]: https://en.wikipedia.org/wiki/Apache_License
 
diff --git a/examples/coro_tcp_echo_server.cpp b/examples/coro_tcp_echo_server.cpp
index 708ef6c6..58abab0e 100644
--- a/examples/coro_tcp_echo_server.cpp
+++ b/examples/coro_tcp_echo_server.cpp
@@ -13,18 +13,14 @@ auto main() -> int
             // Wait for data to be available to read.
             co_await client.poll(coro::poll_op::read);
             auto [rstatus, rspan] = client.recv(buf);
-            switch (rstatus)
-            {
-                case coro::net::recv_status::ok:
-                    // Make sure the client socket can be written to. 
-                    co_await client.poll(coro::poll_op::write);
-                    client.send(std::span{rspan});
-                    break;
-                case coro::net::recv_status::would_block:
-                    break;
-                case coro::net::recv_status::closed:
-                default:
-                    co_return;
+            if (rstatus.is_ok())
+            {
+                co_await client.poll(coro::poll_op::write);
+                client.send(std::span{rspan});
+            }
+            else if (rstatus.is_closed())
+            {
+                co_return;
             }
         }
     };
@@ -40,7 +36,7 @@ auto main() -> int
             {
                 case coro::poll_status::read:
                 {
-                    auto client = server.accept();
+                    auto client = server.accept_now();
                     if (client.socket().is_valid())
                     {
                         scheduler->spawn_detached(make_on_connection_task(std::move(client)));
diff --git a/include/coro/net/tcp/client.hpp b/include/coro/net/tcp/client.hpp
index 58c68689..6d1dfc25 100644
--- a/include/coro/net/tcp/client.hpp
+++ b/include/coro/net/tcp/client.hpp
@@ -64,19 +64,21 @@ class client
     auto connect(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<net::connect_status>;
 
     /**
-     * Polls for the given operation on this client's tcp socket. This should be done prior to
-     * calling recv and after a send call that doesn't send the entire buffer.
-     * @param op The poll operation to perform, use read for incoming data and write for outgoing.
-     * @param timeout The amount of time to wait for the poll event to be ready. Use zero for infinte timeout.
-     * @return The status result of th poll operation. When poll_status::read or poll_status::write is returned then
-     *         this specific event operation is ready.
+     * Attempts to asynchronously read data from the socket into the provided buffer.
+     *
+     * The function may read fewer bytes than requested. In that case, the returned
+     * status will be 'ok' and the returned span will reference the prefix of the
+     * buffer that was filled with received data.
+     *
+     * @see read_exact()
+     * @param buffer Destination buffer to read data into.
+     * @param timeout Maximum time to wait for the socket to become readable.
+     *                Use 0 for infinite timeout.
+     * @return A pair of:
+     *          - status of the operation
+     *          - span pointing to the read part of the buffer
+     * @{
      */
-    auto poll(const coro::poll_op op, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-        -> coro::task<poll_status>
-    {
-        return m_io_scheduler->poll(m_socket, op, timeout);
-    }
-
     auto read_some(std::span<std::byte> buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
         -> coro::task<std::pair<io_status, std::span<std::byte>>>
     {
@@ -93,7 +95,24 @@ class client
     {
         return read_some(std::as_writable_bytes(std::span{buffer}), timeout);
     }
+    /** @} */
 
+    /**
+     * Asynchronously reads exactly buffer.size() bytes from the socket.
+     *
+     * Repeatedly calls read_some until either:
+     *  - buffer.size() bytes have been read, or
+     *  - an error or timeout occurs.
+     *
+     * @see read_some()
+     * @param buffer Destination buffer to read data into.
+     * @param timeout Maximum time to wait for the socket to become readable.
+     *                Use 0 for infinite timeout.
+     * @return A pair of:
+     *          - status of the operation
+     *          - span pointing to the read part of the buffer
+     * @{
+     */
     auto read_exact(std::span<std::byte> buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
         -> coro::task<std::pair<io_status, std::span<std::byte>>>
     {
@@ -135,43 +154,24 @@ class client
     {
         return read_exact(std::as_writable_bytes(std::span{buffer}), timeout);
     }
+    /** @} */
 
+    /**
-     * Receives incoming data into the given buffer. By default, since all tcp client sockets are set
-     * to non-blocking use co_await poll() to determine when data is ready to be received.
-     * @param buffer Received bytes are written into this buffer up to the buffers size. 
-     * @return The status of the recv call and a span of the bytes received (if any). The span of
-     *         bytes will be a subspan or full span of the given input buffer.
+     * Attempts to asynchronously write data from the provided buffer to the socket.
+     *
+     * If only part of the buffer is written, the returned status will be 'ok' and
+     * the returned span will reference the portion of the original buffer that
+     * was not sent.
+     *
+     * @see write_all()
+     * @param buffer Buffer containing the data to write.
+     * @param timeout Maximum time to wait for the socket to become writable.
+     *                Use 0 for infinite timeout.
+     * @return A pair of:
+     *          - status of the operation
+     *          - span pointing to the unsent part of the buffer
+     * @{
      */
-    template<
-        concepts::mutable_buffer buffer_type,
-        typename element_type = typename concepts::mutable_buffer_traits<buffer_type>::element_type>
-    auto recv(buffer_type&& buffer) -> std::pair<io_status, std::span<element_type>>
-    {
-        // If the user requested zero bytes, just return.
-        if (buffer.empty())
-        {
-            return {io_status{io_status::kind::ok}, std::span<element_type>{}};
-        }
-
-        auto bytes_recv = ::recv(m_socket.native_handle(), buffer.data(), buffer.size(), 0);
-        if (bytes_recv > 0)
-        {
-            // Ok, we've received some data.
-            return {
-                io_status{io_status::kind::ok},
-                std::span<element_type>{buffer.data(), static_cast<std::size_t>(bytes_recv)}};
-        }
-
-        if (bytes_recv == 0)
-        {
-            // On TCP stream sockets 0 indicates the connection has been closed by the peer.
-            return {io_status{io_status::kind::closed}, std::span<element_type>{}};
-        }
-
-        // Report the error to the user.
-        return {make_io_status_from_native(errno), std::span<element_type>{}};
-    }
-
     auto write_some(
         std::span<const std::byte> buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
         -> coro::task<std::pair<io_status, std::span<const std::byte>>>
@@ -179,7 +179,7 @@ class client
         auto poll_status = co_await poll(poll_op::write, timeout);
         if (poll_status != poll_status::write)
         {
-            co_return std::pair{make_io_status_poll_status(poll_status), std::span<const std::byte>{}};
+            co_return std::pair{make_io_status_poll_status(poll_status), buffer};
         }
         co_return send(buffer);
     }
@@ -189,7 +189,22 @@ class client
     {
         return write_some(std::as_bytes(std::span{buffer}), timeout);
     }
+    /** @} */
 
+    /**
+     * Asynchronously writes the entire contents of the provided buffer to the socket.
+     * Repeatedly calls write_some until either:
+     *  - all bytes have been sent, or
+     *  - an error or timeout occurs.
+     *
+     * @see write_some()
+     * @param buffer The data to write on the tcp socket.
+     * @return A pair of:
+     *          - status of the operation
+     *          - span pointing to the unsent part of the buffer;
+     *            empty if all data was sent successfully
+     * @{
+     */
     auto write_all(
         std::span<const std::byte> buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
         -> coro::task<std::pair<io_status, std::span<const std::byte>>>
@@ -234,13 +249,64 @@ class client
         co_return {io_status{io_status::kind::ok}, {}};
     }
-
     template<concepts::const_buffer buffer_type>
     auto write_all(const buffer_type& buffer, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
         -> coro::task<std::pair<io_status, std::span<const std::byte>>>
     {
         return write_all(std::as_bytes(std::span{buffer}), timeout);
     }
+    /** @} */
+
+    /**
+     * Polls for the given operation on this client's tcp socket. This should be done prior to
+     * calling recv and after a send call that doesn't send the entire buffer.
+     * @param op The poll operation to perform, use read for incoming data and write for outgoing.
+     * @param timeout The amount of time to wait for the poll event to be ready. Use zero for infinite timeout.
+     * @return The status result of the poll operation. When poll_status::read or poll_status::write is returned then
+     *         this specific event operation is ready. 
+     */
+    auto poll(const coro::poll_op op, const std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<poll_status>
+    {
+        return m_io_scheduler->poll(m_socket, op, timeout);
+    }
+
+    /**
+     * Receives incoming data into the given buffer. By default, since all tcp client sockets are set
+     * to non-blocking use co_await poll() to determine when data is ready to be received.
+     * @param buffer Received bytes are written into this buffer up to the buffer's size.
+     * @return The status of the recv call and a span of the bytes received (if any). The span of
+     *         bytes will be a subspan or full span of the given input buffer.
+     */
+    template<
+        concepts::mutable_buffer buffer_type,
+        typename element_type = typename concepts::mutable_buffer_traits<buffer_type>::element_type>
+    auto recv(buffer_type&& buffer) -> std::pair<io_status, std::span<element_type>>
+    {
+        // If the user requested zero bytes, just return.
+        if (buffer.empty())
+        {
+            return {io_status{io_status::kind::ok}, std::span<element_type>{}};
+        }
+
+        auto bytes_recv = ::recv(m_socket.native_handle(), buffer.data(), buffer.size(), 0);
+        if (bytes_recv > 0)
+        {
+            // Ok, we've received some data.
+            return {
+                io_status{io_status::kind::ok},
+                std::span<element_type>{buffer.data(), static_cast<std::size_t>(bytes_recv)}};
+        }
+
+        if (bytes_recv == 0)
+        {
+            // On TCP stream sockets 0 indicates the connection has been closed by the peer.
+            return {io_status{io_status::kind::closed}, std::span<element_type>{}};
+        }
+
+        // Report the error to the user.
+        return {make_io_status_from_native(errno), std::span<element_type>{}};
+    }
 
     /**
      * Sends outgoing data from the given buffer. If a partial write occurs then use co_await poll()
diff --git a/include/coro/net/tcp/server.hpp b/include/coro/net/tcp/server.hpp
index a45fc4ef..c73694f1 100644
--- a/include/coro/net/tcp/server.hpp
+++ b/include/coro/net/tcp/server.hpp
@@ -32,10 +32,10 @@ class server
 
     explicit server(
         std::unique_ptr<io_scheduler>& scheduler,
-        options          opts = options{
-                     .address = net::ip_address::from_string("0.0.0.0"),
-                     .port    = 8080,
-                     .backlog = 128,
+        options                        opts = options{
+                                   .address = net::ip_address::from_string("0.0.0.0"),
+                                   .port    = 8080,
+                                   .backlog = 128,
         });
 
     server(const server&) = delete;
@@ -44,6 +44,27 @@ class server
     auto operator=(server&& other) -> server&;
     ~server() = default;
 
+    /**
+     * Asynchronously waits for an incoming TCP connection and accepts it.
+     *
+     * On success, use socket().is_valid() on the returned client to verify it was correctly accepted.
+     *
+     * @param timeout How long to wait for a new connection before timing out, zero waits indefinitely.
+     * @return The newly connected tcp client connection on success or an io_status describing the failure.
+     */
+    auto accept(std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<expected<coro::net::tcp::client, io_status>>
+    {
+        auto pstatus = co_await poll(timeout);
+
+        if (pstatus != coro::poll_status::read)
+        {
+            co_return unexpected{make_io_status_poll_status(pstatus)};
+        }
+
+        co_return accept_now();
+    }
+
     /**
      * Polls for new incoming tcp connections.
      * @param timeout How long to wait for a new connection before timing out, zero waits indefinitely.
@@ -60,7 +81,7 @@ class server
      * and invalid state, use the socket.is_valid() to verify the client was correctly accepted.
      * @return The newly connected tcp client connection.
      */
-    auto accept() -> coro::net::tcp::client;
+    auto accept_now() -> coro::net::tcp::client;
 
     /**
      * @return The tcp accept socket this server is using. 
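
The three diffs above form the heart of the new API: `read_some`/`read_exact`/`write_some`/`write_all` on `tcp::client`, plus an awaitable, status-reporting `accept()` beside the renamed non-blocking `accept_now()` on `tcp::server`. A minimal sketch of how these pieces compose, assuming the API exactly as shown in this patch (`accept_one` and `handle_connection` are illustrative names, and scheduler/server construction is omitted; see the examples earlier in the patch):

```cpp
#include <coro/coro.hpp>

// Sketch only: echo a single message using the API introduced by this patch.
auto handle_connection(coro::net::tcp::client client) -> coro::task<void>
{
    std::string buf(1024, '\0');

    // read_some() polls for readability and then recv()s once; a zero timeout waits forever.
    auto [rstatus, rspan] = co_await client.read_some(buf, std::chrono::seconds{5});
    if (!rstatus.is_ok())
    {
        co_return; // rstatus.message() distinguishes timeout/closed/native errors.
    }

    buf.resize(rspan.size()); // rspan is the filled prefix of buf.

    // write_all() loops write_some() until every byte is sent or an error/timeout occurs.
    auto [wstatus, unsent] = co_await client.write_all(buf);
    if (!wstatus.is_ok())
    {
        co_return; // On failure `unsent` references the suffix that was never written.
    }
}

auto accept_one(coro::net::tcp::server& server) -> coro::task<void>
{
    // accept() wraps poll() + accept_now() and reports failures via io_status.
    auto client = co_await server.accept(std::chrono::seconds{1});
    if (!client)
    {
        co_return; // e.g. client.error().is_timeout() on an idle listener.
    }
    co_await handle_connection(std::move(*client));
}
```
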
diff --git a/src/net/tcp/server.cpp b/src/net/tcp/server.cpp index 7476cf8b..5fd396af 100644 --- a/src/net/tcp/server.cpp +++ b/src/net/tcp/server.cpp @@ -38,7 +38,7 @@ auto server::operator=(server&& other) -> server& return *this; } -auto server::accept() -> coro::net::tcp::client +auto server::accept_now() -> coro::net::tcp::client { sockaddr_in client{}; constexpr const int len = sizeof(struct sockaddr_in); diff --git a/test/bench.cpp b/test/bench.cpp index d4897e93..a5095839 100644 --- a/test/bench.cpp +++ b/test/bench.cpp @@ -402,27 +402,26 @@ TEST_CASE("benchmark tcp::server echo server thread pool", "[benchmark]") // Echo the messages until the socket is closed. while (true) { - auto pstatus = co_await client.poll(coro::poll_op::read); - if (pstatus != coro::poll_status::read) + auto [rstatus, rspan] = co_await client.read_some(in); + if (!rstatus.is_ok()) { - REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::closed); + REQUIRE_THREAD_SAFE(rstatus.is_closed()); // the socket has been closed break; } - REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::read); + REQUIRE_THREAD_SAFE(rstatus.is_ok()); - auto [rstatus, rspan] = client.recv(in); - if (rstatus == coro::net::recv_status::closed) + if (rstatus.is_closed()) { REQUIRE_THREAD_SAFE(rspan.empty()); break; } - REQUIRE_THREAD_SAFE(rstatus == coro::net::recv_status::ok); + REQUIRE_THREAD_SAFE(rstatus.is_ok()); in.resize(rspan.size()); - auto [sstatus, remaining] = client.send(in); - REQUIRE_THREAD_SAFE(sstatus == coro::net::send_status::ok); + auto [sstatus, remaining] = co_await client.write_some(in); + REQUIRE_THREAD_SAFE(sstatus.is_ok()); REQUIRE_THREAD_SAFE(remaining.empty()); } @@ -440,15 +439,11 @@ TEST_CASE("benchmark tcp::server echo server thread pool", "[benchmark]") while (accepted.load(std::memory_order::acquire) < connections) { - auto pstatus = co_await server.poll(std::chrono::milliseconds{1}); - if (pstatus == coro::poll_status::read) + auto c = co_await server.accept(std::chrono::milliseconds{1}); + if (c && c->socket().is_valid()) { - auto c = server.accept(); - if (c.socket().is_valid()) - { - accepted.fetch_add(1, std::memory_order::release); - server_scheduler->spawn_detached(make_on_connection_task(std::move(c), wait_for_clients)); - } + accepted.fetch_add(1, std::memory_order::release); + server_scheduler->spawn_detached(make_on_connection_task(std::move(*c), wait_for_clients)); } } @@ -480,22 +475,20 @@ TEST_CASE("benchmark tcp::server echo server thread pool", "[benchmark]") for (size_t i = 1; i <= messages_per_connection; ++i) { auto req_start = std::chrono::steady_clock::now(); - auto [sstatus, remaining] = client.send(msg); - REQUIRE_THREAD_SAFE(sstatus == coro::net::send_status::ok); + auto [sstatus, remaining] = co_await client.write_some(msg); + REQUIRE_THREAD_SAFE(sstatus.is_ok()); REQUIRE_THREAD_SAFE(remaining.empty()); - auto pstatus = co_await client.poll(coro::poll_op::read); - if (pstatus != coro::poll_status::read) + std::string response(64, '\0'); + auto [rstatus, rspan] = co_await client.read_some(response); + if (!rstatus.is_ok()) { - REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::closed); + REQUIRE_THREAD_SAFE(rstatus.is_closed()); // the socket has been closed break; } - REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::read); - std::string response(64, '\0'); - auto [rstatus, rspan] = client.recv(response); - REQUIRE_THREAD_SAFE(rstatus == coro::net::recv_status::ok); + REQUIRE_THREAD_SAFE(rstatus.is_ok()); REQUIRE_THREAD_SAFE(rspan.size() == msg.size()); response.resize(rspan.size()); 
REQUIRE_THREAD_SAFE(response == msg); @@ -608,28 +601,20 @@ TEST_CASE("benchmark tcp::server echo server inline", "[benchmark]") // Echo the messages until the socket is closed. while (true) { - auto pstatus = co_await client.poll(coro::poll_op::read); - if (pstatus != coro::poll_status::read) + auto [rstatus, rspan] = co_await client.read_some(in); + if (!rstatus.is_ok()) { - REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::closed); + REQUIRE_THREAD_SAFE(rstatus.is_closed()); // the socket has been closed break; } - REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::read); - - auto [rstatus, rspan] = client.recv(in); - if (rstatus == coro::net::recv_status::closed) - { - REQUIRE_THREAD_SAFE(rspan.empty()); - break; - } - REQUIRE_THREAD_SAFE(rstatus == coro::net::recv_status::ok); + REQUIRE_THREAD_SAFE(rstatus.is_ok()); in.resize(rspan.size()); - auto [sstatus, remaining] = client.send(in); - REQUIRE_THREAD_SAFE(sstatus == coro::net::send_status::ok); + auto [sstatus, remaining] = co_await client.write_some(in); + REQUIRE_THREAD_SAFE(sstatus.is_ok()); REQUIRE_THREAD_SAFE(remaining.empty()); } @@ -652,15 +637,11 @@ TEST_CASE("benchmark tcp::server echo server inline", "[benchmark]") while (accepted_clients < connections_per_client) { - auto pstatus = co_await server.poll(std::chrono::milliseconds{1000}); - if (pstatus == coro::poll_status::read) + auto c = co_await server.accept(std::chrono::milliseconds{1000}); + if (c && c->socket().is_valid()) { - auto c = server.accept(); - if (c.socket().is_valid()) - { - s.live_clients++; - s.scheduler->spawn_detached(make_on_connection_task(s, std::move(c))); - } + s.live_clients++; + s.scheduler->spawn_detached(make_on_connection_task(s, std::move(*c))); } } @@ -704,22 +685,19 @@ TEST_CASE("benchmark tcp::server echo server inline", "[benchmark]") for (size_t i = 1; i <= messages_per_connection; ++i) { auto req_start = std::chrono::steady_clock::now(); - auto [sstatus, remaining] = client.send(msg); - REQUIRE_THREAD_SAFE(sstatus == coro::net::send_status::ok); + auto [sstatus, remaining] = co_await client.write_some(msg); + REQUIRE_THREAD_SAFE(sstatus.is_ok()); REQUIRE_THREAD_SAFE(remaining.empty()); - auto pstatus = co_await client.poll(coro::poll_op::read); - if (pstatus != coro::poll_status::read) + std::string response(64, '\0'); + auto [rstatus, rspan] = co_await client.read_some(response); + if (!rstatus.is_ok()) { - REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::closed); + REQUIRE_THREAD_SAFE(rstatus.is_closed()); // the socket has been closed break; } - REQUIRE_THREAD_SAFE(pstatus == coro::poll_status::read); - std::string response(64, '\0'); - auto [rstatus, rspan] = client.recv(response); - REQUIRE_THREAD_SAFE(rstatus == coro::net::recv_status::ok); REQUIRE_THREAD_SAFE(rspan.size() == msg.size()); response.resize(rspan.size()); REQUIRE_THREAD_SAFE(response == msg); diff --git a/test/net/test_tcp_server.cpp b/test/net/test_tcp_server.cpp index 8c5fc317..732b2c35 100644 --- a/test/net/test_tcp_server.cpp +++ b/test/net/test_tcp_server.cpp @@ -56,18 +56,14 @@ TEST_CASE("tcp_server ping server", "[tcp_server]") co_await scheduler->schedule(); coro::net::tcp::server server{scheduler}; - // Poll for client connection. 
- std::cerr << "server poll(accept)\n"; - auto pstatus = co_await server.poll(); - REQUIRE(pstatus == coro::poll_status::read); std::cerr << "server accept()\n"; - auto client = server.accept(); - REQUIRE(client.socket().is_valid()); - + auto client = co_await server.accept(); + REQUIRE(client.has_value()); + REQUIRE(client->socket().is_valid()); std::string buffer(256, '\0'); std::cerr << "server read_some()\n"; - auto [rstatus, rspan] = co_await client.read_some(buffer); + auto [rstatus, rspan] = co_await client->read_some(buffer); REQUIRE(rstatus.is_ok()); REQUIRE(rspan.size() == client_msg.size()); buffer.resize(rspan.size()); @@ -75,7 +71,7 @@ TEST_CASE("tcp_server ping server", "[tcp_server]") // Respond to client. std::cerr << "server send()\n"; - auto [wstatus, remaining] = co_await client.write_some(server_msg); + auto [wstatus, remaining] = co_await client->write_some(server_msg); REQUIRE(wstatus.is_ok()); REQUIRE(remaining.empty()); @@ -110,36 +106,21 @@ TEST_CASE("tcp_server concurrent polling on the same socket", "[tcp_server]") co_await scheduler->schedule(); coro::net::tcp::server server{scheduler}; - auto poll_status = co_await server.poll(); - REQUIRE(poll_status == coro::poll_status::read); - - auto read_client = server.accept(); - REQUIRE(read_client.socket().is_valid()); + auto read_client = co_await server.accept(); + REQUIRE(read_client.has_value()); + REQUIRE(read_client->socket().is_valid()); // make a copy so we can poll twice at the same time in different coroutines - auto write_client = read_client; + auto write_client = *read_client; - scheduler->spawn_detached(make_read_task(std::move(read_client))); + scheduler->spawn_detached(make_read_task(std::move(*read_client))); // Make sure the read op has completed. co_await scheduler->yield_for(500ms); - std::string data(8096, 'A'); - std::span remaining{data}; - do - { - auto poll_status = co_await write_client.poll(coro::poll_op::write); - REQUIRE(poll_status == coro::poll_status::write); - auto [send_status, r] = write_client.send(remaining); - REQUIRE(send_status.is_ok()); - - if (r.empty()) - { - break; - } - - remaining = r; - } while (true); + std::string data(8096, 'A'); + auto [send_status, r] = co_await write_client.write_all(data); + REQUIRE(send_status.is_ok()); co_return data; }; @@ -198,10 +179,9 @@ TEST_CASE("tcp_server graceful shutdown via socket", "[tcp_server]") { std::cerr << "make accept task start\n"; started.set(); - auto poll_result = co_await server.poll(); - REQUIRE(poll_result == coro::poll_status::cancelled); - auto client = server.accept(); - REQUIRE_FALSE(client.socket().is_valid()); + auto client = co_await server.accept(); + REQUIRE_FALSE(client.has_value()); + REQUIRE(client.error().type == coro::net::io_status::kind::cancelled); std::cerr << "make accept task completed\n"; }; From db2ef68ab335f61a5c8be8510485099738314304 Mon Sep 17 00:00:00 2001 From: PyXiion Date: Mon, 26 Jan 2026 13:51:45 +0300 Subject: [PATCH 3/4] Rename make_io_status_poll_status to make_io_status_from_poll_status --- include/coro/net/io_status.hpp | 5 +++-- include/coro/net/tcp/client.hpp | 4 ++-- include/coro/net/tcp/server.hpp | 2 +- src/net/io_status.cpp | 11 +++++++---- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/include/coro/net/io_status.hpp b/include/coro/net/io_status.hpp index a5c6bfa6..6e445116 100644 --- a/include/coro/net/io_status.hpp +++ b/include/coro/net/io_status.hpp @@ -17,7 +17,7 @@ struct io_status connection_refused, timeout, - try_again, + would_block_or_try_again, 
polling_error, cancelled, @@ -30,6 +30,7 @@ struct io_status [[nodiscard]] auto is_ok() const -> bool { return type == kind::ok; } [[nodiscard]] auto is_timeout() const -> bool { return type == kind::timeout; } [[nodiscard]] auto is_closed() const -> bool { return type == kind::closed; } + [[nodiscard]] auto would_block() const -> bool { return type == kind::would_block_or_try_again; } [[nodiscard]] auto is_native() const -> bool { return type == kind::native; } @@ -42,5 +43,5 @@ struct io_status }; auto make_io_status_from_native(int native_code) -> io_status; -auto make_io_status_poll_status(coro::poll_status status) -> io_status; +auto make_io_status_from_poll_status(coro::poll_status status) -> io_status; } // namespace coro::net \ No newline at end of file diff --git a/include/coro/net/tcp/client.hpp b/include/coro/net/tcp/client.hpp index 6d1dfc25..4aa6e689 100644 --- a/include/coro/net/tcp/client.hpp +++ b/include/coro/net/tcp/client.hpp @@ -85,7 +85,7 @@ class client auto poll_status = co_await poll(poll_op::read, timeout); if (poll_status != poll_status::read) { - co_return std::pair{make_io_status_poll_status(poll_status), std::span{}}; + co_return std::pair{make_io_status_from_poll_status(poll_status), std::span{}}; } co_return recv(buffer); } @@ -179,7 +179,7 @@ class client auto poll_status = co_await poll(poll_op::write, timeout); if (poll_status != poll_status::write) { - co_return std::pair{make_io_status_poll_status(poll_status), buffer}; + co_return std::pair{make_io_status_from_poll_status(poll_status), buffer}; } co_return send(buffer); } diff --git a/include/coro/net/tcp/server.hpp b/include/coro/net/tcp/server.hpp index c73694f1..f6f99398 100644 --- a/include/coro/net/tcp/server.hpp +++ b/include/coro/net/tcp/server.hpp @@ -59,7 +59,7 @@ class server if (pstatus != coro::poll_status::read) { - co_return unexpected{make_io_status_poll_status(pstatus)}; + co_return unexpected{make_io_status_from_poll_status(pstatus)}; } co_return accept_now(); diff --git a/src/net/io_status.cpp b/src/net/io_status.cpp index 9d40068e..88c08ebe 100644 --- a/src/net/io_status.cpp +++ b/src/net/io_status.cpp @@ -25,8 +25,8 @@ std::string coro::net::io_status::message() const return "Connection refused by target host"; case kind::timeout: return "Operation timed out"; - case kind::try_again: - return "try_again"; + case kind::would_block_or_try_again: + return "would_block_or_try_again"; case kind::polling_error: return "polling_error"; case kind::cancelled: @@ -92,7 +92,10 @@ auto coro::net::make_io_status_from_native(int native_code) -> coro::net::io_sta type = kind::connection_reset; break; case EAGAIN: - type = kind::try_again; +#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + type = kind::would_block_or_try_again; break; default: type = kind::native; @@ -101,7 +104,7 @@ auto coro::net::make_io_status_from_native(int native_code) -> coro::net::io_sta return coro::net::io_status{.type = type, .native_code = native_code}; #endif } -auto coro::net::make_io_status_poll_status(coro::poll_status status) -> coro::net::io_status +auto coro::net::make_io_status_from_poll_status(coro::poll_status status) -> coro::net::io_status { switch (status) { From f59cd08bebac7f75fa70c2e8ed49d46e8036539b Mon Sep 17 00:00:00 2001 From: PyXiion Date: Mon, 26 Jan 2026 23:02:16 +0300 Subject: [PATCH 4/4] udp::peer read/write API --- include/coro/coro.hpp | 3 +- include/coro/net/io_status.hpp | 3 ++ include/coro/net/recv_status.hpp | 32 --------------- 
include/coro/net/send_status.hpp |  31 --------
 include/coro/net/tcp/server.hpp  |   6 ++-
 include/coro/net/udp/peer.hpp    |  69 ++++++++++++++++++++++++++------
 src/net/io_status.cpp            |   7 ++++
 src/net/recv_status.cpp          |  56 --------------------------
 src/net/send_status.cpp          |   5 ---
 test/net/test_udp_peers.cpp      |  37 ++++++-----------
 10 files changed, 84 insertions(+), 165 deletions(-)
 delete mode 100644 include/coro/net/recv_status.hpp
 delete mode 100644 include/coro/net/send_status.hpp
 delete mode 100644 src/net/recv_status.cpp
 delete mode 100644 src/net/send_status.cpp

diff --git a/include/coro/coro.hpp b/include/coro/coro.hpp
index 9f34bec2..eefce908 100644
--- a/include/coro/coro.hpp
+++ b/include/coro/coro.hpp
@@ -23,8 +23,7 @@
     #include "coro/net/connect.hpp"
     #include "coro/net/hostname.hpp"
     #include "coro/net/ip_address.hpp"
-    #include "coro/net/recv_status.hpp"
-    #include "coro/net/send_status.hpp"
+    #include "coro/net/io_status.hpp"
     #include "coro/net/socket.hpp"
     #include "coro/net/udp/peer.hpp"
 #endif
diff --git a/include/coro/net/io_status.hpp b/include/coro/net/io_status.hpp
index 6e445116..8d449bce 100644
--- a/include/coro/net/io_status.hpp
+++ b/include/coro/net/io_status.hpp
@@ -21,6 +21,9 @@ struct io_status
         polling_error,
         cancelled,
 
+        udp_not_bound,
+        message_too_big,
+
         native
     };
 
diff --git a/include/coro/net/recv_status.hpp b/include/coro/net/recv_status.hpp
deleted file mode 100644
index 07946ecf..00000000
--- a/include/coro/net/recv_status.hpp
+++ /dev/null
@@ -1,32 +0,0 @@
-#pragma once
-
-#include <cstdint>
-#include <errno.h>
-#include <string>
-
-namespace coro::net
-{
-enum class recv_status : int64_t
-{
-    ok = 0,
-    /// The peer closed the socket.
-    closed = -1,
-    /// The udp socket has not been bind()'ed to a local port.
-    udp_not_bound = -2,
-    try_again = EAGAIN,
-    // Note: that only the tcp::client will return this, a tls::client returns the specific ssl_would_block_* status'. 
-    would_block = EWOULDBLOCK,
-    bad_file_descriptor = EBADF,
-    connection_refused = ECONNREFUSED,
-    memory_fault = EFAULT,
-    interrupted = EINTR,
-    invalid_argument = EINVAL,
-    no_memory = ENOMEM,
-    not_connected = ENOTCONN,
-    not_a_socket = ENOTSOCK,
-    connection_reset_by_peer = ECONNRESET,
-};
-
-auto to_string(recv_status status) -> const std::string&;
-
-} // namespace coro::net
diff --git a/include/coro/net/send_status.hpp b/include/coro/net/send_status.hpp
deleted file mode 100644
index c567f175..00000000
--- a/include/coro/net/send_status.hpp
+++ /dev/null
@@ -1,31 +0,0 @@
-#pragma once
-
-#include <cstdint>
-#include <errno.h>
-
-namespace coro::net
-{
-enum class send_status : int64_t
-{
-    ok = 0,
-    closed = -1,
-    permission_denied = EACCES,
-    try_again = EAGAIN,
-    would_block = EWOULDBLOCK,
-    already_in_progress = EALREADY,
-    bad_file_descriptor = EBADF,
-    connection_reset = ECONNRESET,
-    no_peer_address = EDESTADDRREQ,
-    memory_fault = EFAULT,
-    interrupted = EINTR,
-    is_connection = EISCONN,
-    message_size = EMSGSIZE,
-    output_queue_full = ENOBUFS,
-    no_memory = ENOMEM,
-    not_connected = ENOTCONN,
-    not_a_socket = ENOTSOCK,
-    operationg_not_supported = EOPNOTSUPP,
-    pipe_closed = EPIPE,
-};
-
-} // namespace coro::net
diff --git a/include/coro/net/tcp/server.hpp b/include/coro/net/tcp/server.hpp
index f6f99398..7e162478 100644
--- a/include/coro/net/tcp/server.hpp
+++ b/include/coro/net/tcp/server.hpp
@@ -62,7 +62,12 @@ class server
             co_return unexpected{make_io_status_from_poll_status(pstatus)};
         }
 
-        co_return accept_now();
+        auto client = accept_now();
+        if (!client.socket().is_valid())
+        {
+            co_return unexpected{make_io_status_from_native(errno)};
+        }
+        co_return client;
     }
 
     /**
diff --git a/include/coro/net/udp/peer.hpp b/include/coro/net/udp/peer.hpp
index 4fd40943..b461c894 100644
--- a/include/coro/net/udp/peer.hpp
+++ b/include/coro/net/udp/peer.hpp
@@ -2,9 +2,8 @@
 
 #include "coro/concepts/buffer.hpp"
 #include "coro/io_scheduler.hpp"
+#include "coro/net/io_status.hpp"
 #include "coro/net/ip_address.hpp"
-#include "coro/net/recv_status.hpp"
-#include "coro/net/send_status.hpp"
 #include "coro/net/socket.hpp"
 #include "coro/task.hpp"
 
@@ -58,6 +57,48 @@ class peer
      */
     auto socket() const noexcept -> const net::socket& { return m_socket; }
 
+    auto write_to(
+        const info&                      peer_info,
+        const std::span<const std::byte> buffer,
+        std::chrono::milliseconds        timeout = std::chrono::milliseconds{0}) -> coro::task<io_status>
+    {
+        auto pstatus = co_await poll(poll_op::write, timeout);
+        if (pstatus != poll_status::write)
+        {
+            co_return make_io_status_from_poll_status(pstatus);
+        }
+
+        co_return sendto(peer_info, buffer);
+    }
+
+    template<concepts::const_buffer buffer_type>
+    auto write_to(
+        const info&               peer_info,
+        const buffer_type&        buffer,
+        std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<io_status>
+    {
+        return write_to(peer_info, std::as_bytes(std::span{buffer}), timeout);
+    }
+
+    auto read_from(std::span<std::byte> buffer, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::tuple<io_status, peer::info, std::span<std::byte>>>
+    {
+        auto pstatus = co_await poll(poll_op::read, timeout);
+        if (pstatus != poll_status::read)
+        {
+            co_return {make_io_status_from_poll_status(pstatus), {}, {}};
+        }
+
+        co_return recvfrom(buffer);
+    }
+
+    template<concepts::mutable_buffer buffer_type>
+    auto read_from(buffer_type&& buffer, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
+        -> coro::task<std::tuple<io_status, peer::info, std::span<std::byte>>>
+    {
+        return read_from(std::as_writable_bytes(std::span{buffer}), timeout);
+    }
+
     /**
     * @param op The poll operation to perform on the udp socket. 
Note that if this is a send call only
     * udp socket (did not bind) then polling for read will not work.
@@ -76,12 +117,12 @@ class peer
-     * @return The status of send call and a span view of any data that wasn't sent. This data if
-     *         un-sent will correspond to bytes at the end of the given buffer.
+     * @return The status of the send call. A udp send either transmits the whole
+     *         datagram or fails, so no span of unsent bytes is returned anymore.
      */
-    template<concepts::const_buffer buffer_type, typename element_type = typename concepts::const_buffer_traits<buffer_type>::element_type>
-    auto sendto(const info& peer_info, const buffer_type& buffer) -> std::pair<send_status, std::span<const element_type>>
+    template<concepts::const_buffer buffer_type>
+    auto sendto(const info& peer_info, const buffer_type& buffer) -> io_status
     {
         if (buffer.empty())
         {
-            return {send_status::ok, std::span<const element_type>{}};
+            return io_status{io_status::kind::ok};
         }
 
         sockaddr_in peer{};
@@ -94,13 +135,13 @@ class peer
         auto bytes_sent = ::sendto(
             m_socket.native_handle(), buffer.data(), buffer.size(), 0, reinterpret_cast<sockaddr*>(&peer), peer_len);
 
-        if (bytes_sent >= 0)
+        if (bytes_sent == std::ssize(buffer))
         {
-            return {send_status::ok, std::span{buffer.data() + bytes_sent, buffer.size() - bytes_sent}};
+            return io_status{io_status::kind::ok};
         }
         else
         {
-            return {static_cast<send_status>(errno), std::span<const element_type>{}};
+            return make_io_status_from_native(errno);
         }
     }
 
@@ -111,13 +152,15 @@ class peer
-     * always start at the beggining of the buffer but depending on how large the data was
+     * always start at the beginning of the buffer but depending on how large the data was
      * it might not fill the entire buffer.
      */
-    template<concepts::mutable_buffer buffer_type, typename element_type = typename concepts::mutable_buffer_traits<buffer_type>::element_type>
-    auto recvfrom(buffer_type&& buffer) -> std::tuple<recv_status, peer::info, std::span<element_type>>
+    template<
+        concepts::mutable_buffer buffer_type,
+        typename element_type = typename concepts::mutable_buffer_traits<buffer_type>::element_type>
+    auto recvfrom(buffer_type&& buffer) -> std::tuple<io_status, peer::info, std::span<element_type>>
     {
         // The user must bind locally to be able to receive packets.
         if (!m_bound)
         {
-            return {recv_status::udp_not_bound, peer::info{}, std::span<element_type>{}};
+            return {io_status{io_status::kind::udp_not_bound}, peer::info{}, std::span<element_type>{}};
         }
 
         sockaddr_in peer{};
@@ -128,7 +171,7 @@ class peer
 
         if (bytes_read < 0)
         {
-            return {static_cast<recv_status>(errno), peer::info{}, std::span<element_type>{}};
+            return {make_io_status_from_native(errno), peer::info{}, std::span<element_type>{}};
         }
 
         std::span<const uint8_t> ip_addr_view{
@@ -137,7 +180,7 @@ class peer
         };
 
         return {
-            recv_status::ok,
+            io_status{io_status::kind::ok},
             peer::info{
                 .address = net::ip_address{ip_addr_view, static_cast<net::domain_t>(peer.sin_family)},
                 .port    = ntohs(peer.sin_port)},
diff --git a/src/net/io_status.cpp b/src/net/io_status.cpp
index 88c08ebe..00ad3ea9 100644
--- a/src/net/io_status.cpp
+++ b/src/net/io_status.cpp
@@ -31,8 +31,12 @@ std::string coro::net::io_status::message() const
             return "polling_error";
         case kind::cancelled:
             return "cancelled";
+        case kind::udp_not_bound:
+            return "udp_not_bound";
         case kind::native:
             return "native";
+        case kind::message_too_big:
+            return "message_too_big";
     }
 }
 
@@ -97,6 +101,9 @@ auto coro::net::make_io_status_from_native(int native_code) -> coro::net::io_sta
 #endif
             type = kind::would_block_or_try_again;
             break;
+        case EMSGSIZE:
+            type = kind::message_too_big;
+            break;
         default:
             type = kind::native;
             break;
diff --git a/src/net/recv_status.cpp b/src/net/recv_status.cpp
deleted file mode 100644
index 2f1b3c4e..00000000
--- a/src/net/recv_status.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-#include "coro/net/recv_status.hpp"
-
-namespace coro::net
-{
-static const std::string recv_status_ok{"ok"};
-static const std::string recv_status_closed{"closed"};
-static const std::string recv_status_udp_not_bound{"udp_not_bound"};
-static const std::string recv_status_would_block{"would_block"};
-static const std::string recv_status_bad_file_descriptor{"bad_file_descriptor"};
-static const std::string recv_status_connection_refused{"connection_refused"};
-static const std::string 
recv_status_memory_fault{"memory_fault"}; -static const std::string recv_status_interrupted{"interrupted"}; -static const std::string recv_status_invalid_argument{"invalid_argument"}; -static const std::string recv_status_no_memory{"no_memory"}; -static const std::string recv_status_not_connected{"not_connected"}; -static const std::string recv_status_not_a_socket{"not_a_socket"}; -static const std::string connection_reset_by_peer{"connection_reset_by_peer"}; -static const std::string recv_status_unknown{"unknown"}; - -auto to_string(recv_status status) -> const std::string& -{ - switch (status) - { - case recv_status::ok: - return recv_status_ok; - case recv_status::closed: - return recv_status_closed; - case recv_status::udp_not_bound: - return recv_status_udp_not_bound; - // case recv_status::try_again: return recv_status_try_again; - case recv_status::would_block: - return recv_status_would_block; - case recv_status::bad_file_descriptor: - return recv_status_bad_file_descriptor; - case recv_status::connection_refused: - return recv_status_connection_refused; - case recv_status::memory_fault: - return recv_status_memory_fault; - case recv_status::interrupted: - return recv_status_interrupted; - case recv_status::invalid_argument: - return recv_status_invalid_argument; - case recv_status::no_memory: - return recv_status_no_memory; - case recv_status::not_connected: - return recv_status_not_connected; - case recv_status::not_a_socket: - return recv_status_not_a_socket; - case recv_status::connection_reset_by_peer: - return connection_reset_by_peer; - } - - return recv_status_unknown; -} - -} // namespace coro::net diff --git a/src/net/send_status.cpp b/src/net/send_status.cpp deleted file mode 100644 index bc7b7023..00000000 --- a/src/net/send_status.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#include "coro/net/send_status.hpp" - -namespace coro::net -{ -} // namespace coro::net diff --git a/test/net/test_udp_peers.cpp b/test/net/test_udp_peers.cpp index b329f60f..50c2cec3 100644 --- a/test/net/test_udp_peers.cpp +++ b/test/net/test_udp_peers.cpp @@ -17,9 +17,8 @@ TEST_CASE("udp one way", "[udp]") coro::net::udp::peer peer{scheduler}; coro::net::udp::peer::info peer_info{}; - auto [sstatus, remaining] = peer.sendto(peer_info, msg); - REQUIRE(sstatus == coro::net::send_status::ok); - REQUIRE(remaining.empty()); + auto wstatus = co_await peer.write_to(peer_info, msg); + REQUIRE(wstatus.is_ok()); co_return; }; @@ -31,12 +30,9 @@ TEST_CASE("udp one way", "[udp]") coro::net::udp::peer self{scheduler, self_info}; - auto pstatus = co_await self.poll(coro::poll_op::read); - REQUIRE(pstatus == coro::poll_status::read); - std::string buffer(64, '\0'); - auto [rstatus, peer_info, rspan] = self.recvfrom(buffer); - REQUIRE(rstatus == coro::net::recv_status::ok); + auto [rstatus, peer_info, rspan] = co_await self.read_from(buffer); + REQUIRE(rstatus.is_ok()); REQUIRE(peer_info.address == coro::net::ip_address::from_string("127.0.0.1")); // The peer's port will be randomly picked by the kernel since it wasn't bound. REQUIRE(rspan.size() == msg.size()); @@ -74,19 +70,14 @@ TEST_CASE("udp echo peers", "[udp]") if (send_first) { // Send my message to my peer first. - auto [sstatus, remaining] = me.sendto(peer_info, my_msg); - REQUIRE(sstatus == coro::net::send_status::ok); - REQUIRE(remaining.empty()); + auto wstatus= co_await me.write_to(peer_info, my_msg); + REQUIRE(wstatus.is_ok()); } else { - // Poll for my peers message first. 
- auto pstatus = co_await me.poll(coro::poll_op::read); - REQUIRE(pstatus == coro::poll_status::read); - std::string buffer(64, '\0'); - auto [rstatus, recv_peer_info, rspan] = me.recvfrom(buffer); - REQUIRE(rstatus == coro::net::recv_status::ok); + auto [rstatus, recv_peer_info, rspan] = co_await me.read_from(buffer); + REQUIRE(rstatus.is_ok()); REQUIRE(recv_peer_info == peer_info); REQUIRE(rspan.size() == peer_msg.size()); buffer.resize(rspan.size()); @@ -96,12 +87,9 @@ TEST_CASE("udp echo peers", "[udp]") if (send_first) { // I sent first so now I need to await my peer's message. - auto pstatus = co_await me.poll(coro::poll_op::read); - REQUIRE(pstatus == coro::poll_status::read); - std::string buffer(64, '\0'); - auto [rstatus, recv_peer_info, rspan] = me.recvfrom(buffer); - REQUIRE(rstatus == coro::net::recv_status::ok); + auto [rstatus, recv_peer_info, rspan] = co_await me.read_from(buffer); + REQUIRE(rstatus.is_ok()); REQUIRE(recv_peer_info == peer_info); REQUIRE(rspan.size() == peer_msg.size()); buffer.resize(rspan.size()); @@ -109,9 +97,8 @@ TEST_CASE("udp echo peers", "[udp]") } else { - auto [sstatus, remaining] = me.sendto(peer_info, my_msg); - REQUIRE(sstatus == coro::net::send_status::ok); - REQUIRE(remaining.empty()); + auto sstatus = co_await me.write_to(peer_info, my_msg); + REQUIRE(sstatus.is_ok()); } co_return;
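
With the tests updated, the `udp::peer` surface now mirrors the `tcp::client` one: `write_to()` replaces the poll/sendto pair and `read_from()` replaces the poll/recvfrom pair. A minimal round-trip sketch under the same assumptions as the tests above (`ping` is an illustrative name, the scheduler is whatever the test harness created, and the loopback address/port are arbitrary):

```cpp
#include <coro/coro.hpp>

// Sketch only: one bound receiver and one unbound sender, as in the udp tests.
auto ping(auto& scheduler) -> coro::task<void>
{
    co_await scheduler->schedule();

    coro::net::udp::peer::info receiver_info{
        .address = coro::net::ip_address::from_string("127.0.0.1"), .port = 8080};

    coro::net::udp::peer sender{scheduler};                  // unbound: send-only
    coro::net::udp::peer receiver{scheduler, receiver_info}; // bound: may read_from()

    // write_to() polls for writability and then sendto()s the whole datagram;
    // partial sends now surface as an error instead of a remainder span.
    std::string msg{"ping"};
    auto wstatus = co_await sender.write_to(receiver_info, msg);
    if (!wstatus.is_ok())
    {
        co_return; // wstatus.message() describes the failure.
    }

    // read_from() polls for readability and then recvfrom()s into the buffer.
    std::string buf(64, '\0');
    auto [rstatus, from, rspan] = co_await receiver.read_from(buf, std::chrono::seconds{1});
    if (rstatus.is_ok())
    {
        buf.resize(rspan.size()); // rspan is the filled prefix; `from` identifies the sender.
    }
}
```
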