Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion picoquic/picosocks.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ void picoquic_socks_cmsg_parse(
}
}
#ifdef UDP_COALESCED_INFO
if (cmsg->cmsg_level == IPPROTO_UDP &&
else if (cmsg->cmsg_level == IPPROTO_UDP &&
cmsg->cmsg_type == UDP_COALESCED_INFO) {
if (cmsg->cmsg_len > 0) {
if (udp_coalesced_size != NULL) {
Expand Down
156 changes: 149 additions & 7 deletions picoquic/sockloop.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@
#include <netdb.h>
#include <netinet/in.h>
#include <sys/select.h>
#define PICOQUIC_USE_POLL
#ifdef PICOQUIC_USE_POLL
#include <poll.h>
#endif

#ifndef __APPLE__
#ifdef __LINUX__
Expand Down Expand Up @@ -572,7 +576,110 @@ int picoquic_packet_loop_wait(picoquic_socket_ctx_t* s_ctx,
}
return bytes_recv;
}
#else
#else
#ifdef PICOQUIC_USE_POLL
void picoquic_packet_loop_set_fds(struct pollfd * poll_list,
picoquic_socket_ctx_t* s_ctx,
int nb_sockets,
picoquic_network_thread_ctx_t* thread_ctx)
{
int nb_pollfd = nb_sockets + (thread_ctx->wake_up_defined) ? 1 : 0;
memset(poll_list, 0, sizeof(struct pollfd)* (PICOQUIC_PACKET_LOOP_SOCKETS_MAX+1));
int i_poll = 0;

if (thread_ctx->wake_up_defined) {
poll_list[0].fd = (int)thread_ctx->wake_up_pipe_fd[0];
poll_list[0].events = POLLIN;
i_poll = 1;
}
for (int i = 0; i < nb_sockets && i < PICOQUIC_PACKET_LOOP_SOCKETS_MAX; i++, i_poll++) {
poll_list[i_poll].fd = (int)s_ctx[i].fd;
poll_list[i_poll].events = POLLIN;
}
for (; i_poll < PICOQUIC_PACKET_LOOP_SOCKETS_MAX + 1; i_poll++) {
poll_list[i_poll].fd = -1;
}
}


int picoquic_packet_loop_poll(
picoquic_socket_ctx_t* s_ctx,
int nb_sockets,
struct pollfd* poll_list,
struct sockaddr_storage* addr_from,
struct sockaddr_storage* addr_dest,
int* dest_if,
unsigned char* received_ecn,
uint8_t* buffer, int buffer_max,
int64_t delta_t,
int* is_wake_up_event,
picoquic_network_thread_ctx_t* thread_ctx,
int* socket_rank)
{
int delta_t_ms = (int)((delta_t + 999) / 1000);
int bytes_recv = 0;
int i_poll = (thread_ctx->wake_up_defined) ? 1 : 0;
int ret_poll = poll(poll_list, nb_sockets + i_poll, delta_t_ms);

if (received_ecn != NULL) {
*received_ecn = 0;
}
*is_wake_up_event = 0;

if (ret_poll < 0) {
bytes_recv = -1;
DBG_PRINTF("Error: poll returns %d\n", ret_poll);
}
else if (ret_poll > 0) {
/* Check if the 'wake up' pipe is full. If it is, read the data on it,
* set the is_wake_up_event flag, and ignore the other file descriptors. */
if (thread_ctx->wake_up_defined && poll_list[0].revents != 0) {
/* Something was written on the "wakeup" pipe. Read it. */
uint8_t eventbuf[8];
int pipe_recv;
DBG_PRINTF("Waking up -- defined: %d, nb_sockets: %d",
(thread_ctx->wake_up_defined) ? 1 : 0, nb_sockets);
if ((pipe_recv = read(thread_ctx->wake_up_pipe_fd[0], eventbuf, sizeof(eventbuf))) <= 0) {
bytes_recv = -1;
DBG_PRINTF("Error: read pipe returns %d\n", (pipe_recv == 0) ? EPIPE : errno);
}
else {
DBG_PRINTF("Waking up -- received: %d", pipe_recv);
*is_wake_up_event = 1;
}
}
else
{
for (int i = 0; i < nb_sockets; i++) {
if (poll_list[i+i_poll].revents != 0) {
*socket_rank = i;
bytes_recv = picoquic_recvmsg(s_ctx[i].fd, addr_from,
addr_dest, dest_if, received_ecn,
buffer, buffer_max);

if (bytes_recv <= 0) {
DBG_PRINTF("Could not receive packet on UDP socket[%d]= %d!\n",
i, (int)s_ctx[i].fd);
break;
}
else {
/* Document incoming port */
if (addr_dest->ss_family == AF_INET6) {
((struct sockaddr_in6*)addr_dest)->sin6_port = s_ctx[i].n_port;
}
else if (addr_dest->ss_family == AF_INET) {
((struct sockaddr_in*)addr_dest)->sin_port = s_ctx[i].n_port;
}
break;
}
}
}
}
}

return bytes_recv;
}
#else
int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx,
int nb_sockets,
struct sockaddr_storage* addr_from,
Expand Down Expand Up @@ -677,6 +784,7 @@ int picoquic_packet_loop_select(picoquic_socket_ctx_t* s_ctx,
return bytes_recv;
}
#endif
#endif

static int monitor_system_call_duration(packet_loop_system_call_duration_t* sc_duration, uint64_t current_time, uint64_t previous_time)
{
Expand Down Expand Up @@ -734,7 +842,7 @@ void* picoquic_packet_loop_v3(void* v_ctx)
size_t* send_msg_ptr = NULL;
int bytes_recv;
picoquic_connection_id_t log_cid;
picoquic_socket_ctx_t s_ctx[4];
picoquic_socket_ctx_t s_ctx[PICOQUIC_PACKET_LOOP_SOCKETS_MAX];
int nb_sockets = 0;
int nb_sockets_available = 0;
picoquic_cnx_t* last_cnx = NULL;
Expand All @@ -747,6 +855,10 @@ void* picoquic_packet_loop_v3(void* v_ctx)
#ifdef _WINDOWS
WSADATA wsaData = { 0 };
(void)WSA_START(MAKEWORD(2, 2), &wsaData);
#else
#ifdef PICOQUIC_USE_POLL
struct pollfd poll_list[PICOQUIC_PACKET_LOOP_SOCKETS_MAX + 1];
#endif
#endif

if (thread_ctx->thread_name != NULL) {
Expand Down Expand Up @@ -776,6 +888,13 @@ void* picoquic_packet_loop_v3(void* v_ctx)
ret = loop_callback(quic, picoquic_packet_loop_alt_port, loop_callback_ctx, &alt_port);
}
}
#ifndef _WINDOWS
#ifdef PICOQUIC_USE_POLL
if (ret == 0) {
picoquic_packet_loop_set_fds(poll_list, s_ctx, nb_sockets, thread_ctx);
}
#endif
#endif

if (ret == 0) {
nb_sockets_available = nb_sockets;
Expand Down Expand Up @@ -841,19 +960,30 @@ void* picoquic_packet_loop_v3(void* v_ctx)
loop_immediate = 0;
/* Remember the time before the select call, so it duration be monitored */
previous_time = current_time;
/* Initialize the dest addr family to UNSPEC yo handle systems that cannot set it. */
/* Initialize the dest addr family to UNSPEC to handle systems that cannot set it. */
addr_to.ss_family = AF_UNSPEC;
#ifdef _WINDOWS
bytes_recv = picoquic_packet_loop_wait(s_ctx, nb_sockets_available,
&addr_from, &addr_to, &if_index_to, &received_ecn, &received_buffer,
delta_t, &is_wake_up_event, thread_ctx, &socket_rank);
#else
#ifdef PICOQUIC_USE_POLL
bytes_recv = picoquic_packet_loop_poll(
s_ctx, nb_sockets_available,
poll_list,
& addr_from,
& addr_to, & if_index_to, & received_ecn,
buffer, sizeof(buffer),
delta_t, & is_wake_up_event, thread_ctx, & socket_rank);
received_buffer = buffer;
#else
bytes_recv = picoquic_packet_loop_select(s_ctx, nb_sockets_available,
&addr_from,
&addr_to, &if_index_to, &received_ecn,
buffer, sizeof(buffer),
delta_t, &is_wake_up_event, thread_ctx, &socket_rank);
received_buffer = buffer;
#endif
#endif
current_time = picoquic_current_time();
if (options.do_system_call_duration && delta_t == 0 &&
Expand Down Expand Up @@ -902,16 +1032,14 @@ void* picoquic_packet_loop_v3(void* v_ctx)
(struct sockaddr*)&addr_to, if_index_to, received_ecn,
&last_cnx, current_time);
#endif


if (loop_callback != NULL) {
size_t b_recvd = (size_t)bytes_recv;
ret = loop_callback(quic, picoquic_packet_loop_after_receive, loop_callback_ctx, &b_recvd);
}

/* If the number of packets received in immediate mode has not
* reached the threshold, set the "immediate" flag and bypass
* the sending code.
* the sending code.
*/
if (ret == 0 && nb_loop_immediate < PICOQUIC_PACKET_LOOP_RECV_MAX) {
loop_immediate = 1;
Expand All @@ -932,6 +1060,11 @@ void* picoquic_packet_loop_v3(void* v_ctx)
* memorized for that path.
*/
nb_sockets_available = nb_sockets / 2;
#ifndef _WINDOWS
#ifdef PICOQUIC_USE_POLL
picoquic_packet_loop_set_fds(poll_list, s_ctx, nb_sockets_available, thread_ctx);
#endif
#endif
}
ret = 0;
}
Expand Down Expand Up @@ -1004,7 +1137,13 @@ void* picoquic_packet_loop_v3(void* v_ctx)
send_port = new_ctx->n_port;
nb_sockets_available++;
if (nb_sockets < nb_sockets_available) {
DBG_PRINTF("new socket, nb = %d", nb_sockets_available);
nb_sockets = nb_sockets_available;
#ifndef _WINDOWS
#ifdef PICOQUIC_USE_POLL
picoquic_packet_loop_set_fds(poll_list, s_ctx, nb_sockets_available, thread_ctx);
#endif
#endif
}
}
}
Expand All @@ -1016,12 +1155,12 @@ void* picoquic_packet_loop_v3(void* v_ctx)
}
else
{

if (param->simulate_eio && send_length > PICOQUIC_MAX_PACKET_SIZE) {
/* Test hook, simulating a driver that does not support GSO */
sock_ret = -1;
sock_err = EIO;
param->simulate_eio = 0;
DBG_PRINTF("Simulating EIO, send length = %zu", send_length);
}
else {
sock_ret = picoquic_sendmsg(send_socket,
Expand Down Expand Up @@ -1054,6 +1193,7 @@ void* picoquic_packet_loop_v3(void* v_ctx)
size_t packet_size = send_msg_size;

while (packet_index < send_length) {
DBG_PRINTF("EIO, length= %zu/%zu", packet_index, send_length);
if (packet_index + packet_size > send_length) {
packet_size = send_length - packet_index;
}
Expand All @@ -1064,6 +1204,8 @@ void* picoquic_packet_loop_v3(void* v_ctx)
packet_index += packet_size;
}
else {
DBG_PRINTF("Retry with packet size=%zu fails at index %zu, ret=%d, err=%d.",
packet_size, packet_index, sock_ret, sock_err);
picoquic_log_app_message(last_cnx, "Retry with packet size=%zu fails at index %zu, ret=%d, err=%d.",
packet_size, packet_index, sock_ret, sock_err);
break;
Expand Down
Loading