From d73ff9b6ac9bbd802ef6861a81a9bca4710b9f98 Mon Sep 17 00:00:00 2001 From: Gabriele Picco Date: Wed, 19 Mar 2025 13:19:46 +0100 Subject: [PATCH] fix: streaming rpc --- src/Solana.Unity.Rpc/ClientFactory.cs | 17 +++++++++++++- .../Core/Sockets/StreamingRpcClient.cs | 22 ++++++++++++++++--- .../Core/Sockets/WebSocketWrapper.cs | 4 ++++ .../SolanaStreamingRpcClient.cs | 1 + 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/Solana.Unity.Rpc/ClientFactory.cs b/src/Solana.Unity.Rpc/ClientFactory.cs index d0df9bb9..c925fee0 100644 --- a/src/Solana.Unity.Rpc/ClientFactory.cs +++ b/src/Solana.Unity.Rpc/ClientFactory.cs @@ -1,3 +1,4 @@ +using NativeWebSocket; using Solana.Unity.Rpc.Utilities; using System.Net.Http; using System.Net.WebSockets; @@ -161,6 +162,20 @@ public static IStreamingRpcClient GetStreamingClient( }; return GetStreamingClient(url, logger); } + + public static IStreamingRpcClient GetStreamingClient( + Cluster cluster, + IWebSocket socket) + { + var url = cluster switch + { + Cluster.DevNet => StreamingRpcDevNet, + Cluster.TestNet => StreamingRpcTestNet, + Cluster.LocalNet => StreamingRpcLocalNet, + _ => StreamingRpcMainNet, + }; + return GetStreamingClient(url, null, socket, null); + } /// /// Instantiate a streaming client. @@ -169,7 +184,7 @@ public static IStreamingRpcClient GetStreamingClient( /// The logger. /// A ClientWebSocket instance. If null, a new instance will be created. /// The streaming client. - public static IStreamingRpcClient GetStreamingClient(string url, object logger = null, ClientWebSocket clientWebSocket = null) + public static IStreamingRpcClient GetStreamingClient(string url, object logger = null, IWebSocket socket = null, ClientWebSocket clientWebSocket = null) { return new SolanaStreamingRpcClient(url, logger, null, clientWebSocket); } diff --git a/src/Solana.Unity.Rpc/Core/Sockets/StreamingRpcClient.cs b/src/Solana.Unity.Rpc/Core/Sockets/StreamingRpcClient.cs index ba707656..b7e2f197 100644 --- a/src/Solana.Unity.Rpc/Core/Sockets/StreamingRpcClient.cs +++ b/src/Solana.Unity.Rpc/Core/Sockets/StreamingRpcClient.cs @@ -1,6 +1,5 @@ using Solana.Unity.Rpc.Types; using System; -using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; @@ -58,6 +57,21 @@ protected StreamingRpcClient(string url, object logger, IWebSocket socket = defa _connectionStats = new ConnectionStats(); ClientSocket.ConnectionStateChangedEvent += (sender, state) => ConnectionStateChangedEvent?.Invoke(sender, state); } + + /// + /// Constructor that setups the client with a IWebSocket instance. + /// + /// + /// + protected StreamingRpcClient(string url, IWebSocket socket) + { + NodeAddress = new Uri(url); + ClientSocket = socket ?? new WebSocketWrapper(); + _logger = null; + _sem = new SemaphoreSlim(1, 1); + _connectionStats = new ConnectionStats(); + ClientSocket.ConnectionStateChangedEvent += (sender, state) => ConnectionStateChangedEvent?.Invoke(sender, state); + } /// /// Initializes the websocket connection and starts receiving messages asynchronously. @@ -65,7 +79,8 @@ protected StreamingRpcClient(string url, object logger, IWebSocket socket = defa /// Returns the task representing the asynchronous task. public async Task ConnectAsync() { - _sem.Wait(); + if (ClientSocket.State == WebSocketState.Open) return; + await _sem.WaitAsync().ConfigureAwait(false); try { if (ClientSocket.State != WebSocketState.Open) @@ -94,7 +109,8 @@ private void DispatchMessage(byte[] message) /// public async Task DisconnectAsync() { - _sem.Wait(); + if (ClientSocket.State == WebSocketState.Closed) return; + await _sem.WaitAsync().ConfigureAwait(false); try { if (ClientSocket.State == WebSocketState.Open) diff --git a/src/Solana.Unity.Rpc/Core/Sockets/WebSocketWrapper.cs b/src/Solana.Unity.Rpc/Core/Sockets/WebSocketWrapper.cs index 216c7454..58dcda41 100644 --- a/src/Solana.Unity.Rpc/Core/Sockets/WebSocketWrapper.cs +++ b/src/Solana.Unity.Rpc/Core/Sockets/WebSocketWrapper.cs @@ -45,6 +45,10 @@ public Task ConnectAsync(Uri uri, CancellationToken cancellationToken) webSocket = WebSocket.Create(uri.AbsoluteUri); webSocket.OnOpen += () => { + MainThreadUtil.Run(() => + { + _webSocketConnectionTask.TrySetResult(true); + }); _webSocketConnectionTask.TrySetResult(true); webSocket.OnMessage += MessageReceived; ConnectionStateChangedEvent?.Invoke(this, State); diff --git a/src/Solana.Unity.Rpc/SolanaStreamingRpcClient.cs b/src/Solana.Unity.Rpc/SolanaStreamingRpcClient.cs index ceb60956..f33cf1f0 100644 --- a/src/Solana.Unity.Rpc/SolanaStreamingRpcClient.cs +++ b/src/Solana.Unity.Rpc/SolanaStreamingRpcClient.cs @@ -60,6 +60,7 @@ internal SolanaStreamingRpcClient(string url, object logger = null, IWebSocket w }; } + /// /// Try Reconnect to the server and reopening the confirmed subscription. ///