From b5ce9c890ea9040a6fb6e4f2f9b244fb3debbd4b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:28:41 +0000 Subject: [PATCH 1/3] Initial plan From 563c9e4603f15139d7a189e48c17a31a85287f7f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 20:02:07 +0000 Subject: [PATCH 2/3] fix: clean transient entities and state flags Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Client/Core/Entities/EntityMetadata.cs | 68 ++++++-- src/Client/Grpc/GrpcDurableEntityClient.cs | 93 +++++++++-- .../GrpcDurableEntityClientTests.cs | 153 ++++++++++++++++++ 3 files changed, 287 insertions(+), 27 deletions(-) create mode 100644 test/Client/Grpc.Tests/GrpcDurableEntityClientTests.cs diff --git a/src/Client/Core/Entities/EntityMetadata.cs b/src/Client/Core/Entities/EntityMetadata.cs index f0064853a..78c346bda 100644 --- a/src/Client/Core/Entities/EntityMetadata.cs +++ b/src/Client/Core/Entities/EntityMetadata.cs @@ -11,12 +11,8 @@ namespace Microsoft.DurableTask.Client.Entities; /// Represents entity metadata. /// /// The type of state held by the metadata. -/// -/// Initializes a new instance of the class. -/// -/// The ID of the entity. [JsonConverter(typeof(EntityMetadataConverter))] -public class EntityMetadata(EntityInstanceId id) +public class EntityMetadata { readonly TState? state; @@ -25,17 +21,39 @@ public class EntityMetadata(EntityInstanceId id) /// /// The ID of the entity. /// The state of the entity. - public EntityMetadata(EntityInstanceId id, TState? state) - : this(id) + /// + /// A value indicating whether the entity state was included in the response, even when the state is . + /// + public EntityMetadata(EntityInstanceId id, TState? state, bool includesState) { - this.IncludesState = state is not null; + this.Id = Check.NotDefault(id); + this.IncludesState = includesState; this.state = state; } + /// + /// Initializes a new instance of the class. + /// + /// The ID of the entity. + /// The state of the entity. + public EntityMetadata(EntityInstanceId id, TState? state) + : this(id, state, state is not null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The ID of the entity. + public EntityMetadata(EntityInstanceId id) + : this(id, default, false) + { + } + /// /// Gets the ID for this entity. /// - public EntityInstanceId Id { get; } = Check.NotDefault(id); + public EntityInstanceId Id { get; } /// /// Gets the time the entity was last modified. @@ -60,7 +78,7 @@ public EntityMetadata(EntityInstanceId id, TState? state) /// [MemberNotNullWhen(true, nameof(State))] [MemberNotNullWhen(true, nameof(state))] - public bool IncludesState { get; } = false; + public bool IncludesState { get; } /// /// Gets the state for this entity. @@ -90,13 +108,29 @@ public TState State /// /// Represents the metadata for a durable entity instance. /// -/// -/// Initializes a new instance of the class. -/// -/// The ID of the entity. -/// The state of this entity. [JsonConverter(typeof(EntityMetadataConverter))] -public sealed class EntityMetadata(EntityInstanceId id, SerializedData? state = null) - : EntityMetadata(id, state) +public sealed class EntityMetadata : EntityMetadata { + /// + /// Initializes a new instance of the class. + /// + /// The ID of the entity. + /// The state of this entity. + public EntityMetadata(EntityInstanceId id, SerializedData? state = null) + : base(id, state) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The ID of the entity. + /// The state of this entity. + /// + /// A value indicating whether the entity state was included in the response, even when the state is . + /// + public EntityMetadata(EntityInstanceId id, SerializedData? state, bool includesState) + : base(id, state, includesState) + { + } } diff --git a/src/Client/Grpc/GrpcDurableEntityClient.cs b/src/Client/Grpc/GrpcDurableEntityClient.cs index fd8e63926..d2c86dd74 100644 --- a/src/Client/Grpc/GrpcDurableEntityClient.cs +++ b/src/Client/Grpc/GrpcDurableEntityClient.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using Microsoft.DurableTask.Client.Entities; using Microsoft.DurableTask.Entities; using Microsoft.Extensions.Logging; @@ -126,6 +128,11 @@ public override async Task CleanEntityStorageAsync( } while (continueUntilComplete && continuationToken != null); + if (continueUntilComplete && req.RemoveEmptyEntities) + { + emptyEntitiesRemoved += await this.PurgeTransientEntitiesAsync(cancellation); + } + return new CleanEntityStorageResult { ContinuationToken = continuationToken, @@ -220,12 +227,24 @@ EntityMetadata ToEntityMetadata(P.EntityMetadata metadata, bool includeState) { var coreEntityId = DTCore.Entities.EntityId.FromString(metadata.InstanceId); EntityInstanceId entityId = new(coreEntityId.Name, coreEntityId.Key); - bool hasState = metadata.SerializedState != null; + bool hasState = !string.IsNullOrEmpty(metadata.SerializedState); + + DateTimeOffset lastModified = metadata.LastModifiedTime?.ToDateTimeOffset() ?? default; + + if (includeState && hasState) + { + SerializedData data = new(metadata.SerializedState!, this.dataConverter); + return new EntityMetadata(entityId, data) + { + LastModifiedTime = lastModified, + BacklogQueueSize = metadata.BacklogQueueSize, + LockedBy = metadata.LockedBy, + }; + } - SerializedData? data = (includeState && hasState) ? new(metadata.SerializedState!, this.dataConverter) : null; - return new EntityMetadata(entityId, data) + return new EntityMetadata(entityId, null, includesState: includeState) { - LastModifiedTime = metadata.LastModifiedTime.ToDateTimeOffset(), + LastModifiedTime = lastModified, BacklogQueueSize = metadata.BacklogQueueSize, LockedBy = metadata.LockedBy, }; @@ -235,12 +254,12 @@ EntityMetadata ToEntityMetadata(P.EntityMetadata metadata, bool includeSta { var coreEntityId = DTCore.Entities.EntityId.FromString(metadata.InstanceId); EntityInstanceId entityId = new(coreEntityId.Name, coreEntityId.Key); - DateTimeOffset lastModified = metadata.LastModifiedTime.ToDateTimeOffset(); - bool hasState = metadata.SerializedState != null; + DateTimeOffset lastModified = metadata.LastModifiedTime?.ToDateTimeOffset() ?? default; + bool hasState = !string.IsNullOrEmpty(metadata.SerializedState); if (includeState && hasState) { - T? data = includeState ? this.dataConverter.Deserialize(metadata.SerializedState) : default; + T? data = this.dataConverter.Deserialize(metadata.SerializedState); return new EntityMetadata(entityId, data) { LastModifiedTime = lastModified, @@ -248,14 +267,68 @@ EntityMetadata ToEntityMetadata(P.EntityMetadata metadata, bool includeSta LockedBy = metadata.LockedBy, }; } - else - { - return new EntityMetadata(entityId) + + return includeState + ? new EntityMetadata(entityId, state: default, includesState: true) + { + LastModifiedTime = lastModified, + BacklogQueueSize = metadata.BacklogQueueSize, + LockedBy = metadata.LockedBy, + } + : new EntityMetadata(entityId) { LastModifiedTime = lastModified, BacklogQueueSize = metadata.BacklogQueueSize, LockedBy = metadata.LockedBy, }; + } + + async Task PurgeTransientEntitiesAsync(CancellationToken cancellation) + { + int removed = 0; + string? continuation = null; + + do + { + P.QueryEntitiesResponse response = await this.sidecarClient.QueryEntitiesAsync( + new P.QueryEntitiesRequest + { + Query = new P.EntityQuery + { + IncludeState = true, + IncludeTransient = true, + ContinuationToken = continuation, + }, + }, + cancellationToken: cancellation); + + List transientIds = response.Entities + .Where(IsEmptyTransientEntity) + .Select(e => e.InstanceId) + .ToList(); + + if (transientIds.Count > 0) + { + P.PurgeInstancesResponse purgeResponse = await this.sidecarClient.PurgeInstancesAsync( + new P.PurgeInstancesRequest + { + InstanceBatch = new P.InstanceBatch + { + InstanceIds = { transientIds }, + }, + }, + cancellationToken: cancellation); + + removed += purgeResponse.DeletedInstanceCount; + } + + continuation = response.ContinuationToken; } + while (continuation != null); + + return removed; } + + static bool IsEmptyTransientEntity(P.EntityMetadata entity) => + string.IsNullOrEmpty(entity.SerializedState) && string.IsNullOrEmpty(entity.LockedBy); } diff --git a/test/Client/Grpc.Tests/GrpcDurableEntityClientTests.cs b/test/Client/Grpc.Tests/GrpcDurableEntityClientTests.cs new file mode 100644 index 000000000..f9df7ee5c --- /dev/null +++ b/test/Client/Grpc.Tests/GrpcDurableEntityClientTests.cs @@ -0,0 +1,153 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Grpc.Core; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Converters; +using Microsoft.Extensions.Logging; +using System.Linq; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Client.Grpc.Tests; + +public class GrpcDurableEntityClientTests +{ + [Fact] + public async Task CleanEntityStorageAsync_PurgesTransientEntities() + { + // Arrange + var sidecar = new Mock(MockBehavior.Strict, (CallInvoker)new Mock().Object); + + sidecar + .Setup(c => c.CleanEntityStorageAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(CompletedAsyncUnaryCall(new P.CleanEntityStorageResponse())); + + sidecar + .Setup(c => c.QueryEntitiesAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(CompletedAsyncUnaryCall(new P.QueryEntitiesResponse + { + Entities = + { + new P.EntityMetadata + { + InstanceId = "@entity@one", + SerializedState = string.Empty, + LockedBy = string.Empty, + BacklogQueueSize = 0, + }, + new P.EntityMetadata + { + InstanceId = "@entity@two", + SerializedState = "state", + }, + }, + })); + + sidecar + .Setup(c => c.PurgeInstancesAsync( + It.Is(r => + r.InstanceBatch.InstanceIds.Count == 1 + && r.InstanceBatch.InstanceIds[0] == "@entity@one"), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(CompletedAsyncUnaryCall(new P.PurgeInstancesResponse + { + DeletedInstanceCount = 1, + })); + + GrpcDurableEntityClient client = this.CreateClient(sidecar.Object); + + // Act + CleanEntityStorageResult result = await client.CleanEntityStorageAsync( + new CleanEntityStorageRequest + { + RemoveEmptyEntities = true, + ReleaseOrphanedLocks = true, + }); + + // Assert + result.EmptyEntitiesRemoved.Should().Be(1); + result.OrphanedLocksReleased.Should().Be(0); + + sidecar.Verify( + c => c.QueryEntitiesAsync( + It.Is(r => r.Query.IncludeTransient && r.Query.IncludeState), + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.AtLeastOnce); + + sidecar.Verify( + c => c.PurgeInstancesAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Once); + } + + [Fact] + public async Task GetAllEntitiesAsync_IncludeState_ReportsStateIncludedWhenMissing() + { + // Arrange + var sidecar = new Mock(MockBehavior.Strict, (CallInvoker)new Mock().Object); + + sidecar + .Setup(c => c.QueryEntitiesAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(CompletedAsyncUnaryCall(new P.QueryEntitiesResponse + { + Entities = + { + new P.EntityMetadata + { + InstanceId = "@entity@missing", + SerializedState = string.Empty, + LockedBy = string.Empty, + BacklogQueueSize = 0, + }, + }, + })); + + GrpcDurableEntityClient client = this.CreateClient(sidecar.Object); + + // Act + List results = await client + .GetAllEntitiesAsync(new EntityQuery { IncludeState = true, IncludeTransient = true }) + .ToListAsync(); + + // Assert + results.Should().ContainSingle(); + results[0].IncludesState.Should().BeTrue(); + results[0].State.Should().BeNull(); + } + + static AsyncUnaryCall CompletedAsyncUnaryCall(T response) + { + Task respTask = Task.FromResult(response); + return new AsyncUnaryCall( + respTask, + Task.FromResult(new Metadata()), + () => new Status(StatusCode.OK, string.Empty), + () => new Metadata(), + () => { }); + } + + GrpcDurableEntityClient CreateClient(P.TaskHubSidecarService.TaskHubSidecarServiceClient sidecar) + { + var logger = Mock.Of(); + return new GrpcDurableEntityClient("test", JsonDataConverter.Default, sidecar, logger); + } +} From 8218388dd374d74ed2c572a961a565e1b91046c5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 20:03:19 +0000 Subject: [PATCH 3/3] chore: add docs and finalize cleaning changes Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Client/Grpc/GrpcDurableEntityClient.cs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/Client/Grpc/GrpcDurableEntityClient.cs b/src/Client/Grpc/GrpcDurableEntityClient.cs index d2c86dd74..b237c7e10 100644 --- a/src/Client/Grpc/GrpcDurableEntityClient.cs +++ b/src/Client/Grpc/GrpcDurableEntityClient.cs @@ -283,6 +283,11 @@ EntityMetadata ToEntityMetadata(P.EntityMetadata metadata, bool includeSta }; } + /// + /// Iterates transient entity metadata and purges empty entities to fully clean storage. + /// + /// The cancellation token. + /// The number of entities removed. async Task PurgeTransientEntitiesAsync(CancellationToken cancellation) { int removed = 0; @@ -329,6 +334,11 @@ async Task PurgeTransientEntitiesAsync(CancellationToken cancellation) return removed; } + /// + /// Determines whether an entity is transient and contains no persisted state or locks. + /// + /// The entity metadata. + /// true when the entity is empty and transient; otherwise, false. static bool IsEmptyTransientEntity(P.EntityMetadata entity) => string.IsNullOrEmpty(entity.SerializedState) && string.IsNullOrEmpty(entity.LockedBy); }