Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions Itmo.Dev.Platform.sln
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Locking", "Locking", "{49FF
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Itmo.Dev.Platform.Locking.Redis", "src\Itmo.Dev.Platform.Locking.Redis\Itmo.Dev.Platform.Locking.Redis.csproj", "{4C5DF002-21D7-4C9D-BAFD-748F672A0C62}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Itmo.Dev.Platform.Observability.Tests", "tests\Itmo.Dev.Platform.Observability.Tests\Itmo.Dev.Platform.Observability.Tests.csproj", "{A57ECCE4-255C-4357-90D8-5F8A43275AD9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Itmo.Dev.Platform.Observability.Tests.Startup", "tests\Itmo.Dev.Platform.Observability.Tests.Startup\Itmo.Dev.Platform.Observability.Tests.Startup.csproj", "{2CD6ACA3-3385-4CB0-86FE-BA9270735311}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -244,14 +240,6 @@ Global
{4C5DF002-21D7-4C9D-BAFD-748F672A0C62}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4C5DF002-21D7-4C9D-BAFD-748F672A0C62}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4C5DF002-21D7-4C9D-BAFD-748F672A0C62}.Release|Any CPU.Build.0 = Release|Any CPU
{A57ECCE4-255C-4357-90D8-5F8A43275AD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A57ECCE4-255C-4357-90D8-5F8A43275AD9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A57ECCE4-255C-4357-90D8-5F8A43275AD9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A57ECCE4-255C-4357-90D8-5F8A43275AD9}.Release|Any CPU.Build.0 = Release|Any CPU
{2CD6ACA3-3385-4CB0-86FE-BA9270735311}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2CD6ACA3-3385-4CB0-86FE-BA9270735311}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2CD6ACA3-3385-4CB0-86FE-BA9270735311}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2CD6ACA3-3385-4CB0-86FE-BA9270735311}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{9C9FC028-1F10-42B3-8938-A3EAE0D07C1F} = {65E3EDFE-875D-42A1-A224-E06E746B8A5E}
Expand Down Expand Up @@ -292,7 +280,5 @@ Global
{49FF216F-A911-4470-82A3-B4285FDA3D23} = {8B6CE8C2-3319-44B9-A8B7-19F5F94B911D}
{D936C0C2-FCA9-4C53-83B4-25CC52A748DF} = {49FF216F-A911-4470-82A3-B4285FDA3D23}
{4C5DF002-21D7-4C9D-BAFD-748F672A0C62} = {49FF216F-A911-4470-82A3-B4285FDA3D23}
{A57ECCE4-255C-4357-90D8-5F8A43275AD9} = {B24A2836-A847-40AB-85A2-6590ABA8ACB2}
{2CD6ACA3-3385-4CB0-86FE-BA9270735311} = {B24A2836-A847-40AB-85A2-6590ABA8ACB2}
EndGlobalSection
EndGlobal
16 changes: 16 additions & 0 deletions src/Itmo.Dev.Platform.Common/Extensions/ActivityExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Diagnostics;

namespace Itmo.Dev.Platform.Common.Extensions;

public static class ActivityExtensions
{
public static Activity? WithDisplayName(this Activity? activity, string displayName)
{
if (activity is not null)
{
activity.DisplayName = displayName;
}

return activity;
}
}
10 changes: 10 additions & 0 deletions src/Itmo.Dev.Platform.Kafka/Consumer/Inbox/InboxConsumerHandler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using Itmo.Dev.Platform.Common.Extensions;
using Itmo.Dev.Platform.Common.Models;
using Itmo.Dev.Platform.Kafka.Consumer.Models;
using Itmo.Dev.Platform.MessagePersistence;
using Itmo.Dev.Platform.MessagePersistence.Tools;
using System.Diagnostics;

namespace Itmo.Dev.Platform.Kafka.Consumer.Inbox;

Expand All @@ -26,6 +29,13 @@ public async ValueTask HandleAsync(
throw new InvalidOperationException(
$"Failed to write inbox message, invalid message type, expected = {expectedType}. You probably attempting some unaccounted platform tampering");
}

using var activity = PlatformMessagePersistenceActivitySource.Value
.StartActivity(
name: PlatformMessagePersistenceConstants.SpanName,
ActivityKind.Internal,
parentContext: default)
.WithDisplayName($"[inbox] {_messageName}");

var persistedMessages = consumerMessages
.Select(message => new PersistedMessage<Unit, KafkaConsumerMessage<TKey, TValue>>(Unit.Value, message))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Confluent.Kafka;
using Newtonsoft.Json;
using System.Text;

namespace Itmo.Dev.Platform.Kafka.Consumer.Models;

