Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/Solana.Unity.Rpc/ClientFactory.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using NativeWebSocket;
using Solana.Unity.Rpc.Utilities;
using System.Net.Http;
using System.Net.WebSockets;
Expand Down Expand Up @@ -161,6 +162,20 @@ public static IStreamingRpcClient GetStreamingClient(
};
return GetStreamingClient(url, logger);
}

public static IStreamingRpcClient GetStreamingClient(
Cluster cluster,
IWebSocket socket)
{
var url = cluster switch
{
Cluster.DevNet => StreamingRpcDevNet,
Cluster.TestNet => StreamingRpcTestNet,
Cluster.LocalNet => StreamingRpcLocalNet,
_ => StreamingRpcMainNet,
};
return GetStreamingClient(url, null, socket, null);
}

/// <summary>
/// Instantiate a streaming client.
Expand All @@ -169,7 +184,7 @@ public static IStreamingRpcClient GetStreamingClient(
/// <param name="logger">The logger.</param>
/// <param name="clientWebSocket">A ClientWebSocket instance. If null, a new instance will be created.</param>
/// <returns>The streaming client.</returns>
public static IStreamingRpcClient GetStreamingClient(string url, object logger = null, ClientWebSocket clientWebSocket = null)
public static IStreamingRpcClient GetStreamingClient(string url, object logger = null, IWebSocket socket = null, ClientWebSocket clientWebSocket = null)
{
return new SolanaStreamingRpcClient(url, logger, null, clientWebSocket);
}
Expand Down
22 changes: 19 additions & 3 deletions src/Solana.Unity.Rpc/Core/Sockets/StreamingRpcClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Solana.Unity.Rpc.Types;
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -58,14 +57,30 @@ protected StreamingRpcClient(string url, object logger, IWebSocket socket = defa
_connectionStats = new ConnectionStats();
ClientSocket.ConnectionStateChangedEvent += (sender, state) => ConnectionStateChangedEvent?.Invoke(sender, state);
}

/// <summary>
/// Constructor that setups the client with a IWebSocket instance.
/// </summary>
/// <param name="url"></param>
/// <param name="socket"></param>
protected StreamingRpcClient(string url, IWebSocket socket)
{
NodeAddress = new Uri(url);
ClientSocket = socket ?? new WebSocketWrapper();
_logger = null;
_sem = new SemaphoreSlim(1, 1);
_connectionStats = new ConnectionStats();
ClientSocket.ConnectionStateChangedEvent += (sender, state) => ConnectionStateChangedEvent?.Invoke(sender, state);
}

/// <summary>
/// Initializes the websocket connection and starts receiving messages asynchronously.
/// </summary>
/// <returns>Returns the task representing the asynchronous task.</returns>
public async Task ConnectAsync()
{
_sem.Wait();
if (ClientSocket.State == WebSocketState.Open) return;
await _sem.WaitAsync().ConfigureAwait(false);
try
{
if (ClientSocket.State != WebSocketState.Open)
Expand Down Expand Up @@ -94,7 +109,8 @@ private void DispatchMessage(byte[] message)
/// <inheritdoc cref="IStreamingRpcClient.DisconnectAsync"/>
public async Task DisconnectAsync()
{
_sem.Wait();
if (ClientSocket.State == WebSocketState.Closed) return;
await _sem.WaitAsync().ConfigureAwait(false);
try
{
if (ClientSocket.State == WebSocketState.Open)
Expand Down
4 changes: 4 additions & 0 deletions src/Solana.Unity.Rpc/Core/Sockets/WebSocketWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
webSocket = WebSocket.Create(uri.AbsoluteUri);
webSocket.OnOpen += () =>
{
MainThreadUtil.Run(() =>
{
_webSocketConnectionTask.TrySetResult(true);
});
_webSocketConnectionTask.TrySetResult(true);
webSocket.OnMessage += MessageReceived;
ConnectionStateChangedEvent?.Invoke(this, State);
Expand Down
1 change: 1 addition & 0 deletions src/Solana.Unity.Rpc/SolanaStreamingRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ internal SolanaStreamingRpcClient(string url, object logger = null, IWebSocket w
};
}


/// <summary>
/// Try Reconnect to the server and reopening the confirmed subscription.
/// </summary>
Expand Down
Loading