From 196c88f50a47037da64214e802ee327fefe4adc5 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Mon, 12 Jan 2026 15:14:40 -0500 Subject: [PATCH 01/10] Hacky attempt to modify control port API to: Allow listing control ports Allow removal by IP address instead of port index # --- src/control_socket.cpp | 10 +++- src/control_socket.h | 1 + src/hd-rum-translator/hd-rum-translator.cpp | 61 +++++++++++++++++++-- 3 files changed, 63 insertions(+), 9 deletions(-) diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 8b8595ea6..6a75e617f 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -852,7 +852,6 @@ static void * control_thread(void *args) socket_error("[control socket] accept"); continue; } - // all remote sockets are written sequentially so // we don't want to block if one gets stuck set_socket_nonblock(fd); @@ -1063,7 +1062,8 @@ get_control_state(struct module *mod) return (struct control_state *) control_mod->priv_data; } -static void print_control_help() { + + static void print_control_help() { color_printf("Control internal commands:\n" TBOLD("\texit") "\n" TBOLD("\tpause") "\n" @@ -1081,10 +1081,14 @@ static void print_control_help() { " - (un)mutes audio sender or receiver\n" TBOLD("\tpostprocess | flush") "\n" TBOLD("\tdump-tree")"\n"); + color_printf("\nHD-RUM Translator commands:\n" + TBOLD("\tcreate-port [compression]") " - create new output port\n" + TBOLD("\tdelete-port ") " - remove output port\n" + TBOLD("\tlist-ports") " - show all configured output ports and their IP addresses\n"); color_printf("\nOther commands can be issued directly to individual " "modules (see \"" TBOLD("dump-tree") "\"), eg.:\n" "\t" TBOLD("capture.filter mirror") "\n" "\nSometimes those modules support help (eg. \"" TBOLD("capture.filter help") "\")\n\n"); color_printf(TBOLD(u8"¹") " audio commands applying to receiver\n\n"); -} + } diff --git a/src/control_socket.h b/src/control_socket.h index e94520943..434b1b0a3 100644 --- a/src/control_socket.h +++ b/src/control_socket.h @@ -51,6 +51,7 @@ struct module; * @retval 0 if success */ int control_init(int port, int connection_type, struct control_state **state, struct module *root_module, int force_ip_version); +int control_get_port(struct control_state *state); struct control_state *get_control_state(struct module *mod); void control_start(struct control_state *state); void control_done(struct control_state *s); diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index 56649e7dc..2b13baefd 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -106,6 +106,7 @@ struct replica { replica(const char *addr, uint16_t rx_port, uint16_t tx_port, int bufsize, struct module *parent, int force_ip_version) { magic = REPLICA_MAGIC; host = addr; + ip_address = addr; // Store IP address for identification m_tx_port = tx_port; sock = std::shared_ptr(udp_init(addr, rx_port, tx_port, 255, force_ip_version, false), udp_exit); int mode = 0; @@ -133,6 +134,7 @@ struct replica { struct module mod; uint32_t magic; string host; + string ip_address; // Add this field for IP-based identification int m_tx_port; enum type_t { @@ -386,7 +388,14 @@ static void *writer(void *arg) if (strncasecmp(msg->text, "delete-port ", strlen("delete-port ")) == 0) { char *port_spec = msg->text + strlen("delete-port "); int index = -1; - if (isdigit(port_spec[0])) { + bool is_all_digits = true; + for (int j = 0; port_spec[j] != '\0'; j++) { + if (!isdigit(port_spec[j])) { + is_all_digits = false; + break; + } + } + if (is_all_digits && strlen(port_spec) > 0) { int i = stoi(port_spec); if (i >= 0 && i < (int) s->replicas.size()) { index = i; @@ -394,16 +403,31 @@ static void *writer(void *arg) log_msg(LOG_LEVEL_WARNING, "Invalid port index: %d. Not removing.\n", i); } } else { + // It's not all digits, so treat as IP address or name int i = 0; + // Check for IP address match first for (auto r : s->replicas) { - if (strcmp(r->mod.name, port_spec) == 0) { - index = i; - break; + // Ensure replica and its IP address are valid before comparing + if (!r->ip_address.empty() && r->ip_address == port_spec) { + index = i; + break; } i++; } + // If not found by IP, check by port name if (index == -1) { - log_msg(LOG_LEVEL_WARNING, "Unknown port name: %s. Not removing.\n", port_spec); + i = 0; + for (auto r : s->replicas) { + if (strcmp(r->mod.name, port_spec) == 0) { + index = i; + break; + } + i++; + } + } + // Log if neither IP address or name matches + if (index == -1) { + log_msg(LOG_LEVEL_WARNING, "Unknown port (IP or name): %s. Not removing.\n", port_spec); } } if (index >= 0) { @@ -412,6 +436,31 @@ static void *writer(void *arg) s->replicas.erase(s->replicas.begin() + index); log_msg(LOG_LEVEL_NOTICE, "Deleted output port %d.\n", index); } + } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || + strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { + // Debug: Check what we actually have + fprintf(stderr, "[DEBUG] s->replicas.size() = %zu\n", s->replicas.size()); + // List all current root ports and their IP addresses + string port_list = "Root ports:\n"; + if (s->replicas.empty()) { + port_list += " No ports configured.\n"; + } else { + for (size_t i = 0; i < s->replicas.size(); i++) { + const auto& replica = s->replicas[i]; + const char* type_str = (replica->type == replica::type_t::USE_SOCK) ? "forwarding" : + (replica->type == replica::type_t::RECOMPRESS) ? "transcoding" : "none"; + char port_info[512]; + snprintf(port_info, sizeof(port_info), " [%zu] %s:%d (%s) - %s\n", + i, replica->ip_address.c_str(), replica->m_tx_port, + replica->mod.name, type_str); + port_list += port_info; + } + } + // Also print to console for debugging + fprintf(stderr, "%s", port_list.c_str()); + fflush(stderr); + // Send back to netcat client + r = new_response(RESPONSE_OK, port_list.c_str()); } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { // format of parameters is either: // : [] @@ -482,7 +531,7 @@ static void *writer(void *arg) // distribute it to output ports that don't need transcoding #ifdef _WIN32 - // send it asynchronously in MSW (performance optimalization) + // send it asynchronously in MSW (performance optimization) SleepEx(0, true); // allow system to call our completion routines in APC int ref = 0; for (unsigned int i = 0; i < s->replicas.size(); i++) { From 42a6fcb8ac2b3e571b95452630301a5ffc4ebdb8 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Tue, 13 Jan 2026 12:35:36 -0500 Subject: [PATCH 02/10] - list ports now outputs correctly on terminal log but I remain no closer to getting a netcat output --- src/control_socket.cpp | 32 +++-- src/hd-rum-translator/hd-rum-translator.cpp | 145 ++++++++++---------- 2 files changed, 96 insertions(+), 81 deletions(-) diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 6a75e617f..11905ceef 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -655,17 +655,25 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s dump_tree(s->root_module, 0); resp = new_response(RESPONSE_OK, NULL); } else { // assume message in format "path message" - struct msg_universal *msg = (struct msg_universal *) - new_message(sizeof(struct msg_universal)); - - if (strchr(message, ' ')) { - memcpy(path, message, strchr(message, ' ') - message); - strncpy(msg->text, strchr(message, ' ') + 1, sizeof(path) - 1); - } else { - strncpy(path, message, sizeof(path) - 1); // empty message ?? + struct msg_universal *msg = (struct msg_universal *) + new_message(sizeof(struct msg_universal)); + + if (strchr(message, ' ')) { + size_t path_len = strchr(message, ' ') - message; + memcpy(path, message, path_len); + path[path_len] = '\0'; + strncpy(msg->text, strchr(message, ' ') + 1, sizeof(msg->text) - 1); + + // If path is "root", send directly to root module + if (strcmp(path, "root") == 0) { + path[0] = '\0'; } + } else { + path[0] = '\0'; + strncpy(msg->text, message, sizeof(msg->text) - 1); + } - resp = send_message(s->root_module, path, (struct message *) msg); + resp = send_message_sync(s->root_module, path, (struct message *) msg, 100, 0); } if(!resp) { @@ -695,8 +703,12 @@ static void send_response(fd_t fd, struct response *resp) if (ret < 0) { socket_error("Unable to write response"); } + #ifndef _WIN32 + // Force flush on POSIX systems + fsync(fd); + #endif - free_response(resp); + free_response(resp); } static bool parse_msg(char *buffer, int buffer_len, /* out */ char *message, int *new_buffer_len) diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index 2b13baefd..c1813e444 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -436,85 +436,82 @@ static void *writer(void *arg) s->replicas.erase(s->replicas.begin() + index); log_msg(LOG_LEVEL_NOTICE, "Deleted output port %d.\n", index); } - } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || - strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { - // Debug: Check what we actually have - fprintf(stderr, "[DEBUG] s->replicas.size() = %zu\n", s->replicas.size()); - // List all current root ports and their IP addresses - string port_list = "Root ports:\n"; - if (s->replicas.empty()) { - port_list += " No ports configured.\n"; - } else { - for (size_t i = 0; i < s->replicas.size(); i++) { - const auto& replica = s->replicas[i]; - const char* type_str = (replica->type == replica::type_t::USE_SOCK) ? "forwarding" : - (replica->type == replica::type_t::RECOMPRESS) ? "transcoding" : "none"; - char port_info[512]; - snprintf(port_info, sizeof(port_info), " [%zu] %s:%d (%s) - %s\n", - i, replica->ip_address.c_str(), replica->m_tx_port, - replica->mod.name, type_str); - port_list += port_info; - } + r = new_response(RESPONSE_OK, NULL); + } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || + strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { + // List all current root ports and their IP addresses + string port_list = "Root ports:\n"; + if (s->replicas.empty()) { + port_list += " No ports configured.\n"; + } else { + for (size_t i = 0; i < s->replicas.size(); i++) { + const auto& replica = s->replicas[i]; + const char* type_str = (replica->type == replica::type_t::USE_SOCK) ? "forwarding" : + (replica->type == replica::type_t::RECOMPRESS) ? "transcoding" : "none"; + char port_info[512]; + snprintf(port_info, sizeof(port_info), "[%zu] %s:%d (%s) - %s\n", + i, replica->ip_address.c_str(), replica->m_tx_port, + replica->mod.name, type_str); + port_list += port_info; // FIXED: was port_list += port_list } - // Also print to console for debugging - fprintf(stderr, "%s", port_list.c_str()); - fflush(stderr); - // Send back to netcat client + } + log_msg(LOG_LEVEL_NOTICE, "Ports: %s\n", port_list.c_str()); r = new_response(RESPONSE_OK, port_list.c_str()); - } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { - // format of parameters is either: - // : [] - // or (for compat with older CoUniverse version) - // [] - char *host_port, *port_str = NULL, *save_ptr; - char *host; - int tx_port; - strtok_r(msg->text, " ", &save_ptr); - host_port = strtok_r(NULL, " ", &save_ptr); - if (host_port && (strchr(host_port, ':') != NULL || (port_str = strtok_r(NULL, " ", &save_ptr)) != NULL)) { - if (port_str) { - host = host_port; - tx_port = stoi(port_str); + } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { + // format of parameters is either: + // : [] + // or (for compat with older CoUniverse version) + // [] + char *host_port, *port_str = NULL, *save_ptr; + char *host; + int tx_port; + strtok_r(msg->text, " ", &save_ptr); + host_port = strtok_r(NULL, " ", &save_ptr); + if (host_port && (strchr(host_port, ':') != NULL || (port_str = strtok_r(NULL, " ", &save_ptr)) != NULL)) { + if (port_str) { + host = host_port; + tx_port = stoi(port_str); + } else { + tx_port = stoi(strrchr(host_port, ':') + 1); + host = host_port; + *strrchr(host_port, ':') = '\0'; + } + // handle square brackets around an IPv6 address + if (host[0] == '[' && host[strlen(host) - 1] == ']') { + host += 1; + host[strlen(host) - 1] = '\0'; + } } else { - tx_port = stoi(strrchr(host_port, ':') + 1); - host = host_port; - *strrchr(host_port, ':') = '\0'; + const char *err_msg = "wrong format"; + log_msg(LOG_LEVEL_ERROR, "%s\n", err_msg); + free_message((struct message *) msg, new_response(RESPONSE_BAD_REQUEST, err_msg)); + continue; } - // handle square brackets around an IPv6 address - if (host[0] == '[' && host[strlen(host) - 1] == ']') { - host += 1; - host[strlen(host) - 1] = '\0'; + char *compress = strtok_r(NULL, " ", &save_ptr); + + struct common_opts opts = { COMMON_OPTS_INIT }; + int idx = create_output_port(s, + host, 0, tx_port, s->bufsize, &opts, + compress, nullptr, RATE_UNLIMITED, s->server_socket != nullptr); + + if(idx < 0) { + r = new_response(RESPONSE_INT_SERV_ERR, "Cannot create output port."); + continue; } - } else { - const char *err_msg = "wrong format"; - log_msg(LOG_LEVEL_ERROR, "%s\n", err_msg); - free_message((struct message *) msg, new_response(RESPONSE_BAD_REQUEST, err_msg)); - continue; - } - char *compress = strtok_r(NULL, " ", &save_ptr); - struct common_opts opts = { COMMON_OPTS_INIT }; - int idx = create_output_port(s, - host, 0, tx_port, s->bufsize, &opts, - compress, nullptr, RATE_UNLIMITED, s->server_socket != nullptr); + if(compress) + log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); + else + log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port); - if(idx < 0) { - free_message((struct message *) msg, new_response(RESPONSE_INT_SERV_ERR, "Cannot create output port.")); - continue; + r = new_response(RESPONSE_OK, NULL); + } else { + r = new_response(RESPONSE_BAD_REQUEST, NULL); } - if(compress) - log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); - else - log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port); - - } else { - r = new_response(RESPONSE_BAD_REQUEST, NULL); + free_message((struct message *) msg, r); } - free_message((struct message *) msg, r ? r : new_response(RESPONSE_OK, NULL)); - } - // then process incoming packets while (s->qhead != s->qtail) { if(s->qhead->size == 0) { // poisoned pill @@ -574,10 +571,16 @@ static void *writer(void *arg) pthread_cond_signal(&s->qfull_cond); pthread_mutex_unlock(&s->qfull_mtx); } - pthread_mutex_lock(&s->qempty_mtx); - if (s->qempty) - pthread_cond_wait(&s->qempty_cond, &s->qempty_mtx); + if (s->qempty) { + // Use timed wait instead of indefinite wait so we can process messages periodically + struct timespec timeout; + struct timeval now; + gettimeofday(&now, NULL); + timeout.tv_sec = now.tv_sec + 1; // Wake up every 1 second + timeout.tv_nsec = now.tv_usec * 1000; + pthread_cond_timedwait(&s->qempty_cond, &s->qempty_mtx, &timeout); + } s->qempty = 1; pthread_mutex_unlock(&s->qempty_mtx); } From ec37ad698d6f9fec23edb0ab12200b0eaac09ae7 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Tue, 13 Jan 2026 13:14:15 -0500 Subject: [PATCH 03/10] - add 409 RESPONSE_CONFLICT category to messaging.h - add existing port checking to ensure the same client/port may not be added multiple times to a reflector --- src/hd-rum-translator/hd-rum-translator.cpp | 18 +++++++++++++++++- src/messaging.h | 1 + 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index c1813e444..38d76d690 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -440,7 +440,7 @@ static void *writer(void *arg) } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { // List all current root ports and their IP addresses - string port_list = "Root ports:\n"; + string port_list = "\nRoot ports:\n"; if (s->replicas.empty()) { port_list += " No ports configured.\n"; } else { @@ -489,6 +489,22 @@ static void *writer(void *arg) } char *compress = strtok_r(NULL, " ", &save_ptr); + // Check if a replica with the same host and port already exists + bool exists = false; + for (auto r : s->replicas) { + if (r->ip_address == host && r->m_tx_port == tx_port) { + exists = true; + break; + } + } + + if (exists) { + log_msg(LOG_LEVEL_ERROR, "Output port %s:%d already exists.\n", host, tx_port); + r = new_response(RESPONSE_CONFLICT, "Port already exists"); + free_message((struct message *) msg, r); + continue; + } + struct common_opts opts = { COMMON_OPTS_INIT }; int idx = create_output_port(s, host, 0, tx_port, s->bufsize, &opts, diff --git a/src/messaging.h b/src/messaging.h index 8e93c1f4a..8b8e320e4 100644 --- a/src/messaging.h +++ b/src/messaging.h @@ -62,6 +62,7 @@ struct response; #define RESPONSE_BAD_REQUEST 400 #define RESPONSE_NOT_FOUND 404 #define RESPONSE_REQ_TIMEOUT 408 +#define RESPONSE_CONFLICT 409 #define RESPONSE_INT_SERV_ERR 500 #define RESPONSE_NOT_IMPL 501 From b978b94f5553a80291d9c01e5f576221020cdfc9 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Wed, 14 Jan 2026 09:01:47 -0500 Subject: [PATCH 04/10] - add a timed wait function to the writer so that we are more likely to get return messages back out of netcat when a client makes a write request. Without this timer, we don't seem to get proper outputs in terminal sdtout or via netcat. I need to understand this better so that I can switch it back to a full event-based messaging system as appears to be the original intent, as effectively running a poll doesn't seem efficient to me. --- src/hd-rum-translator/hd-rum-translator.cpp | 68 +++++++++++++-------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index 38d76d690..380508db7 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -333,8 +333,18 @@ static int create_output_port(struct hd_rum_translator_state *s, { struct replica *rep; try { - rep = new replica(addr, rx_port, tx_port, bufsize, &s->mod, + // Process the address string to handle IPv6 brackets + char *processed_addr = strdup(addr); + if (processed_addr[0] == '[' && processed_addr[strlen(processed_addr) - 1] == ']') { + processed_addr[0] = '\0'; + memmove(processed_addr, processed_addr + 1, strlen(processed_addr)); + processed_addr[strlen(processed_addr) - 1] = '\0'; + } + + rep = new replica(processed_addr, rx_port, tx_port, bufsize, &s->mod, common->force_ip_version); + free(processed_addr); + if(use_server_sock){ rep->sock = s->server_socket; } @@ -362,7 +372,8 @@ static int create_output_port(struct hd_rum_translator_state *s, } assert((unsigned) idx == s->replicas.size() - 1); - recompress_port_set_active(s->recompress, idx, compression != nullptr); + recompress_port_set_active(s->recompress, idx, + rep->type == replica::type_t::RECOMPRESS); return idx; } @@ -407,10 +418,10 @@ static void *writer(void *arg) int i = 0; // Check for IP address match first for (auto r : s->replicas) { - // Ensure replica and its IP address are valid before comparing - if (!r->ip_address.empty() && r->ip_address == port_spec) { - index = i; - break; + // Ensure replica and its IP address are valid before comparing + if (!r->ip_address.empty() && r->ip_address == port_spec) { + index = i; + break; } i++; } @@ -434,13 +445,19 @@ static void *writer(void *arg) recompress_remove_port(s->recompress, index); delete s->replicas[index]; s->replicas.erase(s->replicas.begin() + index); - log_msg(LOG_LEVEL_NOTICE, "Deleted output port %d.\n", index); + + char buffer[256]; + snprintf(buffer, sizeof(buffer), "Deleted output port %d.\n", index); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + } else { + r = new_response(RESPONSE_NOT_FOUND, "Port not found"); } - r = new_response(RESPONSE_OK, NULL); + } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { // List all current root ports and their IP addresses - string port_list = "\nRoot ports:\n"; + string port_list = "\n"; if (s->replicas.empty()) { port_list += " No ports configured.\n"; } else { @@ -455,8 +472,10 @@ static void *writer(void *arg) port_list += port_info; // FIXED: was port_list += port_list } } - log_msg(LOG_LEVEL_NOTICE, "Ports: %s\n", port_list.c_str()); - r = new_response(RESPONSE_OK, port_list.c_str()); + char buffer[2048]; + snprintf(buffer, sizeof(buffer), "Ports: %s\n", port_list.c_str()); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { // format of parameters is either: // : [] @@ -515,19 +534,21 @@ static void *writer(void *arg) continue; } - if(compress) - log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); - else - log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port); - - r = new_response(RESPONSE_OK, NULL); - } else { - r = new_response(RESPONSE_BAD_REQUEST, NULL); - } + if(compress) { + char buffer[256]; + snprintf(buffer, sizeof(buffer), "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + } else { + char buffer[256]; + snprintf(buffer, sizeof(buffer), "Created new forwarding output port %s:%d.\n", host, tx_port); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + } free_message((struct message *) msg, r); } - + } // then process incoming packets while (s->qhead != s->qtail) { if(s->qhead->size == 0) { // poisoned pill @@ -593,14 +614,13 @@ static void *writer(void *arg) struct timespec timeout; struct timeval now; gettimeofday(&now, NULL); - timeout.tv_sec = now.tv_sec + 1; // Wake up every 1 second - timeout.tv_nsec = now.tv_usec * 1000; + timeout.tv_sec = now.tv_sec + 0; + timeout.tv_nsec = now.tv_usec * 1000 + 500000; pthread_cond_timedwait(&s->qempty_cond, &s->qempty_mtx, &timeout); } s->qempty = 1; pthread_mutex_unlock(&s->qempty_mtx); } - return NULL; } From 0503504055f7ea1fb26291e1d988f488f80be019 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Wed, 14 Jan 2026 12:09:08 -0500 Subject: [PATCH 05/10] - writer now works with a callback function so we can dispense with polling - ensured error messages are properly terminated with newline - moved messaging from end of writer function to complete;continue; from within each else condition - init char buffer for messages at start of else conditions - increase buffer size of control socket as we might be handling a list of clients now --- src/control_socket.cpp | 2 +- src/hd-rum-translator/hd-rum-translator.cpp | 60 +++++++++++++-------- 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 11905ceef..3e0507b34 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -88,7 +88,7 @@ using namespace std; struct client { fd_t fd; - char buff[1024]; + char buff[2048]; int buff_len; struct client *prev; diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index 380508db7..cf7c454ef 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -134,7 +134,7 @@ struct replica { struct module mod; uint32_t magic; string host; - string ip_address; // Add this field for IP-based identification + string ip_address; int m_tx_port; enum type_t { @@ -148,9 +148,13 @@ struct replica { socklen_t sockaddr_len; }; +void writer_new_message_callback(struct module *m); + struct hd_rum_translator_state { hd_rum_translator_state() { init_root_module(&mod); + mod.priv_data = this; + mod.new_message = writer_new_message_callback; pthread_mutex_init(&qempty_mtx, NULL); pthread_mutex_init(&qfull_mtx, NULL); pthread_cond_init(&qempty_cond, NULL); @@ -378,6 +382,18 @@ static int create_output_port(struct hd_rum_translator_state *s, return idx; } +void writer_new_message_callback(struct module *m) { + // add callback function + struct hd_rum_translator_state *s = (struct hd_rum_translator_state *) m->priv_data; + if (s) { + log_msg(LOG_LEVEL_DEBUG, "Message callback triggered, waking up writer thread\n"); + // Wake up the writer thread when a new message arrives + pthread_mutex_lock(&s->qempty_mtx); + pthread_cond_signal(&s->qempty_cond); + pthread_mutex_unlock(&s->qempty_mtx); + } +} + static void *writer(void *arg) { struct hd_rum_translator_state *s = @@ -397,6 +413,7 @@ static void *writer(void *arg) while ((msg = (struct msg_universal *) check_message(&s->mod))) { struct response *r = NULL; if (strncasecmp(msg->text, "delete-port ", strlen("delete-port ")) == 0) { + char buffer[2048]; char *port_spec = msg->text + strlen("delete-port "); int index = -1; bool is_all_digits = true; @@ -412,6 +429,7 @@ static void *writer(void *arg) index = i; } else { log_msg(LOG_LEVEL_WARNING, "Invalid port index: %d. Not removing.\n", i); + snprintf(buffer, sizeof(buffer), "Invalid port index: %d. Not removing.\n", i); } } else { // It's not all digits, so treat as IP address or name @@ -439,23 +457,26 @@ static void *writer(void *arg) // Log if neither IP address or name matches if (index == -1) { log_msg(LOG_LEVEL_WARNING, "Unknown port (IP or name): %s. Not removing.\n", port_spec); + snprintf(buffer, sizeof(buffer), "Unknown port (IP or name): %s. Not removing.\n", port_spec); } } if (index >= 0) { recompress_remove_port(s->recompress, index); delete s->replicas[index]; s->replicas.erase(s->replicas.begin() + index); - - char buffer[256]; snprintf(buffer, sizeof(buffer), "Deleted output port %d.\n", index); log_msg(LOG_LEVEL_NOTICE, "%s", buffer); r = new_response(RESPONSE_OK, buffer); } else { r = new_response(RESPONSE_NOT_FOUND, "Port not found"); } - + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + free_message((struct message *) msg, r); + continue; } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { + char buffer[2048]; // List all current root ports and their IP addresses string port_list = "\n"; if (s->replicas.empty()) { @@ -472,15 +493,17 @@ static void *writer(void *arg) port_list += port_info; // FIXED: was port_list += port_list } } - char buffer[2048]; snprintf(buffer, sizeof(buffer), "Ports: %s\n", port_list.c_str()); log_msg(LOG_LEVEL_NOTICE, "%s", buffer); r = new_response(RESPONSE_OK, buffer); + free_message((struct message *) msg, r); + continue; } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { // format of parameters is either: // : [] // or (for compat with older CoUniverse version) // [] + char buffer[2048]; char *host_port, *port_str = NULL, *save_ptr; char *host; int tx_port; @@ -501,7 +524,7 @@ static void *writer(void *arg) host[strlen(host) - 1] = '\0'; } } else { - const char *err_msg = "wrong format"; + const char *err_msg = "wrong format\n"; log_msg(LOG_LEVEL_ERROR, "%s\n", err_msg); free_message((struct message *) msg, new_response(RESPONSE_BAD_REQUEST, err_msg)); continue; @@ -519,7 +542,7 @@ static void *writer(void *arg) if (exists) { log_msg(LOG_LEVEL_ERROR, "Output port %s:%d already exists.\n", host, tx_port); - r = new_response(RESPONSE_CONFLICT, "Port already exists"); + r = new_response(RESPONSE_CONFLICT, "Port already exists\n"); free_message((struct message *) msg, r); continue; } @@ -530,23 +553,19 @@ static void *writer(void *arg) compress, nullptr, RATE_UNLIMITED, s->server_socket != nullptr); if(idx < 0) { - r = new_response(RESPONSE_INT_SERV_ERR, "Cannot create output port."); + r = new_response(RESPONSE_INT_SERV_ERR, "Cannot create output port.\n"); continue; } if(compress) { - char buffer[256]; snprintf(buffer, sizeof(buffer), "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); - log_msg(LOG_LEVEL_NOTICE, "%s", buffer); - r = new_response(RESPONSE_OK, buffer); } else { - char buffer[256]; snprintf(buffer, sizeof(buffer), "Created new forwarding output port %s:%d.\n", host, tx_port); - log_msg(LOG_LEVEL_NOTICE, "%s", buffer); - r = new_response(RESPONSE_OK, buffer); } - - free_message((struct message *) msg, r); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + free_message((struct message *) msg, r); + continue; } } // then process incoming packets @@ -610,13 +629,8 @@ static void *writer(void *arg) } pthread_mutex_lock(&s->qempty_mtx); if (s->qempty) { - // Use timed wait instead of indefinite wait so we can process messages periodically - struct timespec timeout; - struct timeval now; - gettimeofday(&now, NULL); - timeout.tv_sec = now.tv_sec + 0; - timeout.tv_nsec = now.tv_usec * 1000 + 500000; - pthread_cond_timedwait(&s->qempty_cond, &s->qempty_mtx, &timeout); + // Wait indefinitely - we'll be woken up by new packets or messages + pthread_cond_wait(&s->qempty_cond, &s->qempty_mtx); } s->qempty = 1; pthread_mutex_unlock(&s->qempty_mtx); From dcfb09878eaeae0b956bce8e82e3e545188a1c46 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Mon, 12 Jan 2026 15:14:40 -0500 Subject: [PATCH 06/10] Hacky attempt to modify control port API to: Allow listing control ports Allow removal by IP address instead of port index # --- src/control_socket.cpp | 10 +++- src/control_socket.h | 1 + src/hd-rum-translator/hd-rum-translator.cpp | 61 +++++++++++++++++++-- 3 files changed, 63 insertions(+), 9 deletions(-) diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 8b8595ea6..6a75e617f 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -852,7 +852,6 @@ static void * control_thread(void *args) socket_error("[control socket] accept"); continue; } - // all remote sockets are written sequentially so // we don't want to block if one gets stuck set_socket_nonblock(fd); @@ -1063,7 +1062,8 @@ get_control_state(struct module *mod) return (struct control_state *) control_mod->priv_data; } -static void print_control_help() { + + static void print_control_help() { color_printf("Control internal commands:\n" TBOLD("\texit") "\n" TBOLD("\tpause") "\n" @@ -1081,10 +1081,14 @@ static void print_control_help() { " - (un)mutes audio sender or receiver\n" TBOLD("\tpostprocess | flush") "\n" TBOLD("\tdump-tree")"\n"); + color_printf("\nHD-RUM Translator commands:\n" + TBOLD("\tcreate-port [compression]") " - create new output port\n" + TBOLD("\tdelete-port ") " - remove output port\n" + TBOLD("\tlist-ports") " - show all configured output ports and their IP addresses\n"); color_printf("\nOther commands can be issued directly to individual " "modules (see \"" TBOLD("dump-tree") "\"), eg.:\n" "\t" TBOLD("capture.filter mirror") "\n" "\nSometimes those modules support help (eg. \"" TBOLD("capture.filter help") "\")\n\n"); color_printf(TBOLD(u8"¹") " audio commands applying to receiver\n\n"); -} + } diff --git a/src/control_socket.h b/src/control_socket.h index e94520943..434b1b0a3 100644 --- a/src/control_socket.h +++ b/src/control_socket.h @@ -51,6 +51,7 @@ struct module; * @retval 0 if success */ int control_init(int port, int connection_type, struct control_state **state, struct module *root_module, int force_ip_version); +int control_get_port(struct control_state *state); struct control_state *get_control_state(struct module *mod); void control_start(struct control_state *state); void control_done(struct control_state *s); diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index 56649e7dc..2b13baefd 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -106,6 +106,7 @@ struct replica { replica(const char *addr, uint16_t rx_port, uint16_t tx_port, int bufsize, struct module *parent, int force_ip_version) { magic = REPLICA_MAGIC; host = addr; + ip_address = addr; // Store IP address for identification m_tx_port = tx_port; sock = std::shared_ptr(udp_init(addr, rx_port, tx_port, 255, force_ip_version, false), udp_exit); int mode = 0; @@ -133,6 +134,7 @@ struct replica { struct module mod; uint32_t magic; string host; + string ip_address; // Add this field for IP-based identification int m_tx_port; enum type_t { @@ -386,7 +388,14 @@ static void *writer(void *arg) if (strncasecmp(msg->text, "delete-port ", strlen("delete-port ")) == 0) { char *port_spec = msg->text + strlen("delete-port "); int index = -1; - if (isdigit(port_spec[0])) { + bool is_all_digits = true; + for (int j = 0; port_spec[j] != '\0'; j++) { + if (!isdigit(port_spec[j])) { + is_all_digits = false; + break; + } + } + if (is_all_digits && strlen(port_spec) > 0) { int i = stoi(port_spec); if (i >= 0 && i < (int) s->replicas.size()) { index = i; @@ -394,16 +403,31 @@ static void *writer(void *arg) log_msg(LOG_LEVEL_WARNING, "Invalid port index: %d. Not removing.\n", i); } } else { + // It's not all digits, so treat as IP address or name int i = 0; + // Check for IP address match first for (auto r : s->replicas) { - if (strcmp(r->mod.name, port_spec) == 0) { - index = i; - break; + // Ensure replica and its IP address are valid before comparing + if (!r->ip_address.empty() && r->ip_address == port_spec) { + index = i; + break; } i++; } + // If not found by IP, check by port name if (index == -1) { - log_msg(LOG_LEVEL_WARNING, "Unknown port name: %s. Not removing.\n", port_spec); + i = 0; + for (auto r : s->replicas) { + if (strcmp(r->mod.name, port_spec) == 0) { + index = i; + break; + } + i++; + } + } + // Log if neither IP address or name matches + if (index == -1) { + log_msg(LOG_LEVEL_WARNING, "Unknown port (IP or name): %s. Not removing.\n", port_spec); } } if (index >= 0) { @@ -412,6 +436,31 @@ static void *writer(void *arg) s->replicas.erase(s->replicas.begin() + index); log_msg(LOG_LEVEL_NOTICE, "Deleted output port %d.\n", index); } + } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || + strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { + // Debug: Check what we actually have + fprintf(stderr, "[DEBUG] s->replicas.size() = %zu\n", s->replicas.size()); + // List all current root ports and their IP addresses + string port_list = "Root ports:\n"; + if (s->replicas.empty()) { + port_list += " No ports configured.\n"; + } else { + for (size_t i = 0; i < s->replicas.size(); i++) { + const auto& replica = s->replicas[i]; + const char* type_str = (replica->type == replica::type_t::USE_SOCK) ? "forwarding" : + (replica->type == replica::type_t::RECOMPRESS) ? "transcoding" : "none"; + char port_info[512]; + snprintf(port_info, sizeof(port_info), " [%zu] %s:%d (%s) - %s\n", + i, replica->ip_address.c_str(), replica->m_tx_port, + replica->mod.name, type_str); + port_list += port_info; + } + } + // Also print to console for debugging + fprintf(stderr, "%s", port_list.c_str()); + fflush(stderr); + // Send back to netcat client + r = new_response(RESPONSE_OK, port_list.c_str()); } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { // format of parameters is either: // : [] @@ -482,7 +531,7 @@ static void *writer(void *arg) // distribute it to output ports that don't need transcoding #ifdef _WIN32 - // send it asynchronously in MSW (performance optimalization) + // send it asynchronously in MSW (performance optimization) SleepEx(0, true); // allow system to call our completion routines in APC int ref = 0; for (unsigned int i = 0; i < s->replicas.size(); i++) { From d35295947216d0972d89fab51513a8d0b3e51392 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Tue, 13 Jan 2026 12:35:36 -0500 Subject: [PATCH 07/10] - list ports now outputs correctly on terminal log but I remain no closer to getting a netcat output --- src/control_socket.cpp | 32 +++-- src/hd-rum-translator/hd-rum-translator.cpp | 145 ++++++++++---------- 2 files changed, 96 insertions(+), 81 deletions(-) diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 6a75e617f..11905ceef 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -655,17 +655,25 @@ static int process_msg(struct control_state *s, fd_t client_fd, char *message, s dump_tree(s->root_module, 0); resp = new_response(RESPONSE_OK, NULL); } else { // assume message in format "path message" - struct msg_universal *msg = (struct msg_universal *) - new_message(sizeof(struct msg_universal)); - - if (strchr(message, ' ')) { - memcpy(path, message, strchr(message, ' ') - message); - strncpy(msg->text, strchr(message, ' ') + 1, sizeof(path) - 1); - } else { - strncpy(path, message, sizeof(path) - 1); // empty message ?? + struct msg_universal *msg = (struct msg_universal *) + new_message(sizeof(struct msg_universal)); + + if (strchr(message, ' ')) { + size_t path_len = strchr(message, ' ') - message; + memcpy(path, message, path_len); + path[path_len] = '\0'; + strncpy(msg->text, strchr(message, ' ') + 1, sizeof(msg->text) - 1); + + // If path is "root", send directly to root module + if (strcmp(path, "root") == 0) { + path[0] = '\0'; } + } else { + path[0] = '\0'; + strncpy(msg->text, message, sizeof(msg->text) - 1); + } - resp = send_message(s->root_module, path, (struct message *) msg); + resp = send_message_sync(s->root_module, path, (struct message *) msg, 100, 0); } if(!resp) { @@ -695,8 +703,12 @@ static void send_response(fd_t fd, struct response *resp) if (ret < 0) { socket_error("Unable to write response"); } + #ifndef _WIN32 + // Force flush on POSIX systems + fsync(fd); + #endif - free_response(resp); + free_response(resp); } static bool parse_msg(char *buffer, int buffer_len, /* out */ char *message, int *new_buffer_len) diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index 2b13baefd..c1813e444 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -436,85 +436,82 @@ static void *writer(void *arg) s->replicas.erase(s->replicas.begin() + index); log_msg(LOG_LEVEL_NOTICE, "Deleted output port %d.\n", index); } - } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || - strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { - // Debug: Check what we actually have - fprintf(stderr, "[DEBUG] s->replicas.size() = %zu\n", s->replicas.size()); - // List all current root ports and their IP addresses - string port_list = "Root ports:\n"; - if (s->replicas.empty()) { - port_list += " No ports configured.\n"; - } else { - for (size_t i = 0; i < s->replicas.size(); i++) { - const auto& replica = s->replicas[i]; - const char* type_str = (replica->type == replica::type_t::USE_SOCK) ? "forwarding" : - (replica->type == replica::type_t::RECOMPRESS) ? "transcoding" : "none"; - char port_info[512]; - snprintf(port_info, sizeof(port_info), " [%zu] %s:%d (%s) - %s\n", - i, replica->ip_address.c_str(), replica->m_tx_port, - replica->mod.name, type_str); - port_list += port_info; - } + r = new_response(RESPONSE_OK, NULL); + } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || + strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { + // List all current root ports and their IP addresses + string port_list = "Root ports:\n"; + if (s->replicas.empty()) { + port_list += " No ports configured.\n"; + } else { + for (size_t i = 0; i < s->replicas.size(); i++) { + const auto& replica = s->replicas[i]; + const char* type_str = (replica->type == replica::type_t::USE_SOCK) ? "forwarding" : + (replica->type == replica::type_t::RECOMPRESS) ? "transcoding" : "none"; + char port_info[512]; + snprintf(port_info, sizeof(port_info), "[%zu] %s:%d (%s) - %s\n", + i, replica->ip_address.c_str(), replica->m_tx_port, + replica->mod.name, type_str); + port_list += port_info; // FIXED: was port_list += port_list } - // Also print to console for debugging - fprintf(stderr, "%s", port_list.c_str()); - fflush(stderr); - // Send back to netcat client + } + log_msg(LOG_LEVEL_NOTICE, "Ports: %s\n", port_list.c_str()); r = new_response(RESPONSE_OK, port_list.c_str()); - } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { - // format of parameters is either: - // : [] - // or (for compat with older CoUniverse version) - // [] - char *host_port, *port_str = NULL, *save_ptr; - char *host; - int tx_port; - strtok_r(msg->text, " ", &save_ptr); - host_port = strtok_r(NULL, " ", &save_ptr); - if (host_port && (strchr(host_port, ':') != NULL || (port_str = strtok_r(NULL, " ", &save_ptr)) != NULL)) { - if (port_str) { - host = host_port; - tx_port = stoi(port_str); + } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { + // format of parameters is either: + // : [] + // or (for compat with older CoUniverse version) + // [] + char *host_port, *port_str = NULL, *save_ptr; + char *host; + int tx_port; + strtok_r(msg->text, " ", &save_ptr); + host_port = strtok_r(NULL, " ", &save_ptr); + if (host_port && (strchr(host_port, ':') != NULL || (port_str = strtok_r(NULL, " ", &save_ptr)) != NULL)) { + if (port_str) { + host = host_port; + tx_port = stoi(port_str); + } else { + tx_port = stoi(strrchr(host_port, ':') + 1); + host = host_port; + *strrchr(host_port, ':') = '\0'; + } + // handle square brackets around an IPv6 address + if (host[0] == '[' && host[strlen(host) - 1] == ']') { + host += 1; + host[strlen(host) - 1] = '\0'; + } } else { - tx_port = stoi(strrchr(host_port, ':') + 1); - host = host_port; - *strrchr(host_port, ':') = '\0'; + const char *err_msg = "wrong format"; + log_msg(LOG_LEVEL_ERROR, "%s\n", err_msg); + free_message((struct message *) msg, new_response(RESPONSE_BAD_REQUEST, err_msg)); + continue; } - // handle square brackets around an IPv6 address - if (host[0] == '[' && host[strlen(host) - 1] == ']') { - host += 1; - host[strlen(host) - 1] = '\0'; + char *compress = strtok_r(NULL, " ", &save_ptr); + + struct common_opts opts = { COMMON_OPTS_INIT }; + int idx = create_output_port(s, + host, 0, tx_port, s->bufsize, &opts, + compress, nullptr, RATE_UNLIMITED, s->server_socket != nullptr); + + if(idx < 0) { + r = new_response(RESPONSE_INT_SERV_ERR, "Cannot create output port."); + continue; } - } else { - const char *err_msg = "wrong format"; - log_msg(LOG_LEVEL_ERROR, "%s\n", err_msg); - free_message((struct message *) msg, new_response(RESPONSE_BAD_REQUEST, err_msg)); - continue; - } - char *compress = strtok_r(NULL, " ", &save_ptr); - struct common_opts opts = { COMMON_OPTS_INIT }; - int idx = create_output_port(s, - host, 0, tx_port, s->bufsize, &opts, - compress, nullptr, RATE_UNLIMITED, s->server_socket != nullptr); + if(compress) + log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); + else + log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port); - if(idx < 0) { - free_message((struct message *) msg, new_response(RESPONSE_INT_SERV_ERR, "Cannot create output port.")); - continue; + r = new_response(RESPONSE_OK, NULL); + } else { + r = new_response(RESPONSE_BAD_REQUEST, NULL); } - if(compress) - log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); - else - log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port); - - } else { - r = new_response(RESPONSE_BAD_REQUEST, NULL); + free_message((struct message *) msg, r); } - free_message((struct message *) msg, r ? r : new_response(RESPONSE_OK, NULL)); - } - // then process incoming packets while (s->qhead != s->qtail) { if(s->qhead->size == 0) { // poisoned pill @@ -574,10 +571,16 @@ static void *writer(void *arg) pthread_cond_signal(&s->qfull_cond); pthread_mutex_unlock(&s->qfull_mtx); } - pthread_mutex_lock(&s->qempty_mtx); - if (s->qempty) - pthread_cond_wait(&s->qempty_cond, &s->qempty_mtx); + if (s->qempty) { + // Use timed wait instead of indefinite wait so we can process messages periodically + struct timespec timeout; + struct timeval now; + gettimeofday(&now, NULL); + timeout.tv_sec = now.tv_sec + 1; // Wake up every 1 second + timeout.tv_nsec = now.tv_usec * 1000; + pthread_cond_timedwait(&s->qempty_cond, &s->qempty_mtx, &timeout); + } s->qempty = 1; pthread_mutex_unlock(&s->qempty_mtx); } From 866956be1c7e2d3c072c412f300cebdee90810e9 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Tue, 13 Jan 2026 13:14:15 -0500 Subject: [PATCH 08/10] - add 409 RESPONSE_CONFLICT category to messaging.h - add existing port checking to ensure the same client/port may not be added multiple times to a reflector --- src/hd-rum-translator/hd-rum-translator.cpp | 18 +++++++++++++++++- src/messaging.h | 1 + 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index c1813e444..38d76d690 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -440,7 +440,7 @@ static void *writer(void *arg) } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { // List all current root ports and their IP addresses - string port_list = "Root ports:\n"; + string port_list = "\nRoot ports:\n"; if (s->replicas.empty()) { port_list += " No ports configured.\n"; } else { @@ -489,6 +489,22 @@ static void *writer(void *arg) } char *compress = strtok_r(NULL, " ", &save_ptr); + // Check if a replica with the same host and port already exists + bool exists = false; + for (auto r : s->replicas) { + if (r->ip_address == host && r->m_tx_port == tx_port) { + exists = true; + break; + } + } + + if (exists) { + log_msg(LOG_LEVEL_ERROR, "Output port %s:%d already exists.\n", host, tx_port); + r = new_response(RESPONSE_CONFLICT, "Port already exists"); + free_message((struct message *) msg, r); + continue; + } + struct common_opts opts = { COMMON_OPTS_INIT }; int idx = create_output_port(s, host, 0, tx_port, s->bufsize, &opts, diff --git a/src/messaging.h b/src/messaging.h index 8e93c1f4a..8b8e320e4 100644 --- a/src/messaging.h +++ b/src/messaging.h @@ -62,6 +62,7 @@ struct response; #define RESPONSE_BAD_REQUEST 400 #define RESPONSE_NOT_FOUND 404 #define RESPONSE_REQ_TIMEOUT 408 +#define RESPONSE_CONFLICT 409 #define RESPONSE_INT_SERV_ERR 500 #define RESPONSE_NOT_IMPL 501 From 57c06d2b2c696b3f2c6e49f6afa377da10798196 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Wed, 14 Jan 2026 09:01:47 -0500 Subject: [PATCH 09/10] - add a timed wait function to the writer so that we are more likely to get return messages back out of netcat when a client makes a write request. Without this timer, we don't seem to get proper outputs in terminal sdtout or via netcat. I need to understand this better so that I can switch it back to a full event-based messaging system as appears to be the original intent, as effectively running a poll doesn't seem efficient to me. --- src/hd-rum-translator/hd-rum-translator.cpp | 68 +++++++++++++-------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index 38d76d690..380508db7 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -333,8 +333,18 @@ static int create_output_port(struct hd_rum_translator_state *s, { struct replica *rep; try { - rep = new replica(addr, rx_port, tx_port, bufsize, &s->mod, + // Process the address string to handle IPv6 brackets + char *processed_addr = strdup(addr); + if (processed_addr[0] == '[' && processed_addr[strlen(processed_addr) - 1] == ']') { + processed_addr[0] = '\0'; + memmove(processed_addr, processed_addr + 1, strlen(processed_addr)); + processed_addr[strlen(processed_addr) - 1] = '\0'; + } + + rep = new replica(processed_addr, rx_port, tx_port, bufsize, &s->mod, common->force_ip_version); + free(processed_addr); + if(use_server_sock){ rep->sock = s->server_socket; } @@ -362,7 +372,8 @@ static int create_output_port(struct hd_rum_translator_state *s, } assert((unsigned) idx == s->replicas.size() - 1); - recompress_port_set_active(s->recompress, idx, compression != nullptr); + recompress_port_set_active(s->recompress, idx, + rep->type == replica::type_t::RECOMPRESS); return idx; } @@ -407,10 +418,10 @@ static void *writer(void *arg) int i = 0; // Check for IP address match first for (auto r : s->replicas) { - // Ensure replica and its IP address are valid before comparing - if (!r->ip_address.empty() && r->ip_address == port_spec) { - index = i; - break; + // Ensure replica and its IP address are valid before comparing + if (!r->ip_address.empty() && r->ip_address == port_spec) { + index = i; + break; } i++; } @@ -434,13 +445,19 @@ static void *writer(void *arg) recompress_remove_port(s->recompress, index); delete s->replicas[index]; s->replicas.erase(s->replicas.begin() + index); - log_msg(LOG_LEVEL_NOTICE, "Deleted output port %d.\n", index); + + char buffer[256]; + snprintf(buffer, sizeof(buffer), "Deleted output port %d.\n", index); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + } else { + r = new_response(RESPONSE_NOT_FOUND, "Port not found"); } - r = new_response(RESPONSE_OK, NULL); + } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { // List all current root ports and their IP addresses - string port_list = "\nRoot ports:\n"; + string port_list = "\n"; if (s->replicas.empty()) { port_list += " No ports configured.\n"; } else { @@ -455,8 +472,10 @@ static void *writer(void *arg) port_list += port_info; // FIXED: was port_list += port_list } } - log_msg(LOG_LEVEL_NOTICE, "Ports: %s\n", port_list.c_str()); - r = new_response(RESPONSE_OK, port_list.c_str()); + char buffer[2048]; + snprintf(buffer, sizeof(buffer), "Ports: %s\n", port_list.c_str()); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { // format of parameters is either: // : [] @@ -515,19 +534,21 @@ static void *writer(void *arg) continue; } - if(compress) - log_msg(LOG_LEVEL_NOTICE, "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); - else - log_msg(LOG_LEVEL_NOTICE, "Created new forwarding output port %s:%d.\n", host, tx_port); - - r = new_response(RESPONSE_OK, NULL); - } else { - r = new_response(RESPONSE_BAD_REQUEST, NULL); - } + if(compress) { + char buffer[256]; + snprintf(buffer, sizeof(buffer), "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + } else { + char buffer[256]; + snprintf(buffer, sizeof(buffer), "Created new forwarding output port %s:%d.\n", host, tx_port); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + } free_message((struct message *) msg, r); } - + } // then process incoming packets while (s->qhead != s->qtail) { if(s->qhead->size == 0) { // poisoned pill @@ -593,14 +614,13 @@ static void *writer(void *arg) struct timespec timeout; struct timeval now; gettimeofday(&now, NULL); - timeout.tv_sec = now.tv_sec + 1; // Wake up every 1 second - timeout.tv_nsec = now.tv_usec * 1000; + timeout.tv_sec = now.tv_sec + 0; + timeout.tv_nsec = now.tv_usec * 1000 + 500000; pthread_cond_timedwait(&s->qempty_cond, &s->qempty_mtx, &timeout); } s->qempty = 1; pthread_mutex_unlock(&s->qempty_mtx); } - return NULL; } From 2208fc444a46b40b224e2564c6b0c377f1a33741 Mon Sep 17 00:00:00 2001 From: armelvil <103518994+armelvil@users.noreply.github.com> Date: Wed, 14 Jan 2026 12:09:08 -0500 Subject: [PATCH 10/10] - writer now works with a callback function so we can dispense with polling - ensured error messages are properly terminated with newline - moved messaging from end of writer function to complete;continue; from within each else condition - init char buffer for messages at start of else conditions - increase buffer size of control socket as we might be handling a list of clients now --- src/control_socket.cpp | 2 +- src/hd-rum-translator/hd-rum-translator.cpp | 60 +++++++++++++-------- 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/src/control_socket.cpp b/src/control_socket.cpp index 11905ceef..3e0507b34 100644 --- a/src/control_socket.cpp +++ b/src/control_socket.cpp @@ -88,7 +88,7 @@ using namespace std; struct client { fd_t fd; - char buff[1024]; + char buff[2048]; int buff_len; struct client *prev; diff --git a/src/hd-rum-translator/hd-rum-translator.cpp b/src/hd-rum-translator/hd-rum-translator.cpp index 380508db7..cf7c454ef 100644 --- a/src/hd-rum-translator/hd-rum-translator.cpp +++ b/src/hd-rum-translator/hd-rum-translator.cpp @@ -134,7 +134,7 @@ struct replica { struct module mod; uint32_t magic; string host; - string ip_address; // Add this field for IP-based identification + string ip_address; int m_tx_port; enum type_t { @@ -148,9 +148,13 @@ struct replica { socklen_t sockaddr_len; }; +void writer_new_message_callback(struct module *m); + struct hd_rum_translator_state { hd_rum_translator_state() { init_root_module(&mod); + mod.priv_data = this; + mod.new_message = writer_new_message_callback; pthread_mutex_init(&qempty_mtx, NULL); pthread_mutex_init(&qfull_mtx, NULL); pthread_cond_init(&qempty_cond, NULL); @@ -378,6 +382,18 @@ static int create_output_port(struct hd_rum_translator_state *s, return idx; } +void writer_new_message_callback(struct module *m) { + // add callback function + struct hd_rum_translator_state *s = (struct hd_rum_translator_state *) m->priv_data; + if (s) { + log_msg(LOG_LEVEL_DEBUG, "Message callback triggered, waking up writer thread\n"); + // Wake up the writer thread when a new message arrives + pthread_mutex_lock(&s->qempty_mtx); + pthread_cond_signal(&s->qempty_cond); + pthread_mutex_unlock(&s->qempty_mtx); + } +} + static void *writer(void *arg) { struct hd_rum_translator_state *s = @@ -397,6 +413,7 @@ static void *writer(void *arg) while ((msg = (struct msg_universal *) check_message(&s->mod))) { struct response *r = NULL; if (strncasecmp(msg->text, "delete-port ", strlen("delete-port ")) == 0) { + char buffer[2048]; char *port_spec = msg->text + strlen("delete-port "); int index = -1; bool is_all_digits = true; @@ -412,6 +429,7 @@ static void *writer(void *arg) index = i; } else { log_msg(LOG_LEVEL_WARNING, "Invalid port index: %d. Not removing.\n", i); + snprintf(buffer, sizeof(buffer), "Invalid port index: %d. Not removing.\n", i); } } else { // It's not all digits, so treat as IP address or name @@ -439,23 +457,26 @@ static void *writer(void *arg) // Log if neither IP address or name matches if (index == -1) { log_msg(LOG_LEVEL_WARNING, "Unknown port (IP or name): %s. Not removing.\n", port_spec); + snprintf(buffer, sizeof(buffer), "Unknown port (IP or name): %s. Not removing.\n", port_spec); } } if (index >= 0) { recompress_remove_port(s->recompress, index); delete s->replicas[index]; s->replicas.erase(s->replicas.begin() + index); - - char buffer[256]; snprintf(buffer, sizeof(buffer), "Deleted output port %d.\n", index); log_msg(LOG_LEVEL_NOTICE, "%s", buffer); r = new_response(RESPONSE_OK, buffer); } else { r = new_response(RESPONSE_NOT_FOUND, "Port not found"); } - + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + free_message((struct message *) msg, r); + continue; } else if (strncasecmp(msg->text, "list-ports", strlen("list-ports")) == 0 || strncasecmp(msg->text, "query-ports", strlen("query-ports")) == 0) { + char buffer[2048]; // List all current root ports and their IP addresses string port_list = "\n"; if (s->replicas.empty()) { @@ -472,15 +493,17 @@ static void *writer(void *arg) port_list += port_info; // FIXED: was port_list += port_list } } - char buffer[2048]; snprintf(buffer, sizeof(buffer), "Ports: %s\n", port_list.c_str()); log_msg(LOG_LEVEL_NOTICE, "%s", buffer); r = new_response(RESPONSE_OK, buffer); + free_message((struct message *) msg, r); + continue; } else if (strncasecmp(msg->text, "create-port", strlen("create-port")) == 0) { // format of parameters is either: // : [] // or (for compat with older CoUniverse version) // [] + char buffer[2048]; char *host_port, *port_str = NULL, *save_ptr; char *host; int tx_port; @@ -501,7 +524,7 @@ static void *writer(void *arg) host[strlen(host) - 1] = '\0'; } } else { - const char *err_msg = "wrong format"; + const char *err_msg = "wrong format\n"; log_msg(LOG_LEVEL_ERROR, "%s\n", err_msg); free_message((struct message *) msg, new_response(RESPONSE_BAD_REQUEST, err_msg)); continue; @@ -519,7 +542,7 @@ static void *writer(void *arg) if (exists) { log_msg(LOG_LEVEL_ERROR, "Output port %s:%d already exists.\n", host, tx_port); - r = new_response(RESPONSE_CONFLICT, "Port already exists"); + r = new_response(RESPONSE_CONFLICT, "Port already exists\n"); free_message((struct message *) msg, r); continue; } @@ -530,23 +553,19 @@ static void *writer(void *arg) compress, nullptr, RATE_UNLIMITED, s->server_socket != nullptr); if(idx < 0) { - r = new_response(RESPONSE_INT_SERV_ERR, "Cannot create output port."); + r = new_response(RESPONSE_INT_SERV_ERR, "Cannot create output port.\n"); continue; } if(compress) { - char buffer[256]; snprintf(buffer, sizeof(buffer), "Created new transcoding output port %s:%d:0x%08" PRIx32 ".\n", host, tx_port, recompress_get_port_ssrc(s->recompress, idx)); - log_msg(LOG_LEVEL_NOTICE, "%s", buffer); - r = new_response(RESPONSE_OK, buffer); } else { - char buffer[256]; snprintf(buffer, sizeof(buffer), "Created new forwarding output port %s:%d.\n", host, tx_port); - log_msg(LOG_LEVEL_NOTICE, "%s", buffer); - r = new_response(RESPONSE_OK, buffer); } - - free_message((struct message *) msg, r); + log_msg(LOG_LEVEL_NOTICE, "%s", buffer); + r = new_response(RESPONSE_OK, buffer); + free_message((struct message *) msg, r); + continue; } } // then process incoming packets @@ -610,13 +629,8 @@ static void *writer(void *arg) } pthread_mutex_lock(&s->qempty_mtx); if (s->qempty) { - // Use timed wait instead of indefinite wait so we can process messages periodically - struct timespec timeout; - struct timeval now; - gettimeofday(&now, NULL); - timeout.tv_sec = now.tv_sec + 0; - timeout.tv_nsec = now.tv_usec * 1000 + 500000; - pthread_cond_timedwait(&s->qempty_cond, &s->qempty_mtx, &timeout); + // Wait indefinitely - we'll be woken up by new packets or messages + pthread_cond_wait(&s->qempty_cond, &s->qempty_mtx); } s->qempty = 1; pthread_mutex_unlock(&s->qempty_mtx);