Expand All @@ -19,6 +20,10 @@ public KafkaConsumerMessage(IConsumer<TKey, TValue> consumer, ConsumeResult<TKey
Topic = result.Topic;
Partition = result.Partition;
Offset = result.Offset;

Headers = result.Message.Headers.ToDictionary(
header => header.Key,
header => Encoding.UTF8.GetString(header.GetValueBytes()));
}

/// <summary>
Expand All @@ -31,13 +36,15 @@ private KafkaConsumerMessage(
ConsumeResult<TKey, TValue> result,
TKey key,
TValue value,
string topic)
string topic,
Dictionary<string, string> headers)
{
_consumer = consumer;
_result = result;
Key = key;
Value = value;
Topic = topic;
Headers = headers;
}

public TKey Key { get; }
Expand All @@ -52,6 +59,8 @@ private KafkaConsumerMessage(

public Offset Offset { get; }

public Dictionary<string, string> Headers { get; }

public void Commit()
{
_consumer.Commit(_result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ private async Task ExecuteSingleAsync(

var messageHandler = new ConsumerMessageHandler<TKey, TValue>(
consumerOptions,
_scopeFactory);
_scopeFactory,
logger: _serviceProvider.GetRequiredService<ILogger<ConsumerMessageHandler<TKey, TValue>>>());

await ParallelAction.ExecuteAsync(
cancellationToken,
new ParallelAction(1, c => messageReader.ReadAsync(channel.Writer, c)),
new ParallelAction(consumerOptions.ParallelismDegree, c => messageHandler.HandleAsync(channel.Reader, c)));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Itmo.Dev.Platform.Common.Extensions;
using Itmo.Dev.Platform.Kafka.Consumer.Models;
using Itmo.Dev.Platform.Kafka.Tools;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Diagnostics;
using System.Threading.Channels;

Expand All @@ -10,11 +12,16 @@ internal class ConsumerMessageHandler<TKey, TValue>
{
private readonly KafkaConsumerOptions _consumerOptions;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<ConsumerMessageHandler<TKey, TValue>> _logger;

public ConsumerMessageHandler(KafkaConsumerOptions consumerOptions, IServiceScopeFactory scopeFactory)
public ConsumerMessageHandler(
KafkaConsumerOptions consumerOptions,
IServiceScopeFactory scopeFactory,
ILogger<ConsumerMessageHandler<TKey, TValue>> logger)
{
_consumerOptions = consumerOptions;
_scopeFactory = scopeFactory;
_logger = logger;
}

public async Task HandleAsync(
Expand All @@ -38,9 +45,13 @@ public async Task HandleAsync(
.Where(x => x.Value is not null)
.ToArray();

using var activity = PlatformKafkaActivitySource.Value.StartActivity(
name: $"consume: {_consumerOptions.Topic}",
ActivityKind.Consumer);
using var activity = PlatformKafkaActivitySource.Value
.StartActivity(
name: "Kafka",
ActivityKind.Consumer,
parentContext: default,
links: [..EnumerateActivityLinks(chunk)])
.WithDisplayName($"[consume] {_consumerOptions.Topic}");

await handler.HandleAsync(consumerMessages, cancellationToken);

Expand All @@ -50,4 +61,35 @@ public async Task HandleAsync(
}
}
}

private IEnumerable<ActivityLink> EnumerateActivityLinks(
IEnumerable<KafkaConsumerMessage<TKey, TValue>> messages)
{
var traceParentHeaders = messages
.SelectMany(message => message.Headers, (message, header) => (message, header))
.Where(tuple => tuple.header.Key.Equals(
PlatformKafkaConstants.TraceParentHeaderName,
StringComparison.OrdinalIgnoreCase));

foreach (var (message, header) in traceParentHeaders)
{
if (ActivityContext.TryParse(header.Value, null, out var context))
{
var tags = new ActivityTagsCollection
{
[PlatformKafkaConstants.TopicTagName] = message.Topic,
[PlatformKafkaConstants.PartitionTagName] = message.Partition.Value,
[PlatformKafkaConstants.OffsetTagName] = message.Offset.Value,
};

yield return new ActivityLink(context, tags);
}
else
{
_logger.LogWarning(
"Failed to parse activity context = '{TraceParent}'",
header.Value);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using Itmo.Dev.Platform.Common.Extensions;
using Itmo.Dev.Platform.MessagePersistence;
using Itmo.Dev.Platform.MessagePersistence.Tools;
using System.Diagnostics;

namespace Itmo.Dev.Platform.Kafka.Producer.Outbox;

Expand All @@ -17,6 +20,13 @@ public async Task ProduceAsync(
IAsyncEnumerable<KafkaProducerMessage<TKey, TValue>> messages,
CancellationToken cancellationToken)
{
using var activity = PlatformMessagePersistenceActivitySource.Value
.StartActivity(
name: PlatformMessagePersistenceConstants.SpanName,
ActivityKind.Internal,
parentContext: default)
.WithDisplayName($"[outbox] {_topicName}");

var persistedMessages = await messages
.Select(x => new PersistedMessage<TKey, TValue>(x.Key, x.Value))
.ToArrayAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using Itmo.Dev.Platform.Common.Extensions;
using Itmo.Dev.Platform.MessagePersistence;
using Itmo.Dev.Platform.MessagePersistence.Tools;
using Microsoft.Extensions.DependencyInjection;
using System.Diagnostics;

namespace Itmo.Dev.Platform.Kafka.Producer.Outbox;

Expand All @@ -21,13 +24,20 @@ public async Task ProduceAsync(
CancellationToken cancellationToken)
{
var messagesArray = await messages.ToArrayAsync(cancellationToken);

try
{
await _producer.ProduceAsync(messagesArray.ToAsyncEnumerable(), cancellationToken);
}
catch
{
using var activity = PlatformMessagePersistenceActivitySource.Value
.StartActivity(
name: PlatformMessagePersistenceConstants.SpanName,
ActivityKind.Internal,
parentContext: default)
.WithDisplayName($"[outbox] {_topicName}");

var persistedMessages = messagesArray
.Select(x => new PersistedMessage<TKey, TValue>(x.Key, x.Value))
.ToArray();
Expand All @@ -38,4 +48,4 @@ await _consumer.ConsumeAsync(
cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Confluent.Kafka;
using Itmo.Dev.Platform.Common.Extensions;
using Itmo.Dev.Platform.Kafka.Configuration;
using Itmo.Dev.Platform.Kafka.Tools;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Text;

namespace Itmo.Dev.Platform.Kafka.Producer.Services;

Expand Down Expand Up @@ -52,9 +54,14 @@ public async Task ProduceAsync(
IAsyncEnumerable<KafkaProducerMessage<TKey, TValue>> messages,
CancellationToken cancellationToken)
{
using var activity = PlatformKafkaActivitySource.Value.StartActivity(
name: $"produce: {_options.Topic}",
ActivityKind.Producer);
using var activity = PlatformKafkaActivitySource.Value
.StartActivity(
name: "Kafka",
ActivityKind.Producer,
parentContext: default)
.WithDisplayName($"[produce] {_options.Topic}");

Headers headers = [..EnumerateHeaders()];

try
{
Expand All @@ -64,6 +71,7 @@ public async Task ProduceAsync(
{
Key = producerKafkaMessage.Key,
Value = producerKafkaMessage.Value,
Headers = headers,
};

await _producer.ProduceAsync(_options.Topic, message, cancellationToken);
Expand All @@ -80,4 +88,12 @@ public void Dispose()
{
_producer.Dispose();
}

private static IEnumerable<Header> EnumerateHeaders()
{
if (Activity.Current is { Id: { } traceId })
{
yield return new Header(PlatformKafkaConstants.TraceParentHeaderName, Encoding.UTF8.GetBytes(traceId));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ namespace Itmo.Dev.Platform.Kafka.Tools;

internal static class PlatformKafkaActivitySource
{
/// <summary>
/// Required to be const, as Observability package depends on compile-time const string inlining
/// </summary>
public const string Name = "Itmo.Dev.Platform.Kafka";

public static readonly ActivitySource Value = new(Name);
Expand Down
12 changes: 12 additions & 0 deletions src/Itmo.Dev.Platform.Kafka/Tools/PlatformKafkaConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Itmo.Dev.Platform.Kafka.Tools;

public static class PlatformKafkaConstants
{
public const string TraceParentHeaderName = "traceparent";

public const string TopicTagName = "topic";

public const string PartitionTagName = "partition";

public const string OffsetTagName = "offset";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using FluentMigrator;
using Itmo.Dev.Platform.MessagePersistence.Postgres.Configuration;
using Itmo.Dev.Platform.Persistence.Postgres.Migrations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

namespace Itmo.Dev.Platform.MessagePersistence.Postgres.Migrations;

[Migration(1763246858, "Added message headers")]
public sealed class AddedMessageHeaders : SqlMigration
{
protected override string GetUpSql(IServiceProvider serviceProvider)
{
var options = serviceProvider.GetRequiredService<IOptions<MessagePersistencePostgresOptions>>();

return $"""
ALTER TABLE {options.Value.SchemaName}.persisted_messages
ADD COLUMN persisted_message_headers jsonb null;
""";
}

protected override string GetDownSql(IServiceProvider serviceProvider)
{
var options = serviceProvider.GetRequiredService<IOptions<MessagePersistencePostgresOptions>>();

return $"""
ALTER TABLE {options.Value.SchemaName}.persisted_messages
DROP COLUMN persisted_message_headers;
""";
}
}
Loading
Loading