From bf8d4526b925277b9d28d5309ad1be2008f575b1 Mon Sep 17 00:00:00 2001 From: JamBox <8935453+JamesVanBoxtel@users.noreply.github.com> Date: Sat, 8 Nov 2025 20:23:43 -0800 Subject: [PATCH 1/5] Fix Connection and TCPClient bugs with partial send When socket sends data and it only partially sends, we get an error and the amount of data that was sent. We are only supposed to send the rest of the data next time the socket is free but we were sending the whole thing in the Connection server case, and dropping it on the floor in the client case. Note that we didn't have this problem on the receiving side. Added tests to prove the problems, then fixed them. Also note I could refactor this more, but didn't want to introduce more risk. --- client/src/network/TcpClient.lua | 54 ++- server/Connection.lua | 37 ++- server/tests/RealSocketPartialSendTest.lua | 367 +++++++++++++++++++++ testLauncher.lua | 1 + 4 files changed, 435 insertions(+), 24 deletions(-) create mode 100644 server/tests/RealSocketPartialSendTest.lua diff --git a/client/src/network/TcpClient.lua b/client/src/network/TcpClient.lua index a1e7f7ed..904432e1 100644 --- a/client/src/network/TcpClient.lua +++ b/client/src/network/TcpClient.lua @@ -9,6 +9,7 @@ require("client.src.TimeQueue") local class = require("common.lib.class") local Request = require("client.src.network.Request") local ServerMessages = require("client.src.network.ServerMessages") +local Queue = require("common.lib.Queue") ---@class TcpSocket @@ -22,12 +23,18 @@ local ServerMessages = require("client.src.network.ServerMessages") ---@field ip string ---@field port integer ---@field socket TcpSocket +---@field outgoingMessageQueue Queue +---@field sendRetryCount integer +---@field sendRetryLimit integer local TcpClient = class(function(tcpClient) -- holds data fragments tcpClient.data = "" --connectionUptime counts "E" messages, not seconds tcpClient.connectionUptime = 0 tcpClient.receivedMessageQueue = ServerQueue() + tcpClient.outgoingMessageQueue = Queue() + tcpClient.sendRetryCount = 0 + tcpClient.sendRetryLimit = 5 tcpClient.delayedProcessing = false math.randomseed(os.time()) for i = 1, 4 do @@ -97,18 +104,46 @@ function TcpClient:resetNetwork() self.socket:close() end self.socket = nil + self.outgoingMessageQueue:clear() end -function TcpClient:sendMessage(stringData) - if self:isConnected() then - local fullMessageSent, error, partialBytesSent = self.socket:send(stringData) +---@return boolean # if the connection is still considered open +function TcpClient:sendQueuedMessages() + while self.outgoingMessageQueue:len() > 0 do + local message = self.outgoingMessageQueue:peek() + local fullMessageSent, error, partialBytesSent = self.socket:send(message) if fullMessageSent then - --logger.trace("json bytes sent in one go: " .. tostring(fullMessageSent)) - return true - else - logger.error("Error sending network message: " .. (error or "") .. " only sent " .. (partialBytesSent or "0") .. "bytes") + self.outgoingMessageQueue:pop() + if self.sendRetryCount > 0 then + logger.debug("TcpClient retry succeeded after " .. self.sendRetryCount) + self.sendRetryCount = 0 + end + elseif error == "closed" then return false + elseif error == "timeout" and partialBytesSent and partialBytesSent > 0 then + local remaining = message:sub(partialBytesSent + 1) + self.outgoingMessageQueue[self.outgoingMessageQueue.first] = remaining + logger.trace("TcpClient partial send: " .. partialBytesSent .. "/" .. #message .. " bytes sent. " .. #remaining .. " bytes remain in queue.") + break + else + self.sendRetryCount = self.sendRetryCount + 1 + logger.trace("TcpClient send timeout with no progress (retry " .. self.sendRetryCount .. "/" .. self.sendRetryLimit .. ")") + break end + end + + if self.sendRetryCount >= self.sendRetryLimit then + logger.info("TcpClient send failed after " .. self.sendRetryLimit .. " retries were attempted") + return false + end + + return true +end + +function TcpClient:sendMessage(stringData) + if self:isConnected() then + self.outgoingMessageQueue:push(stringData) + return self:sendQueuedMessages() else return false end @@ -129,6 +164,11 @@ function TcpClient:updateNetwork(dt) self:queueMessage(data[1], data[2]) data = self.receiveNetworkQueue:popIfReady() end + else + -- In non-delayed mode, try to send any queued messages + if self:isConnected() then + self:sendQueuedMessages() + end end end diff --git a/server/Connection.lua b/server/Connection.lua index 3e3f96f5..0ab8e7ba 100644 --- a/server/Connection.lua +++ b/server/Connection.lua @@ -122,27 +122,30 @@ end ---@param connection Connection ---@return boolean # if the connection is still considered open local function sendQueuedMessages(connection) - for i = connection.outgoingMessageQueue.first, connection.outgoingMessageQueue.last do - local message = connection.outgoingMessageQueue[i] - local success, error = connection.socket:send(message) - if not success then - if error == "closed" then - return false - else - connection.sendRetryCount = connection.sendRetryCount + 1 - break + while connection.outgoingMessageQueue:len() > 0 do + local message = connection.outgoingMessageQueue:peek() + local fullMessageSent, error, partialBytesSent = connection.socket:send(message) + if fullMessageSent then + connection.outgoingMessageQueue:pop() + if connection.sendRetryCount > 0 then + logger.debug(connection.index .. " Retry succeeded after " .. connection.sendRetryCount) + connection.sendRetryCount = 0 end - elseif connection.sendRetryCount > 0 then - logger.debug(connection.index .. " Retry succeeded after " .. connection.sendRetryCount) - connection.sendRetryCount = 0 + elseif error == "closed" then + return false + elseif error == "timeout" and partialBytesSent and partialBytesSent > 0 then + local remaining = message:sub(partialBytesSent + 1) + connection.outgoingMessageQueue[connection.outgoingMessageQueue.first] = remaining + logger.trace("Partial send: " .. partialBytesSent .. "/" .. #message .. " bytes sent. " .. #remaining .. " bytes remain in queue.") + break + else + connection.sendRetryCount = connection.sendRetryCount + 1 + logger.trace("Send timeout with no progress (retry " .. connection.sendRetryCount .. "/" .. connection.sendRetryLimit .. ")") + break end end - if connection.sendRetryCount == 0 then - if connection.outgoingMessageQueue:len() > 0 then - connection.outgoingMessageQueue:clear() - end - elseif connection.sendRetryCount >= connection.sendRetryLimit then + if connection.sendRetryCount >= connection.sendRetryLimit then logger.info("Closing connection " .. connection.index .. ". Connection.send failed after " .. connection.sendRetryLimit .. " retries were attempted") return false end diff --git a/server/tests/RealSocketPartialSendTest.lua b/server/tests/RealSocketPartialSendTest.lua new file mode 100644 index 00000000..909c1a7c --- /dev/null +++ b/server/tests/RealSocketPartialSendTest.lua @@ -0,0 +1,367 @@ +-- Test using real sockets to demonstrate partial send bugs in both Connection and TcpClient are fixed +-- This test creates actual TCP server and client connections on localhost +-- +-- When a partial send occurs on a socket the partialBytesSent were LOST - not tracked, not queued, not retried +-- ## TEST STRATEGY ## +-- To reproduce these bugs: +-- 1. Fill TCP buffer by sending large amounts of data +-- 2. Read SLOWLY on receiving side to create backpressure +-- 3. Trigger a partial send (socket:send returns nil, "timeout", X where X > 0) +-- 4. Verify data is not corrupted or lost on the receiving side +-- + +json = require("common.lib.dkjson") +local Connection = require("server.Connection") +require("client.src.server_queue") +local TcpClient = require("client.src.network.TcpClient") +local NetworkProtocol = require("common.network.NetworkProtocol") +local socket = require("socket") +local logger = require("common.lib.logger") + +local SEND_ATTEMPTS_MAX = 100 +local READ_TIMEOUT = 9 -- 10 seconds will make connection fail with no ping +local SLOW_READ_SIZE = 100 -- bytes per read +local SLEEP_INTERVAL = 0.0001 -- seconds +local FAST_READ_SLEEP = 0.0001 -- seconds +local RETRY_MAX_ATTEMPTS = 50000 +local MESSAGE_COUNT = 50 +local LARGE_MESSAGE_PAD_COUNT = 100 + +-- Helper to create a TCP server on localhost +local function createLocalServer() + local server = socket.tcp() + server:bind("127.0.0.1", 0) -- Bind to any available port + server:listen(1) + server:settimeout(5) -- 5 second timeout for accept + + local ip, port = server:getsockname() + logger.trace("Server listening on " .. ip .. ":" .. port) + + return server, ip, port +end + +-- Helper to create a TcpClient and connect to server +local function createTcpClient(ip, port) + local tcpClient = TcpClient() + local success = tcpClient:connectToServer(ip, port) + if not success then + error("Failed to connect TcpClient to " .. ip .. ":" .. port) + end + logger.trace("TcpClient connected to " .. ip .. ":" .. port) + return tcpClient +end + +-- Helper to create large message with padding data +local function createLargeMessage() + local paddingData = {} + for i = 1, LARGE_MESSAGE_PAD_COUNT do + paddingData[i] = { + index = i, + data = "This is padding data to make the message larger and increase the chance of partial sends. " .. + "We need to fill the TCP buffer quickly with fewer messages, so each message needs to be substantial. " .. + "Adding more text here to ensure we reach the buffer limits and trigger partial send scenarios. " .. + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore." + } + end + return paddingData +end + +-- Helper to process and validate JSON messages from a data buffer +-- Returns messageCount, corruptedCount +local function processMessages(dataBuffer, messageTypePrefix) + local messageCount = 0 + local corruptedCount = 0 + + while true do + local msgType, message, remaining = NetworkProtocol.getMessageFromString(dataBuffer, true) + if not msgType then + break -- No more complete messages + end + + -- Only process JSON messages + if msgType == messageTypePrefix then + messageCount = messageCount + 1 + + local decoded = json.decode(message) + + if not decoded then + corruptedCount = corruptedCount + 1 + logger.error(" Message #" .. messageCount .. " - JSON decode returned nil!") + logger.error(" Raw data preview: " .. message:sub(1, 100):gsub("\n", "\\n")) + break + end + end + + dataBuffer = remaining + end + + return messageCount, corruptedCount, dataBuffer +end + +-- Test the partial send bug with real sockets (Connection server->client) +local function testRealSocketPartialSend() + logger.info("Running testRealSocketPartialSend test") + + -- Setup: Create server and establish connection + local serverSocket, ip, port = createLocalServer() + local tcpClient = createTcpClient(ip, port) + local serverClientSocket, err = serverSocket:accept() + if not serverClientSocket then + error("Failed to accept connection: " .. (err or "unknown error")) + end + serverClientSocket:settimeout(0) + local connection = Connection(serverClientSocket, 1) + + -- Create and queue large messages + local paddingData = createLargeMessage() + local largeMessage = { + messageType = NetworkProtocol.serverMessageTypes.jsonMessage, + messageText = { + type = "settingsUpdate", + content = { + padding = paddingData + }, + sender = "player", + senderId = 1 + } + } + + for _ = 1, MESSAGE_COUNT do + connection:sendJson(largeMessage) + end + + -- Phase 1: Send while reading SLOWLY to trigger partial send + local sendAttempts = 0 + local partialSendDetected = false + + while sendAttempts < SEND_ATTEMPTS_MAX and not partialSendDetected do + sendAttempts = sendAttempts + 1 + + local success = connection:update(os.time(), false, true) + if not success then + assert(false, "couldn't keep connection open for test") + break + end + + -- Read slowly to create backpressure + if tcpClient.socket then + local chunk, _, partial = tcpClient.socket:receive(SLOW_READ_SIZE) + if chunk or partial then + tcpClient.data = tcpClient.data .. (chunk or partial) + end + end + + if connection.sendRetryCount and connection.sendRetryCount > 0 then + logger.trace("✓ Partial send detected! (sendRetryCount = " .. connection.sendRetryCount .. ")") + logger.trace(" Messages still in queue: " .. connection.outgoingMessageQueue:len()) + logger.trace(" Client has buffered: " .. #tcpClient.data .. " bytes so far") + partialSendDetected = true + break + end + + socket.sleep(SLEEP_INTERVAL) + end + assert(partialSendDetected) + + -- Phase 2: Read fast to allow retry to succeed + local retryAttempts = 0 + local resendSucceeded = false + + while retryAttempts < RETRY_MAX_ATTEMPTS do + retryAttempts = retryAttempts + 1 + local sendRetryCountBefore = connection.sendRetryCount + + connection:update(os.time(), false, true) + tcpClient:readSocket() + + if sendRetryCountBefore > 0 and connection.sendRetryCount == 0 then + resendSucceeded = true + break + end + + socket.sleep(SLEEP_INTERVAL) + end + assert(resendSucceeded) + + logger.trace("Retry phase completed. sendRetryCount = " .. (connection.sendRetryCount or 0)) + logger.trace("Client buffer size: " .. #tcpClient.data .. " bytes\n") + + -- Phase 3: Read remaining data and validate messages + local startTime = socket.gettime() + local messageCount = 0 + local corruptedCount = 0 + + while socket.gettime() - startTime < READ_TIMEOUT and corruptedCount == 0 do + connection:update(os.time(), false, true) + + if not tcpClient:readSocket() then + assert(false, "couldn't keep connection open for test") + end + + local newMessages, newCorrupted + newMessages, newCorrupted, tcpClient.data = processMessages( + tcpClient.data, + NetworkProtocol.serverMessageTypes.jsonMessage.prefix + ) + messageCount = messageCount + newMessages + corruptedCount = corruptedCount + newCorrupted + + if messageCount == MESSAGE_COUNT or corruptedCount > 0 then + logger.info("Ending early") + break + end + + socket.sleep(FAST_READ_SLEEP) + end + + assert(messageCount > 0 and corruptedCount == 0) + logger.trace("Total messages received: " .. messageCount) + logger.trace("Successfully decoded: " .. (messageCount - corruptedCount)) + logger.trace("Corrupted messages: " .. corruptedCount) + logger.trace("TcpClient buffer remaining: " .. #tcpClient.data .. " bytes") + + -- Cleanup + connection:close() + tcpClient:resetNetwork() + serverSocket:close() +end + +-- Test the TcpClient partial send bug (TcpClient client->server) +local function testTcpClientPartialSend() + logger.info("Running testTcpClientPartialSend test") + + -- Setup: Create server and establish connection + local serverSocket, ip, port = createLocalServer() + local tcpClient = createTcpClient(ip, port) + local serverClientSocket, err = serverSocket:accept() + if not serverClientSocket then + error("Failed to accept connection: " .. (err or "unknown error")) + end + serverClientSocket:settimeout(0) + + -- Create large message to send from client to server + local paddingData = createLargeMessage() + local largeMessage = { + type = "chatMessage", + content = { + padding = paddingData + }, + sender = "player", + senderId = 1 + } + local messageString = NetworkProtocol.markedMessageForTypeAndBody( + NetworkProtocol.clientMessageTypes.jsonMessage.prefix, + json.encode(largeMessage) + ) + + -- Phase 1: Send while reading SLOWLY to trigger partial send + local sendAttempts = 0 + local partialSendDetected = false + local serverReceivedData = "" + + while sendAttempts < SEND_ATTEMPTS_MAX and not partialSendDetected do + sendAttempts = sendAttempts + 1 + + local success = tcpClient:sendMessage(messageString) + if not success then + assert(false, "couldn't keep connection open for test") + break + end + + -- Read slowly to create backpressure + local chunk, _, partial = serverClientSocket:receive(SLOW_READ_SIZE) + if chunk or partial then + serverReceivedData = serverReceivedData .. (chunk or partial) + end + + if tcpClient.sendRetryCount and tcpClient.sendRetryCount > 0 then + logger.trace("✓ Partial send detected! (sendRetryCount = " .. tcpClient.sendRetryCount .. ")") + logger.trace(" Messages still in queue: " .. tcpClient.outgoingMessageQueue:len()) + logger.trace(" Server has buffered: " .. #serverReceivedData .. " bytes so far") + partialSendDetected = true + break + end + + socket.sleep(SLEEP_INTERVAL) + end + assert(partialSendDetected) + + -- Phase 2: Read fast to allow retry to succeed + local retryAttempts = 0 + local resendSucceeded = false + + while retryAttempts < RETRY_MAX_ATTEMPTS do + retryAttempts = retryAttempts + 1 + local sendRetryCountBefore = tcpClient.sendRetryCount + + tcpClient:updateNetwork(0) + + -- Read from server socket + local chunk, error, partial = serverClientSocket:receive("*a") + if error == "timeout" then + chunk = partial + end + if chunk and #chunk > 0 then + serverReceivedData = serverReceivedData .. chunk + end + + if sendRetryCountBefore > 0 and tcpClient.sendRetryCount == 0 then + resendSucceeded = true + break + end + + socket.sleep(SLEEP_INTERVAL) + end + assert(resendSucceeded) + + logger.trace("Retry phase completed. sendRetryCount = " .. (tcpClient.sendRetryCount or 0)) + logger.trace("Server buffer size: " .. #serverReceivedData .. " bytes\n") + + -- Phase 3: Read remaining data and validate messages + local startTime = socket.gettime() + local messageCount = 0 + local corruptedCount = 0 + + while socket.gettime() - startTime < READ_TIMEOUT and corruptedCount == 0 do + tcpClient:updateNetwork(0) + -- Read from server socket + local chunk, error, partial = serverClientSocket:receive("*a") + + if error == "timeout" then + chunk = partial + end + + if chunk and #chunk > 0 then + serverReceivedData = serverReceivedData .. chunk + end + + local newMessages, newCorrupted, updatedBuffer + newMessages, newCorrupted, updatedBuffer = processMessages( + serverReceivedData, + NetworkProtocol.clientMessageTypes.jsonMessage.prefix + ) + serverReceivedData = updatedBuffer or serverReceivedData + messageCount = messageCount + newMessages + corruptedCount = corruptedCount + newCorrupted + + if messageCount == sendAttempts or corruptedCount > 0 then + logger.info("Ending early") + break + end + + socket.sleep(FAST_READ_SLEEP) + end + + assert(messageCount > 0 and corruptedCount == 0) + logger.info("Total messages received: " .. messageCount) + logger.info("Corrupted messages: " .. corruptedCount) + + -- Cleanup + tcpClient:resetNetwork() + serverClientSocket:close() + serverSocket:close() +end + +-- Run the tests +testRealSocketPartialSend() +testTcpClientPartialSend() diff --git a/testLauncher.lua b/testLauncher.lua index 53d23c1c..b38496d9 100644 --- a/testLauncher.lua +++ b/testLauncher.lua @@ -81,6 +81,7 @@ local allTests = { "server.tests.LeaderboardTests", "server.tests.RoomTests", "server.tests.LoginTests", + "server.tests.RealSocketPartialSendTest", "client.tests.FileUtilsTests", "client.tests.ModControllerTests", "client.tests.QueueTests", From 2f51aece6819da6e62a5a8f7f0f440b280237364 Mon Sep 17 00:00:00 2001 From: JamBox <8935453+JamesVanBoxtel@users.noreply.github.com> Date: Sat, 8 Nov 2025 20:39:10 -0800 Subject: [PATCH 2/5] Try to get tests to work on server --- server/tests/RealSocketPartialSendTest.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tests/RealSocketPartialSendTest.lua b/server/tests/RealSocketPartialSendTest.lua index 909c1a7c..826f6a90 100644 --- a/server/tests/RealSocketPartialSendTest.lua +++ b/server/tests/RealSocketPartialSendTest.lua @@ -18,7 +18,7 @@ local NetworkProtocol = require("common.network.NetworkProtocol") local socket = require("socket") local logger = require("common.lib.logger") -local SEND_ATTEMPTS_MAX = 100 +local SEND_ATTEMPTS_MAX = 10000 local READ_TIMEOUT = 9 -- 10 seconds will make connection fail with no ping local SLOW_READ_SIZE = 100 -- bytes per read local SLEEP_INTERVAL = 0.0001 -- seconds From cc3d48d4cfd7a2ab65450b25e829bd2adde02394 Mon Sep 17 00:00:00 2001 From: JamBox <8935453+JamesVanBoxtel@users.noreply.github.com> Date: Sat, 8 Nov 2025 21:04:56 -0800 Subject: [PATCH 3/5] Try to get tests to work on server --- server/tests/RealSocketPartialSendTest.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/tests/RealSocketPartialSendTest.lua b/server/tests/RealSocketPartialSendTest.lua index 826f6a90..c8e508e2 100644 --- a/server/tests/RealSocketPartialSendTest.lua +++ b/server/tests/RealSocketPartialSendTest.lua @@ -20,12 +20,12 @@ local logger = require("common.lib.logger") local SEND_ATTEMPTS_MAX = 10000 local READ_TIMEOUT = 9 -- 10 seconds will make connection fail with no ping -local SLOW_READ_SIZE = 100 -- bytes per read +local SLOW_READ_SIZE = 1 -- bytes per read local SLEEP_INTERVAL = 0.0001 -- seconds local FAST_READ_SLEEP = 0.0001 -- seconds local RETRY_MAX_ATTEMPTS = 50000 -local MESSAGE_COUNT = 50 -local LARGE_MESSAGE_PAD_COUNT = 100 +local MESSAGE_COUNT = 500 +local LARGE_MESSAGE_PAD_COUNT = 1000 -- Helper to create a TCP server on localhost local function createLocalServer() From 40b055ea7f0224e42eeffb07391917050be83627 Mon Sep 17 00:00:00 2001 From: JamBox <8935453+JamesVanBoxtel@users.noreply.github.com> Date: Sat, 8 Nov 2025 21:06:53 -0800 Subject: [PATCH 4/5] Try to get tests to work on server --- server/tests/RealSocketPartialSendTest.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/tests/RealSocketPartialSendTest.lua b/server/tests/RealSocketPartialSendTest.lua index c8e508e2..0f7f5f78 100644 --- a/server/tests/RealSocketPartialSendTest.lua +++ b/server/tests/RealSocketPartialSendTest.lua @@ -24,8 +24,8 @@ local SLOW_READ_SIZE = 1 -- bytes per read local SLEEP_INTERVAL = 0.0001 -- seconds local FAST_READ_SLEEP = 0.0001 -- seconds local RETRY_MAX_ATTEMPTS = 50000 -local MESSAGE_COUNT = 500 -local LARGE_MESSAGE_PAD_COUNT = 1000 +local MESSAGE_COUNT = 50 +local LARGE_MESSAGE_PAD_COUNT = 100 -- Helper to create a TCP server on localhost local function createLocalServer() From 90b182dc834b12f5e4b050a66eea15e9da5036de Mon Sep 17 00:00:00 2001 From: JamBox <8935453+JamesVanBoxtel@users.noreply.github.com> Date: Sat, 8 Nov 2025 21:08:18 -0800 Subject: [PATCH 5/5] Try to get tests to work on server --- server/tests/RealSocketPartialSendTest.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tests/RealSocketPartialSendTest.lua b/server/tests/RealSocketPartialSendTest.lua index 0f7f5f78..4da1f177 100644 --- a/server/tests/RealSocketPartialSendTest.lua +++ b/server/tests/RealSocketPartialSendTest.lua @@ -25,7 +25,7 @@ local SLEEP_INTERVAL = 0.0001 -- seconds local FAST_READ_SLEEP = 0.0001 -- seconds local RETRY_MAX_ATTEMPTS = 50000 local MESSAGE_COUNT = 50 -local LARGE_MESSAGE_PAD_COUNT = 100 +local LARGE_MESSAGE_PAD_COUNT = 400 -- Helper to create a TCP server on localhost local function createLocalServer()