Skip to content

Commit fde9b91

Browse files
committed
It did not automatically reconnect when server closed the connection. ConnectedState was stuck on read task's result.
1 parent d584d9c commit fde9b91

File tree

3 files changed

+86
-8
lines changed

3 files changed

+86
-8
lines changed

EventSource4Net.Test/EventSourceTest.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,52 @@ public void TestSuccesfulConnection()
8181
}
8282

8383

84+
[TestMethod]
85+
public void TestReConnectionAfterConnectionLost()
86+
{
87+
// setup
88+
Uri url = new Uri("http://test.com");
89+
CancellationTokenSource cts = new CancellationTokenSource();
90+
List<EventSourceState> states = new List<EventSourceState>();
91+
ServiceResponseMock serviceResponseMock = new ServiceResponseMock(url, System.Net.HttpStatusCode.OK);
92+
WebRequesterFactoryMock factory = new WebRequesterFactoryMock(serviceResponseMock);
93+
ManualResetEvent stateIsOpen = new ManualResetEvent(false);
94+
ManualResetEvent stateIsClosed = new ManualResetEvent(false);
95+
96+
TestableEventSource es = new TestableEventSource(url, factory);
97+
es.StateChanged += (o, e) =>
98+
{
99+
states.Add(e.State);
100+
if (e.State == EventSourceState.OPEN)
101+
{
102+
stateIsClosed.Reset();
103+
stateIsOpen.Set();
104+
}
105+
else if (e.State == EventSourceState.CLOSED)
106+
{
107+
stateIsOpen.Reset();
108+
stateIsClosed.Set();
109+
}
110+
};
111+
112+
113+
// act
114+
stateIsOpen.Reset();
115+
116+
es.Start(cts.Token);
117+
118+
stateIsOpen.WaitOne();
119+
states.Clear();
120+
121+
serviceResponseMock.DistantConnectionClose();
122+
123+
stateIsClosed.WaitOrThrow();
124+
stateIsOpen.WaitOrThrow();
125+
126+
// assert
127+
Assert.AreEqual(states[0], EventSourceState.CLOSED);
128+
Assert.AreEqual(states[1], EventSourceState.CONNECTING);
129+
Assert.AreEqual(states[2], EventSourceState.OPEN);
130+
}
84131
}
85132
}

EventSource4Net.Test/WebRequesterFactoryTest.cs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.IO;
44
using System.Linq;
55
using System.Net;
6+
using System.Net.Sockets;
67
using System.Text;
78
using System.Threading;
89
using System.Threading.Tasks;
@@ -18,7 +19,7 @@ public WebRequesterMock WebRequesterMock
1819
}
1920
public WebRequesterFactoryMock(ServiceResponseMock response)
2021
{
21-
this.WebRequesterMock = new WebRequesterMock(response);
22+
this.WebRequesterMock = new WebRequesterMock(response);
2223
}
2324
public IWebRequester Create()
2425
{
@@ -48,7 +49,7 @@ public System.Threading.Tasks.Task<IServerResponse> Get(Uri url)
4849

4950
class ServiceResponseMock : IServerResponse
5051
{
51-
private Stream mStream;
52+
private TestableStream mStream;
5253
private StreamWriter mStreamWriter;
5354
private Uri mUrl;
5455
private HttpStatusCode mStatusCode;
@@ -87,6 +88,11 @@ public void WriteTestTextToStream(string text)
8788
mStreamWriter.Write(text);
8889
mStreamWriter.Flush();
8990
}
91+
92+
public void DistantConnectionClose()
93+
{
94+
mStream.Throws(new SocketException(10054));
95+
}
9096
}
9197

9298
class GetIsCalledEventArgs : EventArgs
@@ -103,6 +109,13 @@ class TestableStream : Stream
103109
{
104110
long _pos = 0;
105111
System.Collections.Concurrent.BlockingCollection<string> _texts = new System.Collections.Concurrent.BlockingCollection<string>();
112+
private CancellationTokenSource _cancellationTokenSource;
113+
private Exception _throw;
114+
115+
public TestableStream()
116+
{
117+
_cancellationTokenSource = new CancellationTokenSource();
118+
}
106119

107120
public override bool CanRead
108121
{
@@ -143,11 +156,23 @@ public override long Position
143156

144157
public override int Read(byte[] buffer, int offset, int count)
145158
{
146-
string s = _texts.Take();
147-
148-
byte[] encodedText = Encoding.UTF8.GetBytes(s);
149-
encodedText.CopyTo(buffer, offset);
150-
return encodedText.Length;
159+
try
160+
{
161+
var s = _texts.Take(_cancellationTokenSource.Token);
162+
byte[] encodedText = Encoding.UTF8.GetBytes(s);
163+
encodedText.CopyTo(buffer, offset);
164+
return encodedText.Length;
165+
}
166+
catch (OperationCanceledException)
167+
{
168+
if (_throw != null)
169+
{
170+
var ex = _throw;
171+
_throw = null;
172+
throw ex;
173+
}
174+
return 0;
175+
}
151176
}
152177

153178
public override long Seek(long offset, SeekOrigin origin)
@@ -166,6 +191,12 @@ public override void Write(byte[] buffer, int offset, int count)
166191
_texts.Add(s);
167192
//_texts.CompleteAdding();
168193
}
194+
195+
public void Throws(Exception exception)
196+
{
197+
_cancellationTokenSource.Cancel();
198+
_throw = exception;
199+
}
169200
}
170201

171202

EventSource4Net/ConnectedState.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public Task<IConnectionState> Run(Action<ServerSentEvent> msgReceived, Cancellat
4747
{
4848
_logger.Trace(ex, "ConnectedState.Run");
4949
}
50-
if (!cancelToken.IsCancellationRequested)
50+
if (!cancelToken.IsCancellationRequested && !taskRead.IsFaulted)
5151
{
5252
int bytesRead = taskRead.Result;
5353
if (bytesRead > 0) // stream has not reached the end yet

0 commit comments

Comments
 (0)