diff --git a/Its.Log.UnitTests/AsyncTests.cs b/Its.Log.UnitTests/AsyncTests.cs index 9bded53..2780fb6 100644 --- a/Its.Log.UnitTests/AsyncTests.cs +++ b/Its.Log.UnitTests/AsyncTests.cs @@ -95,10 +95,7 @@ public class AsyncTestHelper public IAsyncResult BeginSomething(AsyncCallback callback, object state) { var activity = state as ILogActivity; - if (activity != null) - { - activity.Trace(() => "hello"); - } + activity?.Trace(() => "hello"); return new AsyncResult { @@ -111,14 +108,8 @@ public void EndSomething(IAsyncResult result) var state = result.AsyncState as Tuple; if (state != null) { - if (state.Item2 != null) - { - state.Item2.Trace(() => "hello"); - } - if (state.Item1 != null) - { - state.Item1.Invoke(result); - } + state.Item2?.Trace(() => "hello"); + state.Item1?.Invoke(result); } } diff --git a/Its.Log.UnitTests/DispatchQueueTests.cs b/Its.Log.UnitTests/DispatchQueueTests.cs new file mode 100644 index 0000000..98e6f52 --- /dev/null +++ b/Its.Log.UnitTests/DispatchQueueTests.cs @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using FluentAssertions; +using System.Linq; +using System.Threading.Tasks; +using Its.Log.Instrumentation.Extensions; +using NUnit.Framework; + +namespace Its.Log.Instrumentation.UnitTests +{ + [TestFixture] + public class DispatchQueueTests + { + [Test] + public async Task log_entries_can_be_queued() + { + var log = new List(); + using (new LogEntryDispatchQueue(send: async e => log.Add(e))) + { + WriteInParallel(10); + } + + log.Should().HaveCount(10); + } + + [Test] + public async Task telemetry_events_can_be_queued() + { + var log = new List(); + + using (new TelemetryDispatchQueue(send: async e => log.Add(e))) + { + WriteInParallel(10, _ => + { + using (Log.With().Enter(() => { })) + { + } + }); + } + + log.Should().HaveCount(10); + } + + [Test] + public async Task When_the_sender_throws_then_other_events_continue_to_be_sent() + { + var log = new List(); + var count = 0; + + using (new LogEntryDispatchQueue(send: async e => + { + if (++count == 1) + { + throw new Exception("oops"); + } + log.Add(e); + })) + { + WriteInParallel(10); + } + + log.Should().HaveCount(9); + } + + public void WriteInParallel(int degreesOfParallelism = 5, Action write = null) + { + Parallel.ForEach(Enumerable.Range(1, degreesOfParallelism), + write ?? + (i => Log.Write(() => i))); + } + } +} \ No newline at end of file diff --git a/Its.Log.UnitTests/Its.Log.UnitTests.csproj b/Its.Log.UnitTests/Its.Log.UnitTests.csproj index 93f7b36..45fd69f 100644 --- a/Its.Log.UnitTests/Its.Log.UnitTests.csproj +++ b/Its.Log.UnitTests/Its.Log.UnitTests.csproj @@ -105,9 +105,6 @@ False ..\packages\Rx-PlatformServices.2.1.30214.0\lib\Net40\System.Reactive.PlatformServices.dll - - 3.0 - @@ -124,6 +121,7 @@ + diff --git a/Its.Log.UnitTests/TestHelper.cs b/Its.Log.UnitTests/TestHelper.cs index e3e57b7..bb6b260 100644 --- a/Its.Log.UnitTests/TestHelper.cs +++ b/Its.Log.UnitTests/TestHelper.cs @@ -4,6 +4,7 @@ using System; using System.Linq; using System.Reactive.Linq; +using Its.Log.Instrumentation.Extensions; namespace Its.Log.Instrumentation.UnitTests { @@ -34,6 +35,12 @@ public static IDisposable SubscribeToLogInternalErrors(this IObserver return subscription; } + public static IDisposable SubscribeToTelemetryEvents(this IObserver observer) + { + var subscription = Log.TelemetryEvents().Subscribe(observer); + return subscription; + } + public static IDisposable OnEntryPosted(Action doSomething) { return Observable diff --git a/Its.Log/AsyncDispatchQueue{T}.cs b/Its.Log/AsyncDispatchQueue{T}.cs new file mode 100644 index 0000000..cde5d9a --- /dev/null +++ b/Its.Log/AsyncDispatchQueue{T}.cs @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Its.Log.Instrumentation +{ + public class AsyncDispatchQueue : IDisposable + { + private readonly Func send; + private readonly IDisposable subscription; + private readonly BlockingCollection blockingCollection; + private readonly Thread dispatcherThread; + + protected AsyncDispatchQueue(IObservable events, Func send) + { + if (send == null) + { + throw new ArgumentNullException(nameof(send)); + } + if (events == null) + { + throw new ArgumentNullException(nameof(events)); + } + + this.send = send; + blockingCollection = new BlockingCollection(); + subscription = events.Subscribe(new Observer(blockingCollection)); + + dispatcherThread = new Thread(Send); + dispatcherThread.Start(); + } + + private void Send() + { + while (!blockingCollection.IsCompleted) + { + try + { + var value = blockingCollection.Take(); + send(value).Wait(); + } + catch (Exception exception) + { + exception.RaiseErrorEvent(); + } + } + } + + private async Task Drain() + { + while (!blockingCollection.IsCompleted) + { + await Task.Delay(50); + } + } + + public void Dispose() + { + blockingCollection.CompleteAdding(); + subscription.Dispose(); + Drain().Wait(TimeSpan.FromSeconds(30)); + } + + private class Observer : IObserver + { + private readonly BlockingCollection blockingCollection; + + public Observer(BlockingCollection blockingCollection) + { + if (blockingCollection == null) + { + throw new ArgumentNullException(nameof(blockingCollection)); + } + this.blockingCollection = blockingCollection; + } + + public void OnNext(T value) => blockingCollection.Add(value); + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + } + } +} \ No newline at end of file diff --git a/Its.Log/Its.Log.csproj b/Its.Log/Its.Log.csproj index 8ad01ca..41b094d 100644 --- a/Its.Log/Its.Log.csproj +++ b/Its.Log/Its.Log.csproj @@ -93,6 +93,7 @@ Code + Code @@ -126,10 +127,12 @@ Code + + diff --git a/Its.Log/LogEntryDispatchQueue.cs b/Its.Log/LogEntryDispatchQueue.cs new file mode 100644 index 0000000..78ec764 --- /dev/null +++ b/Its.Log/LogEntryDispatchQueue.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Threading.Tasks; + +namespace Its.Log.Instrumentation +{ + public class LogEntryDispatchQueue : AsyncDispatchQueue + { + public LogEntryDispatchQueue( + Func send, + IObservable logEvents = null) : + base(logEvents ?? Log.Events(), send) + { + } + } +} \ No newline at end of file diff --git a/Its.Log/TelemetryDispatchQueue.cs b/Its.Log/TelemetryDispatchQueue.cs new file mode 100644 index 0000000..a002210 --- /dev/null +++ b/Its.Log/TelemetryDispatchQueue.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading.Tasks; +using Its.Log.Instrumentation.Extensions; + +namespace Its.Log.Instrumentation +{ + public class TelemetryDispatchQueue : AsyncDispatchQueue + { + public TelemetryDispatchQueue( + Func send, + IObservable telemetryEvents = null) : + base(telemetryEvents ?? Log.TelemetryEvents(), send) + { + } + } +} \ No newline at end of file