diff --git a/Itmo.Dev.Platform.sln b/Itmo.Dev.Platform.sln index 8ee2c16..27e076a 100644 --- a/Itmo.Dev.Platform.sln +++ b/Itmo.Dev.Platform.sln @@ -102,10 +102,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Locking", "Locking", "{49FF EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Itmo.Dev.Platform.Locking.Redis", "src\Itmo.Dev.Platform.Locking.Redis\Itmo.Dev.Platform.Locking.Redis.csproj", "{4C5DF002-21D7-4C9D-BAFD-748F672A0C62}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Itmo.Dev.Platform.Observability.Tests", "tests\Itmo.Dev.Platform.Observability.Tests\Itmo.Dev.Platform.Observability.Tests.csproj", "{A57ECCE4-255C-4357-90D8-5F8A43275AD9}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Itmo.Dev.Platform.Observability.Tests.Startup", "tests\Itmo.Dev.Platform.Observability.Tests.Startup\Itmo.Dev.Platform.Observability.Tests.Startup.csproj", "{2CD6ACA3-3385-4CB0-86FE-BA9270735311}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -244,14 +240,6 @@ Global {4C5DF002-21D7-4C9D-BAFD-748F672A0C62}.Debug|Any CPU.Build.0 = Debug|Any CPU {4C5DF002-21D7-4C9D-BAFD-748F672A0C62}.Release|Any CPU.ActiveCfg = Release|Any CPU {4C5DF002-21D7-4C9D-BAFD-748F672A0C62}.Release|Any CPU.Build.0 = Release|Any CPU - {A57ECCE4-255C-4357-90D8-5F8A43275AD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {A57ECCE4-255C-4357-90D8-5F8A43275AD9}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A57ECCE4-255C-4357-90D8-5F8A43275AD9}.Release|Any CPU.ActiveCfg = Release|Any CPU - {A57ECCE4-255C-4357-90D8-5F8A43275AD9}.Release|Any CPU.Build.0 = Release|Any CPU - {2CD6ACA3-3385-4CB0-86FE-BA9270735311}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {2CD6ACA3-3385-4CB0-86FE-BA9270735311}.Debug|Any CPU.Build.0 = Debug|Any CPU - {2CD6ACA3-3385-4CB0-86FE-BA9270735311}.Release|Any CPU.ActiveCfg = Release|Any CPU - {2CD6ACA3-3385-4CB0-86FE-BA9270735311}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {9C9FC028-1F10-42B3-8938-A3EAE0D07C1F} = {65E3EDFE-875D-42A1-A224-E06E746B8A5E} @@ -292,7 +280,5 @@ Global {49FF216F-A911-4470-82A3-B4285FDA3D23} = {8B6CE8C2-3319-44B9-A8B7-19F5F94B911D} {D936C0C2-FCA9-4C53-83B4-25CC52A748DF} = {49FF216F-A911-4470-82A3-B4285FDA3D23} {4C5DF002-21D7-4C9D-BAFD-748F672A0C62} = {49FF216F-A911-4470-82A3-B4285FDA3D23} - {A57ECCE4-255C-4357-90D8-5F8A43275AD9} = {B24A2836-A847-40AB-85A2-6590ABA8ACB2} - {2CD6ACA3-3385-4CB0-86FE-BA9270735311} = {B24A2836-A847-40AB-85A2-6590ABA8ACB2} EndGlobalSection EndGlobal diff --git a/src/Itmo.Dev.Platform.Common/Extensions/ActivityExtensions.cs b/src/Itmo.Dev.Platform.Common/Extensions/ActivityExtensions.cs new file mode 100644 index 0000000..76b5ebf --- /dev/null +++ b/src/Itmo.Dev.Platform.Common/Extensions/ActivityExtensions.cs @@ -0,0 +1,16 @@ +using System.Diagnostics; + +namespace Itmo.Dev.Platform.Common.Extensions; + +public static class ActivityExtensions +{ + public static Activity? WithDisplayName(this Activity? activity, string displayName) + { + if (activity is not null) + { + activity.DisplayName = displayName; + } + + return activity; + } +} diff --git a/src/Itmo.Dev.Platform.Kafka/Consumer/Inbox/InboxConsumerHandler.cs b/src/Itmo.Dev.Platform.Kafka/Consumer/Inbox/InboxConsumerHandler.cs index cb3629c..db442c1 100644 --- a/src/Itmo.Dev.Platform.Kafka/Consumer/Inbox/InboxConsumerHandler.cs +++ b/src/Itmo.Dev.Platform.Kafka/Consumer/Inbox/InboxConsumerHandler.cs @@ -1,6 +1,9 @@ +using Itmo.Dev.Platform.Common.Extensions; using Itmo.Dev.Platform.Common.Models; using Itmo.Dev.Platform.Kafka.Consumer.Models; using Itmo.Dev.Platform.MessagePersistence; +using Itmo.Dev.Platform.MessagePersistence.Tools; +using System.Diagnostics; namespace Itmo.Dev.Platform.Kafka.Consumer.Inbox; @@ -26,6 +29,13 @@ public async ValueTask HandleAsync( throw new InvalidOperationException( $"Failed to write inbox message, invalid message type, expected = {expectedType}. You probably attempting some unaccounted platform tampering"); } + + using var activity = PlatformMessagePersistenceActivitySource.Value + .StartActivity( + name: PlatformMessagePersistenceConstants.SpanName, + ActivityKind.Internal, + parentContext: default) + .WithDisplayName($"[inbox] {_messageName}"); var persistedMessages = consumerMessages .Select(message => new PersistedMessage>(Unit.Value, message)) diff --git a/src/Itmo.Dev.Platform.Kafka/Consumer/Models/KafkaConsumerMessage.cs b/src/Itmo.Dev.Platform.Kafka/Consumer/Models/KafkaConsumerMessage.cs index 85e9a31..192bc16 100644 --- a/src/Itmo.Dev.Platform.Kafka/Consumer/Models/KafkaConsumerMessage.cs +++ b/src/Itmo.Dev.Platform.Kafka/Consumer/Models/KafkaConsumerMessage.cs @@ -1,5 +1,6 @@ using Confluent.Kafka; using Newtonsoft.Json; +using System.Text; namespace Itmo.Dev.Platform.Kafka.Consumer.Models; @@ -19,6 +20,10 @@ public KafkaConsumerMessage(IConsumer consumer, ConsumeResult header.Key, + header => Encoding.UTF8.GetString(header.GetValueBytes())); } /// @@ -31,13 +36,15 @@ private KafkaConsumerMessage( ConsumeResult result, TKey key, TValue value, - string topic) + string topic, + Dictionary headers) { _consumer = consumer; _result = result; Key = key; Value = value; Topic = topic; + Headers = headers; } public TKey Key { get; } @@ -52,6 +59,8 @@ private KafkaConsumerMessage( public Offset Offset { get; } + public Dictionary Headers { get; } + public void Commit() { _consumer.Commit(_result); diff --git a/src/Itmo.Dev.Platform.Kafka/Consumer/Services/BatchingKafkaConsumerService.cs b/src/Itmo.Dev.Platform.Kafka/Consumer/Services/BatchingKafkaConsumerService.cs index 4909d82..1ae08a5 100644 --- a/src/Itmo.Dev.Platform.Kafka/Consumer/Services/BatchingKafkaConsumerService.cs +++ b/src/Itmo.Dev.Platform.Kafka/Consumer/Services/BatchingKafkaConsumerService.cs @@ -92,11 +92,12 @@ private async Task ExecuteSingleAsync( var messageHandler = new ConsumerMessageHandler( consumerOptions, - _scopeFactory); + _scopeFactory, + logger: _serviceProvider.GetRequiredService>>()); await ParallelAction.ExecuteAsync( cancellationToken, new ParallelAction(1, c => messageReader.ReadAsync(channel.Writer, c)), new ParallelAction(consumerOptions.ParallelismDegree, c => messageHandler.HandleAsync(channel.Reader, c))); } -} \ No newline at end of file +} diff --git a/src/Itmo.Dev.Platform.Kafka/Consumer/Services/ConsumerMessageHandler.cs b/src/Itmo.Dev.Platform.Kafka/Consumer/Services/ConsumerMessageHandler.cs index ba1d4af..bc6b5d1 100644 --- a/src/Itmo.Dev.Platform.Kafka/Consumer/Services/ConsumerMessageHandler.cs +++ b/src/Itmo.Dev.Platform.Kafka/Consumer/Services/ConsumerMessageHandler.cs @@ -1,6 +1,8 @@ +using Itmo.Dev.Platform.Common.Extensions; using Itmo.Dev.Platform.Kafka.Consumer.Models; using Itmo.Dev.Platform.Kafka.Tools; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using System.Diagnostics; using System.Threading.Channels; @@ -10,11 +12,16 @@ internal class ConsumerMessageHandler { private readonly KafkaConsumerOptions _consumerOptions; private readonly IServiceScopeFactory _scopeFactory; + private readonly ILogger> _logger; - public ConsumerMessageHandler(KafkaConsumerOptions consumerOptions, IServiceScopeFactory scopeFactory) + public ConsumerMessageHandler( + KafkaConsumerOptions consumerOptions, + IServiceScopeFactory scopeFactory, + ILogger> logger) { _consumerOptions = consumerOptions; _scopeFactory = scopeFactory; + _logger = logger; } public async Task HandleAsync( @@ -38,9 +45,13 @@ public async Task HandleAsync( .Where(x => x.Value is not null) .ToArray(); - using var activity = PlatformKafkaActivitySource.Value.StartActivity( - name: $"consume: {_consumerOptions.Topic}", - ActivityKind.Consumer); + using var activity = PlatformKafkaActivitySource.Value + .StartActivity( + name: "Kafka", + ActivityKind.Consumer, + parentContext: default, + links: [..EnumerateActivityLinks(chunk)]) + .WithDisplayName($"[consume] {_consumerOptions.Topic}"); await handler.HandleAsync(consumerMessages, cancellationToken); @@ -50,4 +61,35 @@ public async Task HandleAsync( } } } + + private IEnumerable EnumerateActivityLinks( + IEnumerable> messages) + { + var traceParentHeaders = messages + .SelectMany(message => message.Headers, (message, header) => (message, header)) + .Where(tuple => tuple.header.Key.Equals( + PlatformKafkaConstants.TraceParentHeaderName, + StringComparison.OrdinalIgnoreCase)); + + foreach (var (message, header) in traceParentHeaders) + { + if (ActivityContext.TryParse(header.Value, null, out var context)) + { + var tags = new ActivityTagsCollection + { + [PlatformKafkaConstants.TopicTagName] = message.Topic, + [PlatformKafkaConstants.PartitionTagName] = message.Partition.Value, + [PlatformKafkaConstants.OffsetTagName] = message.Offset.Value, + }; + + yield return new ActivityLink(context, tags); + } + else + { + _logger.LogWarning( + "Failed to parse activity context = '{TraceParent}'", + header.Value); + } + } + } } diff --git a/src/Itmo.Dev.Platform.Kafka/Producer/Outbox/AlwaysOutboxMessageProducer.cs b/src/Itmo.Dev.Platform.Kafka/Producer/Outbox/AlwaysOutboxMessageProducer.cs index f08dd44..5574274 100644 --- a/src/Itmo.Dev.Platform.Kafka/Producer/Outbox/AlwaysOutboxMessageProducer.cs +++ b/src/Itmo.Dev.Platform.Kafka/Producer/Outbox/AlwaysOutboxMessageProducer.cs @@ -1,4 +1,7 @@ +using Itmo.Dev.Platform.Common.Extensions; using Itmo.Dev.Platform.MessagePersistence; +using Itmo.Dev.Platform.MessagePersistence.Tools; +using System.Diagnostics; namespace Itmo.Dev.Platform.Kafka.Producer.Outbox; @@ -17,6 +20,13 @@ public async Task ProduceAsync( IAsyncEnumerable> messages, CancellationToken cancellationToken) { + using var activity = PlatformMessagePersistenceActivitySource.Value + .StartActivity( + name: PlatformMessagePersistenceConstants.SpanName, + ActivityKind.Internal, + parentContext: default) + .WithDisplayName($"[outbox] {_topicName}"); + var persistedMessages = await messages .Select(x => new PersistedMessage(x.Key, x.Value)) .ToArrayAsync(cancellationToken); diff --git a/src/Itmo.Dev.Platform.Kafka/Producer/Outbox/FallbackOutboxMessageProducer.cs b/src/Itmo.Dev.Platform.Kafka/Producer/Outbox/FallbackOutboxMessageProducer.cs index be3547d..358b0d7 100644 --- a/src/Itmo.Dev.Platform.Kafka/Producer/Outbox/FallbackOutboxMessageProducer.cs +++ b/src/Itmo.Dev.Platform.Kafka/Producer/Outbox/FallbackOutboxMessageProducer.cs @@ -1,5 +1,8 @@ +using Itmo.Dev.Platform.Common.Extensions; using Itmo.Dev.Platform.MessagePersistence; +using Itmo.Dev.Platform.MessagePersistence.Tools; using Microsoft.Extensions.DependencyInjection; +using System.Diagnostics; namespace Itmo.Dev.Platform.Kafka.Producer.Outbox; @@ -21,13 +24,20 @@ public async Task ProduceAsync( CancellationToken cancellationToken) { var messagesArray = await messages.ToArrayAsync(cancellationToken); - + try { await _producer.ProduceAsync(messagesArray.ToAsyncEnumerable(), cancellationToken); } catch { + using var activity = PlatformMessagePersistenceActivitySource.Value + .StartActivity( + name: PlatformMessagePersistenceConstants.SpanName, + ActivityKind.Internal, + parentContext: default) + .WithDisplayName($"[outbox] {_topicName}"); + var persistedMessages = messagesArray .Select(x => new PersistedMessage(x.Key, x.Value)) .ToArray(); @@ -38,4 +48,4 @@ await _consumer.ConsumeAsync( cancellationToken); } } -} \ No newline at end of file +} diff --git a/src/Itmo.Dev.Platform.Kafka/Producer/Services/KafkaMessageProducer.cs b/src/Itmo.Dev.Platform.Kafka/Producer/Services/KafkaMessageProducer.cs index f8ec28a..fec7a37 100644 --- a/src/Itmo.Dev.Platform.Kafka/Producer/Services/KafkaMessageProducer.cs +++ b/src/Itmo.Dev.Platform.Kafka/Producer/Services/KafkaMessageProducer.cs @@ -1,10 +1,12 @@ using Confluent.Kafka; +using Itmo.Dev.Platform.Common.Extensions; using Itmo.Dev.Platform.Kafka.Configuration; using Itmo.Dev.Platform.Kafka.Tools; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Diagnostics; +using System.Text; namespace Itmo.Dev.Platform.Kafka.Producer.Services; @@ -52,9 +54,14 @@ public async Task ProduceAsync( IAsyncEnumerable> messages, CancellationToken cancellationToken) { - using var activity = PlatformKafkaActivitySource.Value.StartActivity( - name: $"produce: {_options.Topic}", - ActivityKind.Producer); + using var activity = PlatformKafkaActivitySource.Value + .StartActivity( + name: "Kafka", + ActivityKind.Producer, + parentContext: default) + .WithDisplayName($"[produce] {_options.Topic}"); + + Headers headers = [..EnumerateHeaders()]; try { @@ -64,6 +71,7 @@ public async Task ProduceAsync( { Key = producerKafkaMessage.Key, Value = producerKafkaMessage.Value, + Headers = headers, }; await _producer.ProduceAsync(_options.Topic, message, cancellationToken); @@ -80,4 +88,12 @@ public void Dispose() { _producer.Dispose(); } + + private static IEnumerable
EnumerateHeaders() + { + if (Activity.Current is { Id: { } traceId }) + { + yield return new Header(PlatformKafkaConstants.TraceParentHeaderName, Encoding.UTF8.GetBytes(traceId)); + } + } } diff --git a/src/Itmo.Dev.Platform.Kafka/Tools/PlatformKafkaActivitySource.cs b/src/Itmo.Dev.Platform.Kafka/Tools/PlatformKafkaActivitySource.cs index b6f8419..6e53dda 100644 --- a/src/Itmo.Dev.Platform.Kafka/Tools/PlatformKafkaActivitySource.cs +++ b/src/Itmo.Dev.Platform.Kafka/Tools/PlatformKafkaActivitySource.cs @@ -4,6 +4,9 @@ namespace Itmo.Dev.Platform.Kafka.Tools; internal static class PlatformKafkaActivitySource { + /// + /// Required to be const, as Observability package depends on compile-time const string inlining + /// public const string Name = "Itmo.Dev.Platform.Kafka"; public static readonly ActivitySource Value = new(Name); diff --git a/src/Itmo.Dev.Platform.Kafka/Tools/PlatformKafkaConstants.cs b/src/Itmo.Dev.Platform.Kafka/Tools/PlatformKafkaConstants.cs new file mode 100644 index 0000000..cdb2ccc --- /dev/null +++ b/src/Itmo.Dev.Platform.Kafka/Tools/PlatformKafkaConstants.cs @@ -0,0 +1,12 @@ +namespace Itmo.Dev.Platform.Kafka.Tools; + +public static class PlatformKafkaConstants +{ + public const string TraceParentHeaderName = "traceparent"; + + public const string TopicTagName = "topic"; + + public const string PartitionTagName = "partition"; + + public const string OffsetTagName = "offset"; +} diff --git a/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Migrations/1763246858_AddedMessageHeaders.cs b/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Migrations/1763246858_AddedMessageHeaders.cs new file mode 100644 index 0000000..e3b02b5 --- /dev/null +++ b/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Migrations/1763246858_AddedMessageHeaders.cs @@ -0,0 +1,31 @@ +using FluentMigrator; +using Itmo.Dev.Platform.MessagePersistence.Postgres.Configuration; +using Itmo.Dev.Platform.Persistence.Postgres.Migrations; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Itmo.Dev.Platform.MessagePersistence.Postgres.Migrations; + +[Migration(1763246858, "Added message headers")] +public sealed class AddedMessageHeaders : SqlMigration +{ + protected override string GetUpSql(IServiceProvider serviceProvider) + { + var options = serviceProvider.GetRequiredService>(); + + return $""" + ALTER TABLE {options.Value.SchemaName}.persisted_messages + ADD COLUMN persisted_message_headers jsonb null; + """; + } + + protected override string GetDownSql(IServiceProvider serviceProvider) + { + var options = serviceProvider.GetRequiredService>(); + + return $""" + ALTER TABLE {options.Value.SchemaName}.persisted_messages + DROP COLUMN persisted_message_headers; + """; + } +} diff --git a/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Queries/MessagePersistenceQueryStorage.cs b/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Queries/MessagePersistenceQueryStorage.cs index 467c136..d488497 100644 --- a/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Queries/MessagePersistenceQueryStorage.cs +++ b/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Queries/MessagePersistenceQueryStorage.cs @@ -10,7 +10,8 @@ internal class MessagePersistenceQueryStorage(MessagePersistenceQueryFactory fac persisted_message_key, persisted_message_value, persisted_message_retry_count, - persisted_message_buffering_step + persisted_message_buffering_step, + persisted_message_headers from {o.SchemaName}.persisted_messages where (cardinality(:ids) = 0 or persisted_message_id = any(:ids)) @@ -29,8 +30,9 @@ insert into {o.SchemaName}.persisted_messages persisted_message_state, persisted_message_key, persisted_message_value, - persisted_message_buffering_step) - select * from unnest(:names, :created_at, :states, :keys, :values, :buffering_steps) + persisted_message_buffering_step, + persisted_message_headers) + select * from unnest(:names, :created_at, :states, :keys, :values, :buffering_steps, :headers) returning persisted_message_id; """); @@ -38,8 +40,9 @@ insert into {o.SchemaName}.persisted_messages update {o.SchemaName}.persisted_messages set persisted_message_state = source.state, persisted_message_retry_count = source.retry_count, - persisted_message_buffering_step = source.buffering_step - from (select * from unnest(:ids, :states, :retry_counts, :buffering_steps)) as source(id, state, retry_count, buffering_step) + persisted_message_buffering_step = source.buffering_step, + persisted_message_headers = source.headers + from (select * from unnest(:ids, :states, :retry_counts, :buffering_steps, :headers)) as source(id, state, retry_count, buffering_step, headers) where persisted_message_id = source.id """); diff --git a/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Repositories/MessagePersistenceRepository.cs b/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Repositories/MessagePersistenceRepository.cs index 1fb516e..f04e06e 100644 --- a/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Repositories/MessagePersistenceRepository.cs +++ b/src/Itmo.Dev.Platform.MessagePersistence.Postgres/Repositories/MessagePersistenceRepository.cs @@ -47,7 +47,8 @@ public async IAsyncEnumerable QueryAsync( key: reader.GetString("persisted_message_key"), value: reader.GetString("persisted_message_value"), retryCount: reader.GetInt32("persisted_message_retry_count"), - bufferingStep: reader.GetNullableString("persisted_message_buffering_step")); + bufferingStep: reader.GetNullableString("persisted_message_buffering_step"), + headers: reader.GetJsonFieldValue>("persisted_message_headers")); } } @@ -63,7 +64,8 @@ public async IAsyncEnumerable AddAsync( .AddParameter("states", messages.Select(message => message.State)) .AddJsonArrayParameter("keys", messages.Select(message => message.Key)) .AddJsonArrayParameter("values", messages.Select(message => message.Value)) - .AddParameter("buffering_steps", messages.Select(message => message.BufferingStep)); + .AddParameter("buffering_steps", messages.Select(message => message.BufferingStep)) + .AddJsonArrayParameter("headers", messages.Select(message => message.Headers)); await using var reader = await command.ExecuteReaderAsync(cancellationToken); @@ -81,7 +83,8 @@ public async Task UpdateAsync(IReadOnlyCollection messages, C .AddParameter("ids", messages.Select(message => message.Id)) .AddParameter("states", messages.Select(message => message.State)) .AddParameter("retry_counts", messages.Select(message => message.RetryCount)) - .AddParameter("buffering_steps", messages.Select(message => message.BufferingStep)); + .AddParameter("buffering_steps", messages.Select(message => message.BufferingStep)) + .AddJsonArrayParameter("headers", messages.Select(message => message.Headers)); await command.ExecuteNonQueryAsync(cancellationToken); } diff --git a/src/Itmo.Dev.Platform.MessagePersistence/Exceptions/SerializedMessageExtensions.cs b/src/Itmo.Dev.Platform.MessagePersistence/Exceptions/SerializedMessageExtensions.cs new file mode 100644 index 0000000..0451a40 --- /dev/null +++ b/src/Itmo.Dev.Platform.MessagePersistence/Exceptions/SerializedMessageExtensions.cs @@ -0,0 +1,28 @@ +using Itmo.Dev.Platform.MessagePersistence.Models; +using Itmo.Dev.Platform.MessagePersistence.Tools; +using System.Diagnostics; + +namespace Itmo.Dev.Platform.MessagePersistence.Exceptions; + +internal static class SerializedMessageExtensions +{ + public static IEnumerable GetActivityLinks(this SerializedMessage message) + { + var tags = new ActivityTagsCollection + { + [PlatformMessagePersistenceConstants.MessageIdTagName] = message.Id, + }; + + var traceHeaders = message.Headers.Where(header => header.Key.Equals( + MessagePersistenceConstants.DefaultPublisherName, + StringComparison.OrdinalIgnoreCase)); + + foreach (KeyValuePair header in traceHeaders) + { + if (ActivityContext.TryParse(header.Value, null, out var context)) + { + yield return new ActivityLink(context, tags); + } + } + } +} diff --git a/src/Itmo.Dev.Platform.MessagePersistence/Execution/MessagePersistenceBufferingExecutor.cs b/src/Itmo.Dev.Platform.MessagePersistence/Execution/MessagePersistenceBufferingExecutor.cs index aba4e5b..1b3a7bb 100644 --- a/src/Itmo.Dev.Platform.MessagePersistence/Execution/MessagePersistenceBufferingExecutor.cs +++ b/src/Itmo.Dev.Platform.MessagePersistence/Execution/MessagePersistenceBufferingExecutor.cs @@ -1,9 +1,12 @@ +using Itmo.Dev.Platform.Common.Extensions; using Itmo.Dev.Platform.MessagePersistence.Buffering; using Itmo.Dev.Platform.MessagePersistence.Exceptions; using Itmo.Dev.Platform.MessagePersistence.Models; using Itmo.Dev.Platform.MessagePersistence.Persistence; using Itmo.Dev.Platform.MessagePersistence.Services; +using Itmo.Dev.Platform.MessagePersistence.Tools; using Microsoft.Extensions.DependencyInjection; +using System.Diagnostics; namespace Itmo.Dev.Platform.MessagePersistence.Execution; @@ -46,10 +49,27 @@ public async Task ExecuteAsync( var messagesArray = messages.ToArray(); + using var activity = PlatformMessagePersistenceActivitySource.Value + .StartActivity( + name: PlatformMessagePersistenceConstants.SpanName, + ActivityKind.Internal, + parentContext: default, + tags: new Dictionary + { + [PlatformMessagePersistenceConstants.BufferingStepTagName] = bufferingStepName, + }, + links: messagesArray.SelectMany(message => message.GetActivityLinks())) + .WithDisplayName($"[buffer] {messageName}"); + foreach (SerializedMessage message in messagesArray) { message.State = MessageState.Published; message.BufferingStep = bufferingStepName; + + if (activity is { Id : not null }) + { + message.Headers[PlatformMessagePersistenceConstants.TraceParentHeaderName] = activity.Id; + } } await stepPublisher.PublishAsync(messagesArray, cancellationToken); diff --git a/src/Itmo.Dev.Platform.MessagePersistence/Execution/MessagePersistenceExecutor.cs b/src/Itmo.Dev.Platform.MessagePersistence/Execution/MessagePersistenceExecutor.cs index d7aa94d..0303cfb 100644 --- a/src/Itmo.Dev.Platform.MessagePersistence/Execution/MessagePersistenceExecutor.cs +++ b/src/Itmo.Dev.Platform.MessagePersistence/Execution/MessagePersistenceExecutor.cs @@ -1,8 +1,11 @@ +using Itmo.Dev.Platform.Common.Extensions; +using Itmo.Dev.Platform.MessagePersistence.Exceptions; using Itmo.Dev.Platform.MessagePersistence.Execution.FailureProcessors; using Itmo.Dev.Platform.MessagePersistence.Models; using Itmo.Dev.Platform.MessagePersistence.Options; using Itmo.Dev.Platform.MessagePersistence.Persistence; using Itmo.Dev.Platform.MessagePersistence.Services; +using Itmo.Dev.Platform.MessagePersistence.Tools; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -13,6 +16,8 @@ namespace Itmo.Dev.Platform.MessagePersistence.Execution; +#pragma warning disable CA1506 + internal class MessagePersistenceExecutor : IMessagePersistenceExecutor { private static readonly ConcurrentDictionary<(Type KeyType, Type ValueType), Type> ExecutorTypeCache = []; @@ -84,6 +89,14 @@ public async Task ExecuteAsync( IReadOnlyDictionary serializedMessages, CancellationToken cancellationToken) { + using var activity = PlatformMessagePersistenceActivitySource.Value + .StartActivity( + name: PlatformMessagePersistenceConstants.SpanName, + ActivityKind.Internal, + parentContext: default, + links: serializedMessages.Values.SelectMany(message => message.GetActivityLinks())) + .WithDisplayName($"[handle] {messageName}"); + var handlerOptions = _handlerOptions.Get(messageName); var messages = MapMessages( @@ -148,6 +161,14 @@ public async Task ExecuteAsync( } } + foreach (SerializedMessage serializedMessage in serializedMessages.Values) + { + if (activity is { Id: not null }) + { + serializedMessage.Headers[PlatformMessagePersistenceConstants.TraceParentHeaderName] = activity.Id; + } + } + // Caveat for future implementations of buffering that rely on DB: // If FailureProcessor is configured to throw exceptions and DB buffering implementation opens transaction // before calling the publisher, this call would not be commited. diff --git a/src/Itmo.Dev.Platform.MessagePersistence/Models/SerializedMessage.cs b/src/Itmo.Dev.Platform.MessagePersistence/Models/SerializedMessage.cs index 039c840..26e91a9 100644 --- a/src/Itmo.Dev.Platform.MessagePersistence/Models/SerializedMessage.cs +++ b/src/Itmo.Dev.Platform.MessagePersistence/Models/SerializedMessage.cs @@ -10,6 +10,7 @@ internal class SerializedMessage public MessageState State { get; set; } public int RetryCount { get; set; } public string? BufferingStep { get; set; } + public IDictionary Headers { get; } public SerializedMessage( long id, @@ -19,7 +20,8 @@ public SerializedMessage( string key, string value, int retryCount, - string? bufferingStep) + string? bufferingStep, + IDictionary headers) { Id = id; Name = name; @@ -29,5 +31,6 @@ public SerializedMessage( Value = value; RetryCount = retryCount; BufferingStep = bufferingStep; + Headers = headers; } } diff --git a/src/Itmo.Dev.Platform.MessagePersistence/Services/MessagePersistenceConsumer.cs b/src/Itmo.Dev.Platform.MessagePersistence/Services/MessagePersistenceConsumer.cs index 91057e9..4a3e886 100644 --- a/src/Itmo.Dev.Platform.MessagePersistence/Services/MessagePersistenceConsumer.cs +++ b/src/Itmo.Dev.Platform.MessagePersistence/Services/MessagePersistenceConsumer.cs @@ -1,9 +1,12 @@ using Itmo.Dev.Platform.Common.DateTime; +using Itmo.Dev.Platform.Common.Extensions; using Itmo.Dev.Platform.MessagePersistence.Models; using Itmo.Dev.Platform.MessagePersistence.Persistence; +using Itmo.Dev.Platform.MessagePersistence.Tools; using Itmo.Dev.Platform.Persistence.Abstractions.Transactions; using Newtonsoft.Json; using System.Data; +using System.Diagnostics; using System.Runtime.CompilerServices; namespace Itmo.Dev.Platform.MessagePersistence.Services; @@ -45,6 +48,13 @@ public async IAsyncEnumerable ConsumeInternalAsync( var createdAt = _dateTimeProvider.Current; + using var activity = PlatformMessagePersistenceActivitySource.Value + .StartActivity( + name: PlatformMessagePersistenceConstants.SpanName, + ActivityKind.Internal, + parentContext: default) + .WithDisplayName($"[persist] {messageName}"); + var serializedMessages = messages .Select(message => new SerializedMessage( id: default, @@ -54,7 +64,8 @@ public async IAsyncEnumerable ConsumeInternalAsync( key: JsonConvert.SerializeObject(message.Key, _serializerSettings), value: JsonConvert.SerializeObject(message.Value, _serializerSettings), retryCount: 0, - bufferingStep: null)) + bufferingStep: null, + headers: new Dictionary(EnumerateHeaders()))) .ToArray(); await using var transaction = await _transactionProvider @@ -69,4 +80,14 @@ public async IAsyncEnumerable ConsumeInternalAsync( await transaction.CommitAsync(cancellationToken); } + + private static IEnumerable> EnumerateHeaders() + { + if (Activity.Current is { Id: { } traceId }) + { + yield return new KeyValuePair( + PlatformMessagePersistenceConstants.TraceParentHeaderName, + traceId); + } + } } diff --git a/src/Itmo.Dev.Platform.MessagePersistence/Tools/PlatformMessagePersistenceActivitySource.cs b/src/Itmo.Dev.Platform.MessagePersistence/Tools/PlatformMessagePersistenceActivitySource.cs new file mode 100644 index 0000000..d5046b1 --- /dev/null +++ b/src/Itmo.Dev.Platform.MessagePersistence/Tools/PlatformMessagePersistenceActivitySource.cs @@ -0,0 +1,13 @@ +using System.Diagnostics; + +namespace Itmo.Dev.Platform.MessagePersistence.Tools; + +public static class PlatformMessagePersistenceActivitySource +{ + /// + /// Required to be const, as Observability package depends on compile-time const string inlining + /// + public const string Name = "Itmo.Dev.Platform.MessagePersistence"; + + public static readonly ActivitySource Value = new(Name); +} diff --git a/src/Itmo.Dev.Platform.MessagePersistence/Tools/PlatformMessagePersistenceConstants.cs b/src/Itmo.Dev.Platform.MessagePersistence/Tools/PlatformMessagePersistenceConstants.cs new file mode 100644 index 0000000..c8afdb8 --- /dev/null +++ b/src/Itmo.Dev.Platform.MessagePersistence/Tools/PlatformMessagePersistenceConstants.cs @@ -0,0 +1,12 @@ +namespace Itmo.Dev.Platform.MessagePersistence.Tools; + +public static class PlatformMessagePersistenceConstants +{ + public const string SpanName = "Message Persistence"; + + public const string TraceParentHeaderName = "traceparent"; + + public const string BufferingStepTagName = "step"; + + public const string MessageIdTagName = "message_id"; +} diff --git a/src/Itmo.Dev.Platform.Observability/Extensions/ActivityExtensions.cs b/src/Itmo.Dev.Platform.Observability/Extensions/ActivityExtensions.cs new file mode 100644 index 0000000..2c207cd --- /dev/null +++ b/src/Itmo.Dev.Platform.Observability/Extensions/ActivityExtensions.cs @@ -0,0 +1,18 @@ +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; + +namespace Itmo.Dev.Platform.Observability.Extensions; + +internal static class ActivityExtensions +{ + public static void Suppress(this Activity activity) + { + activity.ActivityTraceFlags &= ~ActivityTraceFlags.Recorded; + } + + public static bool TryGetTag(this Activity activity, string key, [NotNullWhen(true)] out string? value) + { + value = activity.Tags.FirstOrDefault(kvp => kvp.Key == key).Value; + return value is not null; + } +} diff --git a/src/Itmo.Dev.Platform.Observability/Itmo.Dev.Platform.Observability.csproj b/src/Itmo.Dev.Platform.Observability/Itmo.Dev.Platform.Observability.csproj index 667cc64..e90a9f4 100644 --- a/src/Itmo.Dev.Platform.Observability/Itmo.Dev.Platform.Observability.csproj +++ b/src/Itmo.Dev.Platform.Observability/Itmo.Dev.Platform.Observability.csproj @@ -21,6 +21,7 @@ + diff --git a/src/Itmo.Dev.Platform.Observability/Tracing/Processors/DbStatementActivityFilter.cs b/src/Itmo.Dev.Platform.Observability/Tracing/Processors/DbStatementActivityFilter.cs new file mode 100644 index 0000000..e80c75e --- /dev/null +++ b/src/Itmo.Dev.Platform.Observability/Tracing/Processors/DbStatementActivityFilter.cs @@ -0,0 +1,33 @@ +using Itmo.Dev.Platform.Observability.Extensions; +using OpenTelemetry; +using System.Buffers; +using System.Diagnostics; + +namespace Itmo.Dev.Platform.Observability.Tracing.Processors; + +internal sealed class DbStatementActivityFilter : BaseProcessor +{ + private const string DbStatementTagName = "db.statement"; + + private static readonly SearchValues IgnoredStatementParts = SearchValues.Create( + [ + "information_schema", + "\"public\".\"VersionInfo\"", + "\"hangfire\".", + ], + StringComparison.OrdinalIgnoreCase); + + public override void OnEnd(Activity data) + { + if (data is not { OperationName: "postgres", Recorded: true }) + return; + + if (data.TryGetTag(DbStatementTagName, out var statement) is false) + return; + + if (statement.AsSpan().ContainsAny(IgnoredStatementParts)) + { + data.Suppress(); + } + } +} diff --git a/src/Itmo.Dev.Platform.Observability/Tracing/Processors/MetricsActivityFilter.cs b/src/Itmo.Dev.Platform.Observability/Tracing/Processors/MetricsActivityFilter.cs new file mode 100644 index 0000000..90dc495 --- /dev/null +++ b/src/Itmo.Dev.Platform.Observability/Tracing/Processors/MetricsActivityFilter.cs @@ -0,0 +1,22 @@ +using Itmo.Dev.Platform.Observability.Extensions; +using OpenTelemetry; +using System.Diagnostics; + +namespace Itmo.Dev.Platform.Observability.Tracing.Processors; + +internal sealed class MetricsActivityFilter : BaseProcessor +{ + public override void OnEnd(Activity data) + { + if (data.Recorded is false) + return; + + if (data.TryGetTag("url.scheme", out var scheme) is false || scheme is not "http") + return; + + if (data.TryGetTag("url.path", out var path) is false || path is not "/metrics") + return; + + data.Suppress(); + } +} diff --git a/src/Itmo.Dev.Platform.Observability/Tracing/TracingConfigurationPlugin.cs b/src/Itmo.Dev.Platform.Observability/Tracing/TracingConfigurationPlugin.cs index 3982c92..fb910e6 100644 --- a/src/Itmo.Dev.Platform.Observability/Tracing/TracingConfigurationPlugin.cs +++ b/src/Itmo.Dev.Platform.Observability/Tracing/TracingConfigurationPlugin.cs @@ -1,5 +1,7 @@ using Itmo.Dev.Platform.Common.Options; using Itmo.Dev.Platform.Kafka.Tools; +using Itmo.Dev.Platform.MessagePersistence.Tools; +using Itmo.Dev.Platform.Observability.Tracing.Processors; using Microsoft.Extensions.Options; using Npgsql; using OpenTelemetry.Resources; @@ -41,17 +43,21 @@ public void Configure(WebApplicationBuilder builder) tracing .ConfigureResource(x => x.AddService(_platformOptions.ServiceName)) .AddSource(PlatformKafkaActivitySource.Name) + .AddSource(PlatformMessagePersistenceActivitySource.Name) .SetSampler(new AlwaysOnSampler()) .AddAspNetCoreInstrumentation(x => x.RecordException = true) .AddGrpcCoreInstrumentation() .AddGrpcClientInstrumentation() - .AddNpgsql(); + .AddNpgsql() + .AddHttpClientInstrumentation(options => + { + options.FilterHttpRequestMessage = message => message.Version.Major >= 1; + options.RecordException = true; + }); - tracing.AddHttpClientInstrumentation(options => - { - options.FilterHttpRequestMessage = message => message.Version.Major >= 1; - options.RecordException = true; - }); + tracing + .AddProcessor() + .AddProcessor(); foreach (string source in _options.Sources ?? []) { @@ -66,4 +72,4 @@ public void Configure(WebApplicationBuilder builder) _logger.LogInformation("OpenTelemetry tracing initialized"); } -} \ No newline at end of file +} diff --git a/src/Itmo.Dev.Platform.Persistence.Postgres/Extensions/DbDataReaderExtensions.cs b/src/Itmo.Dev.Platform.Persistence.Postgres/Extensions/DbDataReaderExtensions.cs index 4211479..c700069 100644 --- a/src/Itmo.Dev.Platform.Persistence.Postgres/Extensions/DbDataReaderExtensions.cs +++ b/src/Itmo.Dev.Platform.Persistence.Postgres/Extensions/DbDataReaderExtensions.cs @@ -1,7 +1,6 @@ using Newtonsoft.Json; using System.Data; using System.Data.Common; -using System.Diagnostics.CodeAnalysis; namespace Itmo.Dev.Platform.Persistence.Postgres.Extensions; @@ -37,7 +36,7 @@ public static T GetJsonFieldValue( public static T GetJsonFieldValue( this DbDataReader reader, string name, - JsonSerializerSettings serializerSettings) + JsonSerializerSettings? serializerSettings = null) where T : notnull { var serialized = reader.GetString(name); diff --git a/tests/Itmo.Dev.Platform.Kafka.Tests.Startup/Itmo.Dev.Platform.Kafka.Tests.Startup.csproj b/tests/Itmo.Dev.Platform.Kafka.Tests.Startup/Itmo.Dev.Platform.Kafka.Tests.Startup.csproj index 18a5388..50f2b7c 100644 --- a/tests/Itmo.Dev.Platform.Kafka.Tests.Startup/Itmo.Dev.Platform.Kafka.Tests.Startup.csproj +++ b/tests/Itmo.Dev.Platform.Kafka.Tests.Startup/Itmo.Dev.Platform.Kafka.Tests.Startup.csproj @@ -3,10 +3,12 @@ + + diff --git a/tests/Itmo.Dev.Platform.Kafka.Tests.Startup/Program.cs b/tests/Itmo.Dev.Platform.Kafka.Tests.Startup/Program.cs index 54dac73..ae31526 100644 --- a/tests/Itmo.Dev.Platform.Kafka.Tests.Startup/Program.cs +++ b/tests/Itmo.Dev.Platform.Kafka.Tests.Startup/Program.cs @@ -1,10 +1,21 @@ using Itmo.Dev.Platform.Common.Extensions; +using Itmo.Dev.Platform.Observability; using Microsoft.Extensions.Options; using Newtonsoft.Json; +using OpenTelemetry.Trace; using Serilog; var builder = WebApplication.CreateBuilder(args); +builder.Configuration.AddInMemoryCollection(new Dictionary +{ + ["Platform:ServiceName"] = "Test", + ["Platform:Observability:Tracing:IsEnabled"] = "true", +}); + +builder.AddPlatformObservability(); +builder.Services.AddOpenTelemetry().WithTracing(tracing => tracing.AddConsoleExporter()); + builder.Services.AddUtcDateTimeProvider(); builder.Services.AddSingleton(new JsonSerializerSettings()); builder.Services.AddLogging(x => x.AddSerilog()); @@ -16,6 +27,8 @@ var app = builder.Build(); +app.UsePlatformObservability(); + app.Run(); -public partial class Program; \ No newline at end of file +public partial class Program; diff --git a/tests/Itmo.Dev.Platform.Kafka.Tests/KafkaProducerTests.cs b/tests/Itmo.Dev.Platform.Kafka.Tests/KafkaProducerTests.cs index 365ee4f..301a5b3 100644 --- a/tests/Itmo.Dev.Platform.Kafka.Tests/KafkaProducerTests.cs +++ b/tests/Itmo.Dev.Platform.Kafka.Tests/KafkaProducerTests.cs @@ -1,12 +1,17 @@ using Confluent.Kafka; using FluentAssertions; +using Itmo.Dev.Platform.Kafka.Consumer; using Itmo.Dev.Platform.Kafka.Extensions; using Itmo.Dev.Platform.Kafka.Producer; using Itmo.Dev.Platform.Kafka.Tests.Extensions; using Itmo.Dev.Platform.Kafka.Tests.Fixtures; +using Itmo.Dev.Platform.Kafka.Tests.Tools; using Itmo.Dev.Platform.Kafka.Tools; +using Itmo.Dev.Platform.Testing.ApplicationFactories; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; using Serilog; using Xunit; using Xunit.Abstractions; @@ -74,7 +79,7 @@ public async Task ProduceAsync_ShouldWriteMessage(KafkaProducerMessage opt .Including(x => x.Key) .Including(x => x.Value))); - + // Dispose consumer.Close(); } @@ -137,4 +142,4 @@ public Task DisposeAsync() { return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/tests/Itmo.Dev.Platform.Kafka.Tests/MessagePersistenceKafkaBufferingTests.cs b/tests/Itmo.Dev.Platform.Kafka.Tests/MessagePersistenceKafkaBufferingTests.cs index 6d12a49..fb80ffb 100644 --- a/tests/Itmo.Dev.Platform.Kafka.Tests/MessagePersistenceKafkaBufferingTests.cs +++ b/tests/Itmo.Dev.Platform.Kafka.Tests/MessagePersistenceKafkaBufferingTests.cs @@ -161,7 +161,9 @@ public async Task Consume_ShouldHandleMessageThroughBuffer_WhenKafkaBufferingCon kafkaConsumeResult.Message.Value.Message .Should() - .BeEquivalentTo(serializedMessage, options => options.Excluding(message => message.State)); + .BeEquivalentTo(serializedMessage, options => options + .Excluding(message => message.State) + .Excluding(message => message.Headers)); } [Fact] @@ -284,13 +286,16 @@ await testContext.Message .BeEquivalentTo(serializedMessage, options => options .Excluding(message => message.State) - .Excluding(message => message.RetryCount)); + .Excluding(message => message.RetryCount) + .Excluding(message => message.Headers)); firstMessageConsumeResult.Message.Value.Message.RetryCount.Should().Be(0); secondMessageConsumeResult.Message.Value.Message .Should() - .BeEquivalentTo(serializedMessage, options => options.Excluding(message => message.State)); + .BeEquivalentTo(serializedMessage, options => options + .Excluding(message => message.State) + .Excluding(message => message.Headers)); } [Fact] @@ -422,7 +427,8 @@ await testContext.Message .BeEquivalentTo(serializedMessage, options => options .Excluding(message => message.State) - .Excluding(message => message.RetryCount)); + .Excluding(message => message.RetryCount) + .Excluding(message => message.Headers)); firstMessageConsumeResult.Message.Value.Message.RetryCount.Should().Be(0); } diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Controllers/HelloController.cs b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Controllers/HelloController.cs deleted file mode 100644 index d94365d..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Controllers/HelloController.cs +++ /dev/null @@ -1,16 +0,0 @@ -using Grpc.Core; - -namespace Itmo.Dev.Platform.Observability.Tests.Startup.Controllers; - -public class HelloController : HelloService.HelloServiceBase -{ - public override Task Hello(HelloRequest request, ServerCallContext context) - { - var response = new HelloResponse - { - Message = $"Hello, {request.Name}!", - }; - - return Task.FromResult(response); - } -} diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Itmo.Dev.Platform.Observability.Tests.Startup.csproj b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Itmo.Dev.Platform.Observability.Tests.Startup.csproj deleted file mode 100644 index 986ec66..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Itmo.Dev.Platform.Observability.Tests.Startup.csproj +++ /dev/null @@ -1,20 +0,0 @@ - - - - - - - - - - - - - - - - - - - - diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Models/SpanInfo.cs b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Models/SpanInfo.cs deleted file mode 100644 index f906864..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Models/SpanInfo.cs +++ /dev/null @@ -1,3 +0,0 @@ -namespace Itmo.Dev.Platform.Observability.Tests.Startup.Models; - -public sealed record SpanInfo(string SpanId, object Context); diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Program.cs b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Program.cs deleted file mode 100644 index 5269b7c..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Program.cs +++ /dev/null @@ -1,91 +0,0 @@ -#pragma warning disable CA1506 -using Itmo.Dev.Platform.Common.Extensions; -using Itmo.Dev.Platform.Grpc.Clients; -using Itmo.Dev.Platform.Grpc.Services; -using Itmo.Dev.Platform.Observability; -using Itmo.Dev.Platform.Observability.Tests; -using Itmo.Dev.Platform.Observability.Tests.Startup; -using Itmo.Dev.Platform.Observability.Tests.Startup.Controllers; -using Itmo.Dev.Platform.Observability.Tests.Startup.Tools; -using Microsoft.AspNetCore.Mvc; -using Microsoft.AspNetCore.Server.Kestrel.Core; -using Newtonsoft.Json; -using OpenTelemetry.Trace; - -var builder = WebApplication.CreateBuilder(args); - -bool isGrpcServer = - bool.Parse(Environment.GetEnvironmentVariable(TestingConstants.IsGrpcServiceEnvVariableName) ?? "false"); - -builder.Configuration.AddInMemoryCollection(new Dictionary -{ - ["Platform:ServiceName"] = "test", - ["Platform:Observability:Tracing:IsEnabled"] = "true", -}); - -builder.Services.AddUtcDateTimeProvider(); -builder.Services.AddSingleton(new JsonSerializerSettings()); -builder.Services.AddSingleton(); - -builder.Services.AddPlatform(); - -builder.Services.AddOpenTelemetry().WithTracing(x => x.AddConsoleExporter()); - -if (isGrpcServer) -{ - builder.Services.AddPlatformGrpcServices(grpc => grpc - .AddInterceptor()); - - builder.WebHost.UseUrls(TestingConstants.GrpcServiceUrl); - builder.WebHost.UseKestrel(x => x.ConfigureEndpointDefaults(e => e.Protocols = HttpProtocols.Http2)); -} -else -{ - builder.AddPlatformObservability(observability => observability - .AddTracingPlugin()); - - builder.Services.AddPlatformGrpcClients(grpc => grpc - .AddService(service => service - .Called("test") - .WithConfiguration("TestServer") - .WithClient() - .WithInterceptor())); - - builder.Services.AddOpenTelemetry() - .WithTracing(tracing => tracing - .AddProcessor(p => p.GetRequiredService())); - - builder.Configuration.AddInMemoryCollection(new Dictionary - { - ["TestServer:Address"] = TestingConstants.GrpcServiceUrl, - }); - - builder.WebHost.UseUrls(TestingConstants.HttpServiceUrl); -} - -WebApplication app = builder.Build(); - -app.UseRouting(); - -if (isGrpcServer) -{ - app.MapGrpcReflectionService(); - app.MapGrpcService(); -} -else -{ - app.MapGet("hello", - async ([FromQuery] string name, [FromServices] HelloService.HelloServiceClient client) => - { - var request = new HelloRequest { Name = name }; - var response = await client.HelloAsync(request); - - return Results.Ok(response.Message); - }); -} - -app.UsePlatformObservability(); - -app.Run(); - -public partial class Program; diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/TestingConstants.cs b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/TestingConstants.cs deleted file mode 100644 index fa24157..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/TestingConstants.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace Itmo.Dev.Platform.Observability.Tests.Startup; - -public static class TestingConstants -{ - public const string IsGrpcServiceEnvVariableName = "ITMO_PLATFORM_IS_GRPC_SERVICE"; - public const string SpanIdFileVariableName = "ITMO_PLATFORM_SPAN_FILE"; - - public const string GrpcServiceUrl = "http://127.0.0.1:8080"; - public const string HttpServiceUrl = "http://127.0.0.1:8081"; -} diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/DebugClientInterceptor.cs b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/DebugClientInterceptor.cs deleted file mode 100644 index 634a47c..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/DebugClientInterceptor.cs +++ /dev/null @@ -1,32 +0,0 @@ -using Grpc.Core; -using Grpc.Core.Interceptors; - -namespace Itmo.Dev.Platform.Observability.Tests.Startup.Tools; - -public sealed class DebugClientInterceptor : Interceptor -{ - private readonly ILogger _logger; - - public DebugClientInterceptor(ILogger logger) - { - _logger = logger; - } - - public override AsyncUnaryCall AsyncUnaryCall( - TRequest request, - ClientInterceptorContext context, - AsyncUnaryCallContinuation continuation) - { - AsyncUnaryCall call = continuation(request, context); - call.ResponseHeadersAsync.ContinueWith(async task => - { - Metadata metadata = await task; - - _logger.LogInformation( - "Response metadata: {Value}", - string.Join(", ", metadata.Select(x => $"{x.Key}: {x.Value}"))); - }); - - return call; - } -} diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/ParentSpanInterceptor.cs b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/ParentSpanInterceptor.cs deleted file mode 100644 index f036096..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/ParentSpanInterceptor.cs +++ /dev/null @@ -1,26 +0,0 @@ -using Grpc.Core; -using Grpc.Core.Interceptors; -using System.Diagnostics; - -namespace Itmo.Dev.Platform.Observability.Tests.Startup.Tools; - -public class ParentSpanInterceptor : Interceptor -{ - public static string? Span { get; private set; } - - public override async Task UnaryServerHandler( - TRequest request, - ServerCallContext context, - UnaryServerMethod continuation) - { - var header = context.RequestHeaders.Single(x => x.Key == "traceparent"); - Span = header.Value.Split('-')[2]; - Console.WriteLine($"Span: {Span}"); - - await File.WriteAllTextAsync( - Environment.GetEnvironmentVariable(TestingConstants.SpanIdFileVariableName) ?? string.Empty, - Span); - - return await continuation(request, context); - } -} diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/TestTracingPlugin.cs b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/TestTracingPlugin.cs deleted file mode 100644 index 6637fcf..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/TestTracingPlugin.cs +++ /dev/null @@ -1,12 +0,0 @@ -using Itmo.Dev.Platform.Observability.Tracing; -using OpenTelemetry.Trace; - -namespace Itmo.Dev.Platform.Observability.Tests.Startup.Tools; - -public class TestTracingPlugin : ITracingConfigurationPlugin -{ - public void Configure(WebApplicationBuilder applicationBuilder, TracerProviderBuilder tracerBuilder) - { - tracerBuilder.AddProcessor(p => p.GetRequiredService()); - } -} diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/TestingActivityProcessor.cs b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/TestingActivityProcessor.cs deleted file mode 100644 index aaf02ce..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/Tools/TestingActivityProcessor.cs +++ /dev/null @@ -1,26 +0,0 @@ -using Itmo.Dev.Platform.Observability.Tests.Startup.Models; -using Newtonsoft.Json; -using OpenTelemetry; -using System.Diagnostics; - -namespace Itmo.Dev.Platform.Observability.Tests.Startup.Tools; - -public class TestingActivityProcessor : BaseProcessor -{ - private readonly List _activities = []; - - public IReadOnlyCollection Activities => _activities; - - public override void OnStart(Activity data) - { - var spanInfo = new SpanInfo( - data.SpanId.ToString(), - data); - - File.AppendAllText( - Environment.GetEnvironmentVariable(TestingConstants.SpanIdFileVariableName) ?? string.Empty, - JsonConvert.SerializeObject(spanInfo) + Environment.NewLine); - - _activities.Add(data); - } -} diff --git a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/protos/hello.proto b/tests/Itmo.Dev.Platform.Observability.Tests.Startup/protos/hello.proto deleted file mode 100644 index 2ab19e4..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests.Startup/protos/hello.proto +++ /dev/null @@ -1,16 +0,0 @@ -syntax = "proto3"; - -package itmo.dev.platform.observability.tests; -option csharp_namespace = "Itmo.Dev.Platform.Observability.Tests"; - -service HelloService { - rpc Hello(HelloRequest) returns (HelloResponse); -} - -message HelloRequest { - string name = 1; -} - -message HelloResponse { - string message = 1; -} \ No newline at end of file diff --git a/tests/Itmo.Dev.Platform.Observability.Tests/GrpcHeaderPropagationTests.cs b/tests/Itmo.Dev.Platform.Observability.Tests/GrpcHeaderPropagationTests.cs deleted file mode 100644 index d96b59f..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests/GrpcHeaderPropagationTests.cs +++ /dev/null @@ -1,160 +0,0 @@ -using FluentAssertions; -using Itmo.Dev.Platform.Grpc.Clients; -using Itmo.Dev.Platform.Grpc.Services; -using Itmo.Dev.Platform.Observability.Tests.Startup; -using Itmo.Dev.Platform.Observability.Tests.Startup.Controllers; -using Itmo.Dev.Platform.Observability.Tests.Startup.Models; -using Itmo.Dev.Platform.Observability.Tests.Startup.Tools; -using Itmo.Dev.Platform.Testing.ApplicationFactories; -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.Http; -using Microsoft.AspNetCore.Mvc; -using Microsoft.AspNetCore.Server.Kestrel.Core; -using Microsoft.Extensions.DependencyInjection; -using Newtonsoft.Json; -using OpenTelemetry.Trace; -using Serilog; -using System.Diagnostics; -using System.Text; -using Xunit; -using Xunit.Abstractions; - -namespace Itmo.Dev.Platform.Observability.Tests; - -#pragma warning disable CA1506 - -public class GrpcHeaderPropagationTests -{ - private readonly ITestOutputHelper _output; - - public GrpcHeaderPropagationTests(ITestOutputHelper output) - { - _output = output; - - Log.Logger = new LoggerConfiguration() - .WriteTo.TestOutput(output) - .CreateLogger(); - } - - [Fact] - public async Task HelloAsync_ShouldHaveSameTraceParent_WhenCalledFromHttpService() - { - // Arrange - var assemblyLocation = typeof(TestingConstants).Assembly.Location; - - var grpcServiceSpanFileName = Path.GetTempFileName(); - var httpServiceSpanFileName = Path.GetTempFileName(); - - using var grpcServiceProcess = new Process(); - using var httpServiceProcess = new Process(); - - grpcServiceProcess.StartInfo = new ProcessStartInfo("dotnet", [assemblyLocation]) - { - RedirectStandardOutput = true, - RedirectStandardError = true, - EnvironmentVariables = - { - [TestingConstants.IsGrpcServiceEnvVariableName] = "true", - [TestingConstants.SpanIdFileVariableName] = grpcServiceSpanFileName, - }, - }; - - httpServiceProcess.StartInfo = new ProcessStartInfo("dotnet", [assemblyLocation]) - { - RedirectStandardOutput = true, - RedirectStandardError = true, - EnvironmentVariables = - { - [TestingConstants.IsGrpcServiceEnvVariableName] = "false", - [TestingConstants.SpanIdFileVariableName] = httpServiceSpanFileName, - }, - }; - - httpServiceProcess.Start(); - grpcServiceProcess.Start(); - - using var grpcServiceLogs = new ProcessLogCollector("GRPC", grpcServiceProcess, _output); - using var httpServiceLogs = new ProcessLogCollector("HTTP", httpServiceProcess, _output); - - await Task.Delay(TimeSpan.FromMilliseconds(500)); - - using var client = new HttpClient(); - - // Act - await client.GetAsync($"{TestingConstants.HttpServiceUrl}/hello?name=ronimizy"); - grpcServiceProcess.Kill(); - httpServiceProcess.Kill(); - - // Assert - var grpcServiceSpans = await File.ReadAllLinesAsync(grpcServiceSpanFileName); - var httpServiceSpansText = await File.ReadAllLinesAsync(httpServiceSpanFileName); - - var httpServiceSpanInfos = httpServiceSpansText - .Select(JsonConvert.DeserializeObject) - .ToArray(); - - var httpServiceSpans = httpServiceSpanInfos.Select(x => x?.SpanId).ToArray(); - - File.Delete(grpcServiceSpanFileName); - File.Delete(httpServiceSpanFileName); - - Log.Information("Grpc span ids: {SpanIds}", string.Join(", ", grpcServiceSpans)); - Log.Information("Http span ids: {SpanIds}", string.Join(", ", httpServiceSpansText)); - - grpcServiceSpans.Should().AllSatisfy(grpcSpan => httpServiceSpans.Should().Contain(grpcSpan)); - } - - private class ProcessLogCollector : IDisposable - { - private readonly StringBuilder _builder; - private readonly ITestOutputHelper _output; - private readonly string _serviceName; - - private readonly Lock _lock = new(); - - public ProcessLogCollector(string serviceName, Process process, ITestOutputHelper output) - { - _serviceName = serviceName; - _output = output; - _builder = new StringBuilder(); - - _ = WriteStream(process.StandardOutput); - _ = WriteStream(process.StandardError); - } - - private async Task WriteStream(StreamReader reader) - { - try - { - while (true) - { - string? line = await reader.ReadLineAsync(); - - if (line is null) - break; - - lock (_lock) - { - _builder.AppendLine(line); - } - } - } - catch - { - // Ignore error - } - } - - public void Dispose() - { - const int paddingSize = 10; - var startPadding = string.Join("", Enumerable.Repeat('=', paddingSize)); - var endPadding = string.Join("", Enumerable.Repeat('=', (paddingSize * 2) + 4 + _serviceName.Length)); - - _output.WriteLine($"{startPadding} {_serviceName} {startPadding}"); - _output.WriteLine(_builder.ToString()); - _output.WriteLine(endPadding); - } - } -} diff --git a/tests/Itmo.Dev.Platform.Observability.Tests/Itmo.Dev.Platform.Observability.Tests.csproj b/tests/Itmo.Dev.Platform.Observability.Tests/Itmo.Dev.Platform.Observability.Tests.csproj deleted file mode 100644 index 6dbe4ac..0000000 --- a/tests/Itmo.Dev.Platform.Observability.Tests/Itmo.Dev.Platform.Observability.Tests.csproj +++ /dev/null @@ -1,28 +0,0 @@ - - - - net9.0 - - - - false - true - - - - $(NoWarn);CA1707;CA1002; - - - - - - - - - - - - - - -