Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ public class OutboxEntityConfiguration : IEntityTypeConfiguration<OutboxEntity>
public void Configure(EntityTypeBuilder<OutboxEntity> builder)
{
builder.HasKey(x => x.Id);
//builder.HasIndex(x => x.ParentId).IsUnique(); todo: nullability problem

builder.HasOne(x => x.Parent)
.WithOne()
.HasForeignKey<OutboxEntity>(x => x.ParentId)
.IsRequired(false);

builder.HasIndex(x => x.Version);
builder.HasIndex(x => x.HandlerLock);
Expand All @@ -30,4 +24,4 @@ public void Configure(EntityTypeBuilder<OutboxDataEntity> builder)
builder.HasKey(x => x.Id);
builder.HasIndex(x => x.ScopeId);
}
}
}
7 changes: 2 additions & 5 deletions GenericOutbox/DataAccess/Entities/OutboxEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ namespace GenericOutbox.DataAccess.Entities;
public class OutboxEntity
{
public int Id { get; set; }
public Guid? Lock { get; set; }
public Guid Lock { get; set; }
public Guid? HandlerLock { get; set; }
public int? ParentId { get; set; }
public Guid ScopeId { get; set; }
public string Action { get; set; }
public byte[] Payload { get; set; }
Expand All @@ -21,7 +20,5 @@ public class OutboxEntity

public string? Metadata { get; set; }

public virtual OutboxEntity Parent { get; set; }

public string PayloadString => Encoding.UTF8.GetString(Payload);
}
}
136 changes: 48 additions & 88 deletions GenericOutbox/DataAccess/Services/OutboxDataAccess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class OutboxDataAccess<TDbContext>(TDbContext dbContext, OutboxOptions ou
OutboxRecordStatus.Completed
};

private int _rollingOutboxQueryType = 0; //0 = non-locked, 1 = locked
private int _rollingOutboxQueryType = 0;

