From 8d1020fd155b45f80c15e9773a25df229b137696 Mon Sep 17 00:00:00 2001 From: jonsequitur Date: Sun, 13 Mar 2016 13:04:13 -0700 Subject: [PATCH 1/3] initial implementation of AsyncDispatchQueue --- Its.Log.UnitTests/AsyncDispatchQueue.cs | 111 +++++++++++++++++++++ Its.Log.UnitTests/AsyncTests.cs | 15 +-- Its.Log.UnitTests/DispatchQueueTests.cs | 43 ++++++++ Its.Log.UnitTests/Its.Log.UnitTests.csproj | 5 +- Its.Log.UnitTests/TestHelper.cs | 7 ++ Its.Log/AsyncDispatchQueue{T}.cs | 93 +++++++++++++++++ Its.Log/Its.Log.csproj | 3 + Its.Log/LogEntryDispatchQueue.cs | 18 ++++ Its.Log/TelemetryDispatchQueue.cs | 16 +++ 9 files changed, 296 insertions(+), 15 deletions(-) create mode 100644 Its.Log.UnitTests/AsyncDispatchQueue.cs create mode 100644 Its.Log.UnitTests/DispatchQueueTests.cs create mode 100644 Its.Log/AsyncDispatchQueue{T}.cs create mode 100644 Its.Log/LogEntryDispatchQueue.cs create mode 100644 Its.Log/TelemetryDispatchQueue.cs diff --git a/Its.Log.UnitTests/AsyncDispatchQueue.cs b/Its.Log.UnitTests/AsyncDispatchQueue.cs new file mode 100644 index 0000000..5b3cd9d --- /dev/null +++ b/Its.Log.UnitTests/AsyncDispatchQueue.cs @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Its.Log.Instrumentation.Extensions; + +namespace Its.Log.Instrumentation.UnitTests +{ + + public class LogEntryDispatchQueue : AsyncDispatchQueue + { + public LogEntryDispatchQueue(Func send, IObservable logEvents = null) : + base(send, logEvents ?? Log.Events()) + { + } + } + + public class TelemetryDispatchQueue : AsyncDispatchQueue + { + public TelemetryDispatchQueue(Func send, IObservable telemetryEvents = null) : + base(send, telemetryEvents ?? Log.TelemetryEvents()) + { + } + } + public abstract class AsyncDispatchQueue : IDisposable + { + private readonly Func send; + private readonly IDisposable subscription; + private readonly BlockingCollection blockingCollection; + private readonly Thread dispatcherThread; + + protected AsyncDispatchQueue(Func send, IObservable events) + { + if (send == null) + { + throw new ArgumentNullException(nameof(send)); + } + if (events == null) + { + throw new ArgumentNullException(nameof(events)); + } + + this.send = send; + blockingCollection = new BlockingCollection(); + subscription = events.Subscribe(new Observer(blockingCollection)); + + dispatcherThread = new Thread(Send); + dispatcherThread.Start(); + } + + private void Send() + { + while (true) + { + try + { + var value = blockingCollection.Take(); + send(value).Wait(); + } + catch (InvalidOperationException) + { + // collection is completed + break; + } + } + } + + private async Task Drain() + { + while (!blockingCollection.IsEmpty()) + { + await Task.Delay(50); + } + } + + public void Dispose() + { + blockingCollection.CompleteAdding(); + subscription.Dispose(); + Drain().Wait(TimeSpan.FromSeconds(30)); + } + + private class Observer : IObserver + { + private readonly BlockingCollection blockingCollection; + + public Observer(BlockingCollection blockingCollection) + { + if (blockingCollection == null) + { + throw new ArgumentNullException(nameof(blockingCollection)); + } + this.blockingCollection = blockingCollection; + } + + public void OnNext(T value) => blockingCollection.Add(value); + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + } + } +} \ No newline at end of file diff --git a/Its.Log.UnitTests/AsyncTests.cs b/Its.Log.UnitTests/AsyncTests.cs index 9bded53..2780fb6 100644 --- a/Its.Log.UnitTests/AsyncTests.cs +++ b/Its.Log.UnitTests/AsyncTests.cs @@ -95,10 +95,7 @@ public class AsyncTestHelper public IAsyncResult BeginSomething(AsyncCallback callback, object state) { var activity = state as ILogActivity; - if (activity != null) - { - activity.Trace(() => "hello"); - } + activity?.Trace(() => "hello"); return new AsyncResult { @@ -111,14 +108,8 @@ public void EndSomething(IAsyncResult result) var state = result.AsyncState as Tuple; if (state != null) { - if (state.Item2 != null) - { - state.Item2.Trace(() => "hello"); - } - if (state.Item1 != null) - { - state.Item1.Invoke(result); - } + state.Item2?.Trace(() => "hello"); + state.Item1?.Invoke(result); } } diff --git a/Its.Log.UnitTests/DispatchQueueTests.cs b/Its.Log.UnitTests/DispatchQueueTests.cs new file mode 100644 index 0000000..1f3f769 --- /dev/null +++ b/Its.Log.UnitTests/DispatchQueueTests.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Collections.Generic; +using FluentAssertions; +using System.Threading.Tasks; +using Its.Log.Instrumentation.Extensions; +using NUnit.Framework; + +namespace Its.Log.Instrumentation.UnitTests +{ + [TestFixture] + public class DispatchQueueTests + { + [Test] + public async Task Queue_can_dispatch_log_entries() + { + var log = new List(); + using (new LogEntryDispatchQueue(send: async e => log.Add(e))) + { + using (Log.Enter(() => { })) + { + } + } + + log.Should().HaveCount(2); + } + + [Test] + public async Task Queue_can_dispatch_telemetry_events() + { + var log = new List(); + using (new TelemetryDispatchQueue(send: async e => log.Add(e))) + { + using (Log.With().Enter(() => { })) + { + } + } + + log.Should().HaveCount(1); + } + } +} \ No newline at end of file diff --git a/Its.Log.UnitTests/Its.Log.UnitTests.csproj b/Its.Log.UnitTests/Its.Log.UnitTests.csproj index 93f7b36..b74b4c7 100644 --- a/Its.Log.UnitTests/Its.Log.UnitTests.csproj +++ b/Its.Log.UnitTests/Its.Log.UnitTests.csproj @@ -105,9 +105,6 @@ False ..\packages\Rx-PlatformServices.2.1.30214.0\lib\Net40\System.Reactive.PlatformServices.dll - - 3.0 - @@ -119,11 +116,13 @@ Properties\ProductInfo.cs + + diff --git a/Its.Log.UnitTests/TestHelper.cs b/Its.Log.UnitTests/TestHelper.cs index e3e57b7..bb6b260 100644 --- a/Its.Log.UnitTests/TestHelper.cs +++ b/Its.Log.UnitTests/TestHelper.cs @@ -4,6 +4,7 @@ using System; using System.Linq; using System.Reactive.Linq; +using Its.Log.Instrumentation.Extensions; namespace Its.Log.Instrumentation.UnitTests { @@ -34,6 +35,12 @@ public static IDisposable SubscribeToLogInternalErrors(this IObserver return subscription; } + public static IDisposable SubscribeToTelemetryEvents(this IObserver observer) + { + var subscription = Log.TelemetryEvents().Subscribe(observer); + return subscription; + } + public static IDisposable OnEntryPosted(Action doSomething) { return Observable diff --git a/Its.Log/AsyncDispatchQueue{T}.cs b/Its.Log/AsyncDispatchQueue{T}.cs new file mode 100644 index 0000000..6b549d3 --- /dev/null +++ b/Its.Log/AsyncDispatchQueue{T}.cs @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Its.Log.Instrumentation +{ + public abstract class AsyncDispatchQueue : IDisposable + { + private readonly Func send; + private readonly IDisposable subscription; + private readonly BlockingCollection blockingCollection; + private readonly Thread dispatcherThread; + + protected AsyncDispatchQueue(Func send, IObservable events) + { + if (send == null) + { + throw new ArgumentNullException(nameof(send)); + } + if (events == null) + { + throw new ArgumentNullException(nameof(events)); + } + + this.send = send; + blockingCollection = new BlockingCollection(); + subscription = events.Subscribe(new Observer(blockingCollection)); + + dispatcherThread = new Thread(Send); + dispatcherThread.Start(); + } + + private void Send() + { + while (true) + { + try + { + var value = blockingCollection.Take(); + send(value).Wait(); + } + catch (InvalidOperationException) + { + // collection is completed + break; + } + } + } + + private async Task Drain() + { + while (!blockingCollection.IsEmpty()) + { + await Task.Delay(50); + } + } + + public void Dispose() + { + blockingCollection.CompleteAdding(); + subscription.Dispose(); + Drain().Wait(TimeSpan.FromSeconds(30)); + } + + private class Observer : IObserver + { + private readonly BlockingCollection blockingCollection; + + public Observer(BlockingCollection blockingCollection) + { + if (blockingCollection == null) + { + throw new ArgumentNullException(nameof(blockingCollection)); + } + this.blockingCollection = blockingCollection; + } + + public void OnNext(T value) => blockingCollection.Add(value); + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + } + } +} \ No newline at end of file diff --git a/Its.Log/Its.Log.csproj b/Its.Log/Its.Log.csproj index 8ad01ca..41b094d 100644 --- a/Its.Log/Its.Log.csproj +++ b/Its.Log/Its.Log.csproj @@ -93,6 +93,7 @@ Code + Code @@ -126,10 +127,12 @@ Code + + diff --git a/Its.Log/LogEntryDispatchQueue.cs b/Its.Log/LogEntryDispatchQueue.cs new file mode 100644 index 0000000..fc934d5 --- /dev/null +++ b/Its.Log/LogEntryDispatchQueue.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Threading.Tasks; + +namespace Its.Log.Instrumentation +{ + public class LogEntryDispatchQueue : AsyncDispatchQueue + { + public LogEntryDispatchQueue( + Func send, + IObservable logEvents = null) : + base(send, logEvents ?? Log.Events()) + { + } + } +} \ No newline at end of file diff --git a/Its.Log/TelemetryDispatchQueue.cs b/Its.Log/TelemetryDispatchQueue.cs new file mode 100644 index 0000000..884cf8f --- /dev/null +++ b/Its.Log/TelemetryDispatchQueue.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading.Tasks; +using Its.Log.Instrumentation.Extensions; + +namespace Its.Log.Instrumentation +{ + public class TelemetryDispatchQueue : AsyncDispatchQueue + { + public TelemetryDispatchQueue( + Func send, + IObservable telemetryEvents = null) : + base(send, telemetryEvents ?? Log.TelemetryEvents()) + { + } + } +} \ No newline at end of file From 4f0f8e68c93d37f8a8b1af71c499deb6b3baf068 Mon Sep 17 00:00:00 2001 From: jonsequitur Date: Sun, 13 Mar 2016 14:49:00 -0700 Subject: [PATCH 2/3] add AsyncDispatchQueue error resiliency --- Its.Log.UnitTests/AsyncDispatchQueue.cs | 111 --------------------- Its.Log.UnitTests/DispatchQueueTests.cs | 48 +++++++-- Its.Log.UnitTests/Its.Log.UnitTests.csproj | 1 - Its.Log/AsyncDispatchQueue{T}.cs | 14 ++- 4 files changed, 51 insertions(+), 123 deletions(-) delete mode 100644 Its.Log.UnitTests/AsyncDispatchQueue.cs diff --git a/Its.Log.UnitTests/AsyncDispatchQueue.cs b/Its.Log.UnitTests/AsyncDispatchQueue.cs deleted file mode 100644 index 5b3cd9d..0000000 --- a/Its.Log.UnitTests/AsyncDispatchQueue.cs +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System; -using System.Collections.Concurrent; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Its.Log.Instrumentation.Extensions; - -namespace Its.Log.Instrumentation.UnitTests -{ - - public class LogEntryDispatchQueue : AsyncDispatchQueue - { - public LogEntryDispatchQueue(Func send, IObservable logEvents = null) : - base(send, logEvents ?? Log.Events()) - { - } - } - - public class TelemetryDispatchQueue : AsyncDispatchQueue - { - public TelemetryDispatchQueue(Func send, IObservable telemetryEvents = null) : - base(send, telemetryEvents ?? Log.TelemetryEvents()) - { - } - } - public abstract class AsyncDispatchQueue : IDisposable - { - private readonly Func send; - private readonly IDisposable subscription; - private readonly BlockingCollection blockingCollection; - private readonly Thread dispatcherThread; - - protected AsyncDispatchQueue(Func send, IObservable events) - { - if (send == null) - { - throw new ArgumentNullException(nameof(send)); - } - if (events == null) - { - throw new ArgumentNullException(nameof(events)); - } - - this.send = send; - blockingCollection = new BlockingCollection(); - subscription = events.Subscribe(new Observer(blockingCollection)); - - dispatcherThread = new Thread(Send); - dispatcherThread.Start(); - } - - private void Send() - { - while (true) - { - try - { - var value = blockingCollection.Take(); - send(value).Wait(); - } - catch (InvalidOperationException) - { - // collection is completed - break; - } - } - } - - private async Task Drain() - { - while (!blockingCollection.IsEmpty()) - { - await Task.Delay(50); - } - } - - public void Dispose() - { - blockingCollection.CompleteAdding(); - subscription.Dispose(); - Drain().Wait(TimeSpan.FromSeconds(30)); - } - - private class Observer : IObserver - { - private readonly BlockingCollection blockingCollection; - - public Observer(BlockingCollection blockingCollection) - { - if (blockingCollection == null) - { - throw new ArgumentNullException(nameof(blockingCollection)); - } - this.blockingCollection = blockingCollection; - } - - public void OnNext(T value) => blockingCollection.Add(value); - - public void OnError(Exception error) - { - } - - public void OnCompleted() - { - } - } - } -} \ No newline at end of file diff --git a/Its.Log.UnitTests/DispatchQueueTests.cs b/Its.Log.UnitTests/DispatchQueueTests.cs index 1f3f769..98e6f52 100644 --- a/Its.Log.UnitTests/DispatchQueueTests.cs +++ b/Its.Log.UnitTests/DispatchQueueTests.cs @@ -1,8 +1,10 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System; using System.Collections.Generic; using FluentAssertions; +using System.Linq; using System.Threading.Tasks; using Its.Log.Instrumentation.Extensions; using NUnit.Framework; @@ -13,31 +15,61 @@ namespace Its.Log.Instrumentation.UnitTests public class DispatchQueueTests { [Test] - public async Task Queue_can_dispatch_log_entries() + public async Task log_entries_can_be_queued() { var log = new List(); using (new LogEntryDispatchQueue(send: async e => log.Add(e))) { - using (Log.Enter(() => { })) - { - } + WriteInParallel(10); } - log.Should().HaveCount(2); + log.Should().HaveCount(10); } [Test] - public async Task Queue_can_dispatch_telemetry_events() + public async Task telemetry_events_can_be_queued() { var log = new List(); + using (new TelemetryDispatchQueue(send: async e => log.Add(e))) { - using (Log.With().Enter(() => { })) + WriteInParallel(10, _ => { + using (Log.With().Enter(() => { })) + { + } + }); + } + + log.Should().HaveCount(10); + } + + [Test] + public async Task When_the_sender_throws_then_other_events_continue_to_be_sent() + { + var log = new List(); + var count = 0; + + using (new LogEntryDispatchQueue(send: async e => + { + if (++count == 1) + { + throw new Exception("oops"); } + log.Add(e); + })) + { + WriteInParallel(10); } - log.Should().HaveCount(1); + log.Should().HaveCount(9); + } + + public void WriteInParallel(int degreesOfParallelism = 5, Action write = null) + { + Parallel.ForEach(Enumerable.Range(1, degreesOfParallelism), + write ?? + (i => Log.Write(() => i))); } } } \ No newline at end of file diff --git a/Its.Log.UnitTests/Its.Log.UnitTests.csproj b/Its.Log.UnitTests/Its.Log.UnitTests.csproj index b74b4c7..45fd69f 100644 --- a/Its.Log.UnitTests/Its.Log.UnitTests.csproj +++ b/Its.Log.UnitTests/Its.Log.UnitTests.csproj @@ -116,7 +116,6 @@ Properties\ProductInfo.cs - diff --git a/Its.Log/AsyncDispatchQueue{T}.cs b/Its.Log/AsyncDispatchQueue{T}.cs index 6b549d3..a5f7687 100644 --- a/Its.Log/AsyncDispatchQueue{T}.cs +++ b/Its.Log/AsyncDispatchQueue{T}.cs @@ -3,12 +3,13 @@ using System; using System.Collections.Concurrent; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; namespace Its.Log.Instrumentation { - public abstract class AsyncDispatchQueue : IDisposable + public class AsyncDispatchQueue : IDisposable { private readonly Func send; private readonly IDisposable subscription; @@ -46,14 +47,21 @@ private void Send() catch (InvalidOperationException) { // collection is completed - break; + if (blockingCollection.IsCompleted) + { + break; + } + } + catch (Exception exception) + { + Debug.WriteLine(exception); } } } private async Task Drain() { - while (!blockingCollection.IsEmpty()) + while (!blockingCollection.IsCompleted) { await Task.Delay(50); } From 3c0e2a8f857a1e92e4ec3b7190f09e98a86a122c Mon Sep 17 00:00:00 2001 From: jonsequitur Date: Mon, 14 Mar 2016 08:10:37 -0700 Subject: [PATCH 3/3] change ctor param order --- Its.Log/AsyncDispatchQueue{T}.cs | 15 +++------------ Its.Log/LogEntryDispatchQueue.cs | 2 +- Its.Log/TelemetryDispatchQueue.cs | 2 +- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/Its.Log/AsyncDispatchQueue{T}.cs b/Its.Log/AsyncDispatchQueue{T}.cs index a5f7687..cde5d9a 100644 --- a/Its.Log/AsyncDispatchQueue{T}.cs +++ b/Its.Log/AsyncDispatchQueue{T}.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Concurrent; -using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -16,7 +15,7 @@ public class AsyncDispatchQueue : IDisposable private readonly BlockingCollection blockingCollection; private readonly Thread dispatcherThread; - protected AsyncDispatchQueue(Func send, IObservable events) + protected AsyncDispatchQueue(IObservable events, Func send) { if (send == null) { @@ -37,24 +36,16 @@ protected AsyncDispatchQueue(Func send, IObservable events) private void Send() { - while (true) + while (!blockingCollection.IsCompleted) { try { var value = blockingCollection.Take(); send(value).Wait(); } - catch (InvalidOperationException) - { - // collection is completed - if (blockingCollection.IsCompleted) - { - break; - } - } catch (Exception exception) { - Debug.WriteLine(exception); + exception.RaiseErrorEvent(); } } } diff --git a/Its.Log/LogEntryDispatchQueue.cs b/Its.Log/LogEntryDispatchQueue.cs index fc934d5..78ec764 100644 --- a/Its.Log/LogEntryDispatchQueue.cs +++ b/Its.Log/LogEntryDispatchQueue.cs @@ -11,7 +11,7 @@ public class LogEntryDispatchQueue : AsyncDispatchQueue public LogEntryDispatchQueue( Func send, IObservable logEvents = null) : - base(send, logEvents ?? Log.Events()) + base(logEvents ?? Log.Events(), send) { } } diff --git a/Its.Log/TelemetryDispatchQueue.cs b/Its.Log/TelemetryDispatchQueue.cs index 884cf8f..a002210 100644 --- a/Its.Log/TelemetryDispatchQueue.cs +++ b/Its.Log/TelemetryDispatchQueue.cs @@ -9,7 +9,7 @@ public class TelemetryDispatchQueue : AsyncDispatchQueue public TelemetryDispatchQueue( Func send, IObservable telemetryEvents = null) : - base(send, telemetryEvents ?? Log.TelemetryEvents()) + base(telemetryEvents ?? Log.TelemetryEvents(), send) { } }