diff --git a/client/src/network/TcpClient.lua b/client/src/network/TcpClient.lua index a1e7f7ed..904432e1 100644 --- a/client/src/network/TcpClient.lua +++ b/client/src/network/TcpClient.lua @@ -9,6 +9,7 @@ require("client.src.TimeQueue") local class = require("common.lib.class") local Request = require("client.src.network.Request") local ServerMessages = require("client.src.network.ServerMessages") +local Queue = require("common.lib.Queue") ---@class TcpSocket @@ -22,12 +23,18 @@ local ServerMessages = require("client.src.network.ServerMessages") ---@field ip string ---@field port integer ---@field socket TcpSocket +---@field outgoingMessageQueue Queue +---@field sendRetryCount integer +---@field sendRetryLimit integer local TcpClient = class(function(tcpClient) -- holds data fragments tcpClient.data = "" --connectionUptime counts "E" messages, not seconds tcpClient.connectionUptime = 0 tcpClient.receivedMessageQueue = ServerQueue() + tcpClient.outgoingMessageQueue = Queue() + tcpClient.sendRetryCount = 0 + tcpClient.sendRetryLimit = 5 tcpClient.delayedProcessing = false math.randomseed(os.time()) for i = 1, 4 do @@ -97,18 +104,46 @@ function TcpClient:resetNetwork() self.socket:close() end self.socket = nil + self.outgoingMessageQueue:clear() end -function TcpClient:sendMessage(stringData) - if self:isConnected() then - local fullMessageSent, error, partialBytesSent = self.socket:send(stringData) +---@return boolean # if the connection is still considered open +function TcpClient:sendQueuedMessages() + while self.outgoingMessageQueue:len() > 0 do + local message = self.outgoingMessageQueue:peek() + local fullMessageSent, error, partialBytesSent = self.socket:send(message) if fullMessageSent then - --logger.trace("json bytes sent in one go: " .. tostring(fullMessageSent)) - return true - else - logger.error("Error sending network message: " .. (error or "") .. " only sent " .. (partialBytesSent or "0") .. "bytes") + self.outgoingMessageQueue:pop() + if self.sendRetryCount > 0 then + logger.debug("TcpClient retry succeeded after " .. self.sendRetryCount) + self.sendRetryCount = 0 + end + elseif error == "closed" then return false + elseif error == "timeout" and partialBytesSent and partialBytesSent > 0 then + local remaining = message:sub(partialBytesSent + 1) + self.outgoingMessageQueue[self.outgoingMessageQueue.first] = remaining + logger.trace("TcpClient partial send: " .. partialBytesSent .. "/" .. #message .. " bytes sent. " .. #remaining .. " bytes remain in queue.") + break + else + self.sendRetryCount = self.sendRetryCount + 1 + logger.trace("TcpClient send timeout with no progress (retry " .. self.sendRetryCount .. "/" .. self.sendRetryLimit .. ")") + break end + end + + if self.sendRetryCount >= self.sendRetryLimit then + logger.info("TcpClient send failed after " .. self.sendRetryLimit .. " retries were attempted") + return false + end + + return true +end + +function TcpClient:sendMessage(stringData) + if self:isConnected() then + self.outgoingMessageQueue:push(stringData) + return self:sendQueuedMessages() else return false end @@ -129,6 +164,11 @@ function TcpClient:updateNetwork(dt) self:queueMessage(data[1], data[2]) data = self.receiveNetworkQueue:popIfReady() end + else + -- In non-delayed mode, try to send any queued messages + if self:isConnected() then + self:sendQueuedMessages() + end end end diff --git a/server/Connection.lua b/server/Connection.lua index 3e3f96f5..0ab8e7ba 100644 --- a/server/Connection.lua +++ b/server/Connection.lua @@ -122,27 +122,30 @@ end ---@param connection Connection ---@return boolean # if the connection is still considered open local function sendQueuedMessages(connection) - for i = connection.outgoingMessageQueue.first, connection.outgoingMessageQueue.last do - local message = connection.outgoingMessageQueue[i] - local success, error = connection.socket:send(message) - if not success then - if error == "closed" then - return false - else - connection.sendRetryCount = connection.sendRetryCount + 1 - break + while connection.outgoingMessageQueue:len() > 0 do + local message = connection.outgoingMessageQueue:peek() + local fullMessageSent, error, partialBytesSent = connection.socket:send(message) + if fullMessageSent then + connection.outgoingMessageQueue:pop() + if connection.sendRetryCount > 0 then + logger.debug(connection.index .. " Retry succeeded after " .. connection.sendRetryCount) + connection.sendRetryCount = 0 end - elseif connection.sendRetryCount > 0 then - logger.debug(connection.index .. " Retry succeeded after " .. connection.sendRetryCount) - connection.sendRetryCount = 0 + elseif error == "closed" then + return false + elseif error == "timeout" and partialBytesSent and partialBytesSent > 0 then + local remaining = message:sub(partialBytesSent + 1) + connection.outgoingMessageQueue[connection.outgoingMessageQueue.first] = remaining + logger.trace("Partial send: " .. partialBytesSent .. "/" .. #message .. " bytes sent. " .. #remaining .. " bytes remain in queue.") + break + else + connection.sendRetryCount = connection.sendRetryCount + 1 + logger.trace("Send timeout with no progress (retry " .. connection.sendRetryCount .. "/" .. connection.sendRetryLimit .. ")") + break end end - if connection.sendRetryCount == 0 then - if connection.outgoingMessageQueue:len() > 0 then - connection.outgoingMessageQueue:clear() - end - elseif connection.sendRetryCount >= connection.sendRetryLimit then + if connection.sendRetryCount >= connection.sendRetryLimit then logger.info("Closing connection " .. connection.index .. ". Connection.send failed after " .. connection.sendRetryLimit .. " retries were attempted") return false end diff --git a/server/tests/RealSocketPartialSendTest.lua b/server/tests/RealSocketPartialSendTest.lua new file mode 100644 index 00000000..4da1f177 --- /dev/null +++ b/server/tests/RealSocketPartialSendTest.lua @@ -0,0 +1,367 @@ +-- Test using real sockets to demonstrate partial send bugs in both Connection and TcpClient are fixed +-- This test creates actual TCP server and client connections on localhost +-- +-- When a partial send occurs on a socket the partialBytesSent were LOST - not tracked, not queued, not retried +-- ## TEST STRATEGY ## +-- To reproduce these bugs: +-- 1. Fill TCP buffer by sending large amounts of data +-- 2. Read SLOWLY on receiving side to create backpressure +-- 3. Trigger a partial send (socket:send returns nil, "timeout", X where X > 0) +-- 4. Verify data is not corrupted or lost on the receiving side +-- + +json = require("common.lib.dkjson") +local Connection = require("server.Connection") +require("client.src.server_queue") +local TcpClient = require("client.src.network.TcpClient") +local NetworkProtocol = require("common.network.NetworkProtocol") +local socket = require("socket") +local logger = require("common.lib.logger") + +local SEND_ATTEMPTS_MAX = 10000 +local READ_TIMEOUT = 9 -- 10 seconds will make connection fail with no ping +local SLOW_READ_SIZE = 1 -- bytes per read +local SLEEP_INTERVAL = 0.0001 -- seconds +local FAST_READ_SLEEP = 0.0001 -- seconds +local RETRY_MAX_ATTEMPTS = 50000 +local MESSAGE_COUNT = 50 +local LARGE_MESSAGE_PAD_COUNT = 400 + +-- Helper to create a TCP server on localhost +local function createLocalServer() + local server = socket.tcp() + server:bind("127.0.0.1", 0) -- Bind to any available port + server:listen(1) + server:settimeout(5) -- 5 second timeout for accept + + local ip, port = server:getsockname() + logger.trace("Server listening on " .. ip .. ":" .. port) + + return server, ip, port +end + +-- Helper to create a TcpClient and connect to server +local function createTcpClient(ip, port) + local tcpClient = TcpClient() + local success = tcpClient:connectToServer(ip, port) + if not success then + error("Failed to connect TcpClient to " .. ip .. ":" .. port) + end + logger.trace("TcpClient connected to " .. ip .. ":" .. port) + return tcpClient +end + +-- Helper to create large message with padding data +local function createLargeMessage() + local paddingData = {} + for i = 1, LARGE_MESSAGE_PAD_COUNT do + paddingData[i] = { + index = i, + data = "This is padding data to make the message larger and increase the chance of partial sends. " .. + "We need to fill the TCP buffer quickly with fewer messages, so each message needs to be substantial. " .. + "Adding more text here to ensure we reach the buffer limits and trigger partial send scenarios. " .. + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore." + } + end + return paddingData +end + +-- Helper to process and validate JSON messages from a data buffer +-- Returns messageCount, corruptedCount +local function processMessages(dataBuffer, messageTypePrefix) + local messageCount = 0 + local corruptedCount = 0 + + while true do + local msgType, message, remaining = NetworkProtocol.getMessageFromString(dataBuffer, true) + if not msgType then + break -- No more complete messages + end + + -- Only process JSON messages + if msgType == messageTypePrefix then + messageCount = messageCount + 1 + + local decoded = json.decode(message) + + if not decoded then + corruptedCount = corruptedCount + 1 + logger.error(" Message #" .. messageCount .. " - JSON decode returned nil!") + logger.error(" Raw data preview: " .. message:sub(1, 100):gsub("\n", "\\n")) + break + end + end + + dataBuffer = remaining + end + + return messageCount, corruptedCount, dataBuffer +end + +-- Test the partial send bug with real sockets (Connection server->client) +local function testRealSocketPartialSend() + logger.info("Running testRealSocketPartialSend test") + + -- Setup: Create server and establish connection + local serverSocket, ip, port = createLocalServer() + local tcpClient = createTcpClient(ip, port) + local serverClientSocket, err = serverSocket:accept() + if not serverClientSocket then + error("Failed to accept connection: " .. (err or "unknown error")) + end + serverClientSocket:settimeout(0) + local connection = Connection(serverClientSocket, 1) + + -- Create and queue large messages + local paddingData = createLargeMessage() + local largeMessage = { + messageType = NetworkProtocol.serverMessageTypes.jsonMessage, + messageText = { + type = "settingsUpdate", + content = { + padding = paddingData + }, + sender = "player", + senderId = 1 + } + } + + for _ = 1, MESSAGE_COUNT do + connection:sendJson(largeMessage) + end + + -- Phase 1: Send while reading SLOWLY to trigger partial send + local sendAttempts = 0 + local partialSendDetected = false + + while sendAttempts < SEND_ATTEMPTS_MAX and not partialSendDetected do + sendAttempts = sendAttempts + 1 + + local success = connection:update(os.time(), false, true) + if not success then + assert(false, "couldn't keep connection open for test") + break + end + + -- Read slowly to create backpressure + if tcpClient.socket then + local chunk, _, partial = tcpClient.socket:receive(SLOW_READ_SIZE) + if chunk or partial then + tcpClient.data = tcpClient.data .. (chunk or partial) + end + end + + if connection.sendRetryCount and connection.sendRetryCount > 0 then + logger.trace("✓ Partial send detected! (sendRetryCount = " .. connection.sendRetryCount .. ")") + logger.trace(" Messages still in queue: " .. connection.outgoingMessageQueue:len()) + logger.trace(" Client has buffered: " .. #tcpClient.data .. " bytes so far") + partialSendDetected = true + break + end + + socket.sleep(SLEEP_INTERVAL) + end + assert(partialSendDetected) + + -- Phase 2: Read fast to allow retry to succeed + local retryAttempts = 0 + local resendSucceeded = false + + while retryAttempts < RETRY_MAX_ATTEMPTS do + retryAttempts = retryAttempts + 1 + local sendRetryCountBefore = connection.sendRetryCount + + connection:update(os.time(), false, true) + tcpClient:readSocket() + + if sendRetryCountBefore > 0 and connection.sendRetryCount == 0 then + resendSucceeded = true + break + end + + socket.sleep(SLEEP_INTERVAL) + end + assert(resendSucceeded) + + logger.trace("Retry phase completed. sendRetryCount = " .. (connection.sendRetryCount or 0)) + logger.trace("Client buffer size: " .. #tcpClient.data .. " bytes\n") + + -- Phase 3: Read remaining data and validate messages + local startTime = socket.gettime() + local messageCount = 0 + local corruptedCount = 0 + + while socket.gettime() - startTime < READ_TIMEOUT and corruptedCount == 0 do + connection:update(os.time(), false, true) + + if not tcpClient:readSocket() then + assert(false, "couldn't keep connection open for test") + end + + local newMessages, newCorrupted + newMessages, newCorrupted, tcpClient.data = processMessages( + tcpClient.data, + NetworkProtocol.serverMessageTypes.jsonMessage.prefix + ) + messageCount = messageCount + newMessages + corruptedCount = corruptedCount + newCorrupted + + if messageCount == MESSAGE_COUNT or corruptedCount > 0 then + logger.info("Ending early") + break + end + + socket.sleep(FAST_READ_SLEEP) + end + + assert(messageCount > 0 and corruptedCount == 0) + logger.trace("Total messages received: " .. messageCount) + logger.trace("Successfully decoded: " .. (messageCount - corruptedCount)) + logger.trace("Corrupted messages: " .. corruptedCount) + logger.trace("TcpClient buffer remaining: " .. #tcpClient.data .. " bytes") + + -- Cleanup + connection:close() + tcpClient:resetNetwork() + serverSocket:close() +end + +-- Test the TcpClient partial send bug (TcpClient client->server) +local function testTcpClientPartialSend() + logger.info("Running testTcpClientPartialSend test") + + -- Setup: Create server and establish connection + local serverSocket, ip, port = createLocalServer() + local tcpClient = createTcpClient(ip, port) + local serverClientSocket, err = serverSocket:accept() + if not serverClientSocket then + error("Failed to accept connection: " .. (err or "unknown error")) + end + serverClientSocket:settimeout(0) + + -- Create large message to send from client to server + local paddingData = createLargeMessage() + local largeMessage = { + type = "chatMessage", + content = { + padding = paddingData + }, + sender = "player", + senderId = 1 + } + local messageString = NetworkProtocol.markedMessageForTypeAndBody( + NetworkProtocol.clientMessageTypes.jsonMessage.prefix, + json.encode(largeMessage) + ) + + -- Phase 1: Send while reading SLOWLY to trigger partial send + local sendAttempts = 0 + local partialSendDetected = false + local serverReceivedData = "" + + while sendAttempts < SEND_ATTEMPTS_MAX and not partialSendDetected do + sendAttempts = sendAttempts + 1 + + local success = tcpClient:sendMessage(messageString) + if not success then + assert(false, "couldn't keep connection open for test") + break + end + + -- Read slowly to create backpressure + local chunk, _, partial = serverClientSocket:receive(SLOW_READ_SIZE) + if chunk or partial then + serverReceivedData = serverReceivedData .. (chunk or partial) + end + + if tcpClient.sendRetryCount and tcpClient.sendRetryCount > 0 then + logger.trace("✓ Partial send detected! (sendRetryCount = " .. tcpClient.sendRetryCount .. ")") + logger.trace(" Messages still in queue: " .. tcpClient.outgoingMessageQueue:len()) + logger.trace(" Server has buffered: " .. #serverReceivedData .. " bytes so far") + partialSendDetected = true + break + end + + socket.sleep(SLEEP_INTERVAL) + end + assert(partialSendDetected) + + -- Phase 2: Read fast to allow retry to succeed + local retryAttempts = 0 + local resendSucceeded = false + + while retryAttempts < RETRY_MAX_ATTEMPTS do + retryAttempts = retryAttempts + 1 + local sendRetryCountBefore = tcpClient.sendRetryCount + + tcpClient:updateNetwork(0) + + -- Read from server socket + local chunk, error, partial = serverClientSocket:receive("*a") + if error == "timeout" then + chunk = partial + end + if chunk and #chunk > 0 then + serverReceivedData = serverReceivedData .. chunk + end + + if sendRetryCountBefore > 0 and tcpClient.sendRetryCount == 0 then + resendSucceeded = true + break + end + + socket.sleep(SLEEP_INTERVAL) + end + assert(resendSucceeded) + + logger.trace("Retry phase completed. sendRetryCount = " .. (tcpClient.sendRetryCount or 0)) + logger.trace("Server buffer size: " .. #serverReceivedData .. " bytes\n") + + -- Phase 3: Read remaining data and validate messages + local startTime = socket.gettime() + local messageCount = 0 + local corruptedCount = 0 + + while socket.gettime() - startTime < READ_TIMEOUT and corruptedCount == 0 do + tcpClient:updateNetwork(0) + -- Read from server socket + local chunk, error, partial = serverClientSocket:receive("*a") + + if error == "timeout" then + chunk = partial + end + + if chunk and #chunk > 0 then + serverReceivedData = serverReceivedData .. chunk + end + + local newMessages, newCorrupted, updatedBuffer + newMessages, newCorrupted, updatedBuffer = processMessages( + serverReceivedData, + NetworkProtocol.clientMessageTypes.jsonMessage.prefix + ) + serverReceivedData = updatedBuffer or serverReceivedData + messageCount = messageCount + newMessages + corruptedCount = corruptedCount + newCorrupted + + if messageCount == sendAttempts or corruptedCount > 0 then + logger.info("Ending early") + break + end + + socket.sleep(FAST_READ_SLEEP) + end + + assert(messageCount > 0 and corruptedCount == 0) + logger.info("Total messages received: " .. messageCount) + logger.info("Corrupted messages: " .. corruptedCount) + + -- Cleanup + tcpClient:resetNetwork() + serverClientSocket:close() + serverSocket:close() +end + +-- Run the tests +testRealSocketPartialSend() +testTcpClientPartialSend() diff --git a/testLauncher.lua b/testLauncher.lua index 53d23c1c..b38496d9 100644 --- a/testLauncher.lua +++ b/testLauncher.lua @@ -81,6 +81,7 @@ local allTests = { "server.tests.LeaderboardTests", "server.tests.RoomTests", "server.tests.LoginTests", + "server.tests.RealSocketPartialSendTest", "client.tests.FileUtilsTests", "client.tests.ModControllerTests", "client.tests.QueueTests",