public async Task CommitExecutionResult(OutboxEntity outboxEntity, ExecutionResult executionResult)
{
Expand All @@ -24,11 +24,13 @@ public async Task CommitExecutionResult(OutboxEntity outboxEntity, ExecutionResu
await dbContext
.Set<OutboxEntity>()
.Where(x => x.Id == outboxEntity.Id)
.ExecuteUpdateAsync(s => s.SetProperty(r => r.RetriesCount, r => r.RetriesCount + 1)
.SetProperty(r => r.Status, executionResult.Status)
.SetProperty(r => r.RetryTimeoutUtc, executionResult.RetryTimeoutUtc)
.SetProperty(r => r.LastUpdatedUtc, now)
.SetProperty(r => r.HandlerLock, (Guid?)null)
.ExecuteUpdateAsync(
s => s
.SetProperty(r => r.RetriesCount, r => r.RetriesCount + 1)
.SetProperty(r => r.Status, executionResult.Status)
.SetProperty(r => r.RetryTimeoutUtc, executionResult.RetryTimeoutUtc)
.SetProperty(r => r.LastUpdatedUtc, now)
.SetProperty(r => r.HandlerLock, (Guid?)null)
);
}

Expand All @@ -38,17 +40,15 @@ public async Task<OutboxEntityDispatchModel[]> GetOutboxRecords(int maxCount)

var recordsToUpdateCount = _rollingOutboxQueryType switch
{
0 => await ReserveNonLockedRetryOutboxRecords(lockId, maxCount),
1 => await ReserveNonLockedOutboxRecords(lockId, maxCount),
2 => await ReserveLockedRetryOutboxRecords(lockId, maxCount),
3 => await ReserveLockedOutboxRecords(lockId, maxCount),
0 => await ReserveRetryOutboxRecords(lockId, maxCount),
1 => await ReserveOutboxRecords(lockId, maxCount),
_ => await ReserveStuckInProgressRecords(lockId, maxCount)
};

var stuckInProgress = _rollingOutboxQueryType == 4;
var stuckInProgress = _rollingOutboxQueryType == 2;

if (recordsToUpdateCount < maxCount)
_rollingOutboxQueryType = (_rollingOutboxQueryType + 1) % 5;
_rollingOutboxQueryType = (_rollingOutboxQueryType + 1) % 3;

if (recordsToUpdateCount == 0)
return Array.Empty<OutboxEntityDispatchModel>();
Expand All @@ -64,39 +64,39 @@ public async Task<OutboxEntityDispatchModel[]> GetOutboxRecords(int maxCount)
.ToArray();
}

private async Task<int> ReserveLockedOutboxRecords(Guid handlerLockId, int maxCount)
private async Task<int> ReserveOutboxRecords(Guid handlerLockId, int maxCount)
{
var now = DateTime.UtcNow;

return await dbContext
.Set<OutboxEntity>()
.Where(r =>
r.HandlerLock == null
&& r.Status == OutboxRecordStatus.ReadyToExecute
&& dbContext
.Set<OutboxEntity>()
.Where(x => x.Lock != null
&& !dbContext.Set<OutboxEntity>().Any(y =>
y.Lock == x.Lock && !s_unlockedStatuses.Contains(y.Status))
// Note: Using explicit subquery instead of x.Parent.Status navigation property
// because EF Core 10 ExecuteUpdateAsync cannot properly translate queries with navigation properties
&& (x.ParentId == null || dbContext.Set<OutboxEntity>().Any(p =>
p.Id == x.ParentId && p.Status == OutboxRecordStatus.Completed))
&& x.Status == OutboxRecordStatus.ReadyToExecute
&& x.Version == outboxOptions.Version
&& x.HandlerLock == null)
.GroupBy(x => x.Lock)
.Select(x => x.OrderBy(x => x.Id).FirstOrDefault().Id)
.OrderBy(x => x)
.Take(maxCount)
.Contains(r.Id))
.ExecuteUpdateAsync(s => s.SetProperty(r => r.Status, OutboxRecordStatus.InProgress)
.SetProperty(r => r.HandlerLock, handlerLockId)
.SetProperty(r => r.LastUpdatedUtc, now)
.Where(
r =>
r.HandlerLock == null
&& r.Status == OutboxRecordStatus.ReadyToExecute
&& dbContext
.Set<OutboxEntity>()
.Where(
x => !dbContext
.Set<OutboxEntity>()
.Any(y => y.Lock == x.Lock && !s_unlockedStatuses.Contains(y.Status))
&& x.Status == OutboxRecordStatus.ReadyToExecute
&& x.Version == outboxOptions.Version
&& x.HandlerLock == null)
.GroupBy(x => x.Lock)
.Select(x => x.OrderBy(x => x.Id).FirstOrDefault().Id)
.OrderBy(x => x)
.Take(maxCount)
.Contains(r.Id))
.ExecuteUpdateAsync(
s => s
.SetProperty(r => r.Status, OutboxRecordStatus.InProgress)
.SetProperty(r => r.HandlerLock, handlerLockId)
.SetProperty(r => r.LastUpdatedUtc, now)
);
}

private async Task<int> ReserveLockedRetryOutboxRecords(Guid handlerLockId, int maxCount)
private async Task<int> ReserveRetryOutboxRecords(Guid handlerLockId, int maxCount)
{
var now = DateTime.UtcNow;

Expand All @@ -109,8 +109,7 @@ private async Task<int> ReserveLockedRetryOutboxRecords(Guid handlerLockId, int
&& dbContext
.Set<OutboxEntity>()
.Where(
x => x.Lock != null
&& !dbContext.Set<OutboxEntity>().Any(y => y.Lock == x.Lock && !s_unlockedStatuses.Contains(y.Status))
x => !dbContext.Set<OutboxEntity>().Any(y => y.Lock == x.Lock && !s_unlockedStatuses.Contains(y.Status))
&& x.RetryTimeoutUtc < now
&& x.Version == outboxOptions.Version
&& x.HandlerLock == null)
Expand All @@ -119,51 +118,11 @@ private async Task<int> ReserveLockedRetryOutboxRecords(Guid handlerLockId, int
.OrderBy(x => x)
.Take(maxCount)
.Contains(r.Id))
.ExecuteUpdateAsync(s => s.SetProperty(r => r.Status, OutboxRecordStatus.InProgress)
.SetProperty(r => r.HandlerLock, handlerLockId)
.SetProperty(r => r.LastUpdatedUtc, now)
);
}

private async Task<int> ReserveNonLockedOutboxRecords(Guid handlerLockId, int maxCount)
{
var now = DateTime.UtcNow;

return await dbContext
.Set<OutboxEntity>()
.Where(x =>
x.RetryTimeoutUtc < now
&& x.Version == outboxOptions.Version
&& x.HandlerLock == null
&& x.Lock == null)
.OrderBy(x => x.Id)
.Take(maxCount)
.ExecuteUpdateAsync(s => s.SetProperty(r => r.Status, OutboxRecordStatus.InProgress)
.SetProperty(r => r.HandlerLock, handlerLockId)
.SetProperty(r => r.LastUpdatedUtc, now)
);
}

private async Task<int> ReserveNonLockedRetryOutboxRecords(Guid handlerLockId, int maxCount)
{
var now = DateTime.UtcNow;

return await dbContext
.Set<OutboxEntity>()
.Where(
// Note: Using explicit subquery instead of x.Parent.Status navigation property
// because EF Core 10 ExecuteUpdateAsync cannot properly translate queries with navigation properties
x => (x.ParentId == null || dbContext.Set<OutboxEntity>()
.Any(p => p.Id == x.ParentId && p.Status == OutboxRecordStatus.Completed))
&& x.Status == OutboxRecordStatus.ReadyToExecute
&& x.Version == outboxOptions.Version
&& x.HandlerLock == null
&& x.Lock == null)
.OrderBy(x => x.Id)
.Take(maxCount)
.ExecuteUpdateAsync(s => s.SetProperty(r => r.Status, OutboxRecordStatus.InProgress)
.SetProperty(r => r.HandlerLock, handlerLockId)
.SetProperty(r => r.LastUpdatedUtc, now)
.ExecuteUpdateAsync(
s => s
.SetProperty(r => r.Status, OutboxRecordStatus.InProgress)
.SetProperty(r => r.HandlerLock, handlerLockId)
.SetProperty(r => r.LastUpdatedUtc, now)
);
}

Expand All @@ -183,9 +142,10 @@ private async Task<int> ReserveStuckInProgressRecords(Guid handlerLockId, int ma
&& r.LastUpdatedUtc < stuckCutoff)
.OrderBy(x => x.Id)
.Take(maxCount)
.ExecuteUpdateAsync(s => s.SetProperty(r => r.HandlerLock, handlerLockId)
.SetProperty(r => r.LastUpdatedUtc, now)
.ExecuteUpdateAsync(
s => s
.SetProperty(r => r.HandlerLock, handlerLockId)
.SetProperty(r => r.LastUpdatedUtc, now)
);
;
}
}
}
27 changes: 15 additions & 12 deletions GenericOutbox/OutboxCreatorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ class OutboxCreatorContext<TDbContext> : IOutboxCreatorContext where TDbContext

private readonly LockClearer _lockClearer;

private int? _previousStepId;
private Guid? _lock;
private Guid _lock;
private readonly Guid _scopeId;
private readonly string _metadata;

Expand All @@ -23,29 +22,28 @@ public OutboxCreatorContext(TDbContext dbContext, ISerializer serializer, Outbox
_dbContext = dbContext;
_serializer = serializer;
_outboxOptions = outboxOptions;

var metadataProviderService = serviceProvider.GetService(typeof(IOutboxMetadataProvider));
if (metadataProviderService != null)
{
_metadata = ((IOutboxMetadataProvider)metadataProviderService).GetMetadata();
}

_lockClearer = new LockClearer(this);
_lock = Guid.NewGuid();
_lockClearer = new LockClearer(this, _lock);
_scopeId = Guid.NewGuid();
}

public void CreateOutboxRecord<T>(string action, T model)
{
var newRecord = CreateOutboxRecordInternal(action, model);
CreateOutboxRecordInternal(action, model);
_dbContext.SaveChanges();
_previousStepId = newRecord.Id;
}

public async Task CreateOutboxRecordAsync<T>(string action, T model)
{
var newRecord = CreateOutboxRecordInternal(action, model);
CreateOutboxRecordInternal(action, model);
await _dbContext.SaveChangesAsync();
_previousStepId = newRecord.Id;
}

public IDisposable Lock<T>(string entityName, T entityId)
Expand All @@ -57,6 +55,9 @@ public IDisposable Lock<T>(string entityName, T entityId)

public IDisposable Lock(Guid lockGuid)
{
if (_lockClearer.IsLocked)
throw new InvalidOperationException("Lock is already taken.");

_lock = lockGuid;
return _lockClearer;
}
Expand All @@ -70,7 +71,6 @@ private OutboxEntity CreateOutboxRecordInternal<T>(string action, T model)
Action = action,
ScopeId = _scopeId,
Payload = _serializer.Serialize(model),
ParentId = _previousStepId,
Version = _outboxOptions.Version,
Lock = _lock,
LastUpdatedUtc = utcNow,
Expand All @@ -86,15 +86,18 @@ private OutboxEntity CreateOutboxRecordInternal<T>(string action, T model)
class LockClearer : IDisposable
{
private readonly OutboxCreatorContext<TDbContext> _outboxCreatorContext;
private readonly Guid _defaultLock;
public bool IsLocked => _defaultLock != _outboxCreatorContext._lock;

public LockClearer(OutboxCreatorContext<TDbContext> outboxCreatorContext)
public LockClearer(OutboxCreatorContext<TDbContext> outboxCreatorContext, Guid defaultLock)
{
_outboxCreatorContext = outboxCreatorContext;
_defaultLock = defaultLock;
}

public void Dispose()
{
_outboxCreatorContext._lock = null;
_outboxCreatorContext._lock = _defaultLock;
}
}
}
}
2 changes: 1 addition & 1 deletion GenericOutbox/OutboxDispatcherHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,4 @@ private static ExecutionResult GetRetryStrategyResolution(ILogger<OutboxDispatch

return executionResult;
}
}
}
Loading