From 475f27797e69e49d9d730a56340fcc19483ce66d Mon Sep 17 00:00:00 2001 From: Ilya Loshchinin Date: Fri, 4 Oct 2019 16:11:06 +0200 Subject: [PATCH] Unified asynchronous task usage - use BeardedManStudios.Threading.Task for all task needs, instead of System.Threading directly; - added thread join support: BMS.Threading.Task.WaitAll(); - in any infinite loops, use NetWorker.IsActiveSession checks instead of just IsBound, to cover EndingSession requests; - NetWorker.EndSession now uses Task.WaitAll instead of an arbitrary 1s wait; - fixed NetworkingPlayer.BackgroundServerPing to use non-zero wait time and actually ping the server. Initially meant as a fix for issue #322. --- .../Forge/Networking/CommonServer.cs | 2 +- .../Networking/Forge/Networking/NetWorker.cs | 20 ++++++--------- .../Forge/Networking/NetworkingPlayer.cs | 6 ++--- .../Forge/Networking/SteamP2PClient.cs | 2 +- .../Forge/Networking/SteamP2PServer.cs | 2 +- .../Networking/Forge/Networking/UDPClient.cs | 11 ++++---- .../Networking/Forge/Networking/UDPServer.cs | 2 +- .../Scripts/Networking/Threading/Task.cs | 25 +++++++++++++++++++ 8 files changed, 45 insertions(+), 25 deletions(-) diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/CommonServer.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/CommonServer.cs index 88ec2757..3704a3a8 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/CommonServer.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/CommonServer.cs @@ -110,7 +110,7 @@ private bool updateCountCheck(NetworkingPlayer sender, NetworkingPlayer player, public void CheckClientTimeout(Action timeoutDisconnect) { List timedoutPlayers = new List(); - while (server.IsBound) + while (server.IsActiveSession) { server.IteratePlayers((player) => { diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/NetWorker.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/NetWorker.cs index 84f0b3e2..ac0a0fd3 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/NetWorker.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/NetWorker.cs @@ -3,7 +3,6 @@ using System.Linq; using System.Net; using System.Net.Sockets; -using System.Threading; using BeardedManStudios.Forge.Networking.DataStore; using BeardedManStudios.Forge.Networking.Frame; using BeardedManStudios.Source.Forge.Networking; @@ -452,10 +451,10 @@ private void RunNetworkObjectHeartbeats() private void LoopNetworkObjectHeartbeats() { - while (IsBound) + while (IsActiveSession) { RunNetworkObjectHeartbeats(); - Thread.Sleep(10); + Task.Sleep(10); } } @@ -1010,9 +1009,8 @@ public void SetUserAuthenticator(IUserAuthenticator authenticator) this.authenticator = authenticator; } - private static void BindAndReleaseOnTCP(object state) + private static void BindAndReleaseOnTCP(ushort port) { - ushort port = (ushort)state; try { //IPAddress ipAddress = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0]; @@ -1039,7 +1037,7 @@ public static void PingForFirewall(ushort port = 0) port = DEFAULT_PORT - 1; } - ThreadPool.QueueUserWorkItem(BindAndReleaseOnTCP, port); + Task.Queue(() => { BindAndReleaseOnTCP(port); }); } public static void EndSession() @@ -1047,12 +1045,10 @@ public static void EndSession() EndingSession = true; CloseLocalListingsClient(); - // Reset the ending session after 1000ms so that we know all the threads have cleaned up - // for any remaining threads that may be going for this previous process - Task.Queue(() => - { - EndingSession = false; - }, 1000); + // Wait until all the threads have cleaned up before resetting the termination flag + Task.WaitAll(); + + EndingSession = false; } public Ping GeneratePing() diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/NetworkingPlayer.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/NetworkingPlayer.cs index ac5bf1d7..4caf978f 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/NetworkingPlayer.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/NetworkingPlayer.cs @@ -211,16 +211,16 @@ public NetworkingPlayer(uint networkId, string ip, bool isHost, object socketEnd Port = (ushort)IPEndPointHandle.Port; } - ThreadPool.QueueUserWorkItem(BackgroundServerPing); + Task.Queue(BackgroundServerPing); } - private void BackgroundServerPing(object _) + private void BackgroundServerPing() { // There is no reason for the server to ping itself if ((Networker is IServer)) return; - int waitTime = 0, currentPingWait = 0; + int waitTime = 100, currentPingWait = 0; while (Networker.IsActiveSession) { Task.Sleep(waitTime); diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/SteamP2PClient.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/SteamP2PClient.cs index 67131778..0013cebd 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/SteamP2PClient.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/SteamP2PClient.cs @@ -153,7 +153,7 @@ public void Connect(CSteamID hostId, bool pendCreates = false) // Send the accept headers to the server to validate Client.Send(connectHeader, connectHeader.Length, hostId, EP2PSend.k_EP2PSendUnreliable); Thread.Sleep(3000); - } while (!initialConnectHeaderExchanged && IsBound && ++connectCounter < CONNECT_TRIES); + } while (!initialConnectHeaderExchanged && IsActiveSession && ++connectCounter < CONNECT_TRIES); if (connectCounter >= CONNECT_TRIES) { diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/SteamP2PServer.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/SteamP2PServer.cs index d0d8efee..576f3b4a 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/SteamP2PServer.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/SteamP2PServer.cs @@ -335,7 +335,7 @@ private void ReadClients() BMSByte packet = null; // Intentional infinite loop - while (IsBound) + while (IsActiveSession) { // If the read has been flagged to be canceled then break from this loop if (IsReadThreadCancelPending) diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/UDPClient.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/UDPClient.cs index 926c1018..dc750a8a 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/UDPClient.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/UDPClient.cs @@ -1,7 +1,6 @@ using System; using System.Net; using System.Net.Sockets; -using System.Threading; using BeardedManStudios.Forge.Networking.Frame; using BeardedManStudios.Forge.Networking.Nat; using BeardedManStudios.Threading; @@ -101,7 +100,7 @@ private ushort FindAvailablePort(ushort clientPort, ushort port) return clientPort; } - private void AttemptServerConnection(object _) + private void AttemptServerConnection() { int connectCounter = 0; @@ -112,8 +111,8 @@ private void AttemptServerConnection(object _) { // Send the accept headers to the server to validate Client.Send(connectHeader, connectHeader.Length, ServerPlayer.IPEndPointHandle); - Thread.Sleep(3000); - } while (!initialConnectHeaderExchanged && IsBound && ++connectCounter < CONNECT_TRIES); + Task.Sleep(3000); + } while (!initialConnectHeaderExchanged && IsActiveSession && ++connectCounter < CONNECT_TRIES); if (connectCounter >= CONNECT_TRIES && connectAttemptFailed != null) connectAttemptFailed(this); @@ -172,7 +171,7 @@ private void BindAndConnect(ushort overrideBindingPort, ushort port, string natH SetNetworkBindings(overrideBindingPort, port, natHost, host, natPort); CreateTheNetworkingPlayer(host, port); SetupConnectingState(); - ThreadPool.QueueUserWorkItem(AttemptServerConnection); + Task.Queue(AttemptServerConnection); } /// @@ -245,7 +244,7 @@ private void ReadNetwork() { try { - while (IsBound) + while (IsActiveSession) { // If the read has been flagged to be canceled then break from this loop if (IsReadThreadCancelPending) diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/UDPServer.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/UDPServer.cs index e0526e05..4fde6dd8 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/UDPServer.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Forge/Networking/UDPServer.cs @@ -310,7 +310,7 @@ private void ReadClients() BMSByte packet = null; // Intentional infinite loop - while (IsBound) + while (IsActiveSession) { // If the read has been flagged to be canceled then break from this loop if (IsReadThreadCancelPending) diff --git a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Threading/Task.cs b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Threading/Task.cs index 2411d403..63ccae24 100644 --- a/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Threading/Task.cs +++ b/ForgeUnity/Assets/BeardedManStudios/Scripts/Networking/Threading/Task.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using BeardedManStudios.Forge.Logging; namespace BeardedManStudios.Threading { @@ -25,12 +26,21 @@ public static bool IsMainThread /// public static class Task { + /// + /// Event that is signaled when all task threads have completed their execution + /// + public static ManualResetEvent threadsJoinedEvent = new ManualResetEvent(true); + + private static int numRunningTasks = 0; + /// /// Sets the method that is to be executed on the separate thread /// /// The method that is to be called on the newly created thread private static void QueueExpression(WaitCallback expression) { + Interlocked.Increment(ref numRunningTasks); + threadsJoinedEvent.Reset(); ThreadPool.QueueUserWorkItem(expression); } @@ -51,6 +61,10 @@ public static void Queue(Action expression, int delayOrSleep = 0) // Call the requested method expression(); + if (Interlocked.Decrement(ref numRunningTasks) == 0) + { + threadsJoinedEvent.Set(); + } }; // Set the method to be called on the separate thread to be the inline method we have just created @@ -61,5 +75,16 @@ public static void Sleep(int milliseconds) { Thread.Sleep(milliseconds); } + + /// + /// Block execution until all enqueued tasks have completed + /// + public static void WaitAll() + { + while (!threadsJoinedEvent.WaitOne(1000, false)) + { + BMSLog.LogFormat("Task: WaitAll waited for 1s"); + } + } } }