diff --git a/src/DurableTask.Netherite/OrchestrationService/Client.cs b/src/DurableTask.Netherite/OrchestrationService/Client.cs index 1936160c..7a32cf55 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Client.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Client.cs @@ -29,8 +29,9 @@ class Client : TransportAbstraction.IClient readonly WorkItemTraceHelper workItemTraceHelper; readonly Stopwatch workItemStopwatch; readonly CancellationTokenSource cts; - - static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(5); + readonly TimeSpan defaultTimeout; + readonly TimeSpan createOrchestrationTimeout; + readonly TimeSpan sendMessageTimeout; public Guid ClientId { get; private set; } TransportAbstraction.ISender BatchSender { get; set; } @@ -62,6 +63,10 @@ public Client( this.account = host.StorageAccountName; this.BatchSender = batchSender; this.cts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken); + this.defaultTimeout = host.Settings.ClientTimeout; + this.createOrchestrationTimeout = host.Settings.CreateOrchestrationTimeout ?? this.defaultTimeout; + this.sendMessageTimeout = host.Settings.SendOrchestrationMessageTimeout ?? this.defaultTimeout; + this.ResponseTimeouts = new BatchTimer(this.cts.Token, this.Timeout, this.traceHelper.TraceTimerProgress); this.ResponseWaiters = new ConcurrentDictionary(); this.Fragments = new Dictionary(); @@ -244,7 +249,17 @@ void Timeout(List pendingRequests) // we align timeouts into buckets so we can process timeout storms more efficiently const long ticksPerBucket = 2 * TimeSpan.TicksPerSecond; - DateTime GetTimeoutBucket(TimeSpan timeout) => new DateTime((((DateTime.UtcNow + timeout).Ticks / ticksPerBucket) * ticksPerBucket), DateTimeKind.Utc); + DateTime GetTimeoutBucket(TimeSpan timeout) + { + var timeAtTimeOut = DateTime.UtcNow + timeout; + + // Do not align if doing so would have a significant effect on timeout duration + if (timeout.Ticks < ticksPerBucket * 5) + return timeAtTimeOut; + + return new DateTime(((timeAtTimeOut.Ticks / ticksPerBucket) * ticksPerBucket), DateTimeKind.Utc); + } + Task PerformRequestWithTimeout(IClientRequestEvent request) { @@ -455,7 +470,7 @@ public async Task CreateTaskOrchestrationAsync(uint partitionId, TaskMessage cre RequestId = Interlocked.Increment(ref this.SequenceNumber), TaskMessage = creationMessage, DedupeStatuses = dedupeStatuses, - TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout), + TimeoutUtc = this.GetTimeoutBucket(this.createOrchestrationTimeout), }; this.workItemTraceHelper.TraceWorkItemStarted( @@ -486,7 +501,7 @@ public Task SendTaskOrchestrationMessageBatchAsync(uint partitionId, IEnumerable ClientId = this.ClientId, RequestId = Interlocked.Increment(ref this.SequenceNumber), TaskMessages = messages.ToArray(), - TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout), + TimeoutUtc = this.GetTimeoutBucket(this.sendMessageTimeout), }; // we number the messages in this batch in order to create unique message ids for tracing purposes @@ -562,7 +577,7 @@ public async Task GetOrchestrationStateAsync(uint partitionI InstanceId = instanceId, IncludeInput = fetchInput, IncludeOutput = fetchOutput, - TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout), + TimeoutUtc = this.GetTimeoutBucket(this.defaultTimeout), }; var response = await this.PerformRequestWithTimeout(request).ConfigureAwait(false); @@ -582,7 +597,7 @@ public async Task GetOrchestrationStateAsync(uint partitionI ClientId = this.ClientId, RequestId = Interlocked.Increment(ref this.SequenceNumber), InstanceId = instanceId, - TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout), + TimeoutUtc = this.GetTimeoutBucket(this.defaultTimeout), }; var response = (HistoryResponseReceived)await this.PerformRequestWithTimeout(request).ConfigureAwait(false); @@ -976,7 +991,7 @@ public Task ForceTerminateTaskOrchestrationAsync(uint partitionId, string instan ClientId = this.ClientId, RequestId = Interlocked.Increment(ref this.SequenceNumber), TaskMessages = taskMessages, - TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout), + TimeoutUtc = this.GetTimeoutBucket(this.defaultTimeout), }; return this.PerformRequestWithTimeout(request); @@ -995,7 +1010,7 @@ public async Task DeleteAllDataForOrchestrationInstance(uint partitionId, s ClientId = this.ClientId, RequestId = Interlocked.Increment(ref this.SequenceNumber), InstanceId = instanceId, - TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout), + TimeoutUtc = this.GetTimeoutBucket(this.defaultTimeout), }; var response = await this.PerformRequestWithTimeout(request).ConfigureAwait(false); diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index 8b15a170..cbb5f633 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -211,6 +211,21 @@ public class NetheriteOrchestrationServiceSettings /// public bool DisablePrefetchDuringReplay { get; set; } = false; + /// + /// Time limit for orchestration client operations. + /// + public TimeSpan ClientTimeout { get; set; } = TimeSpan.FromMinutes(5); + + /// + /// Time limit for CreateTaskOrchestration client operation. + /// + public TimeSpan? CreateOrchestrationTimeout { get; set; } + + /// + /// Time limit for SendTaskOrchestrationMessage client operation. + /// + public TimeSpan? SendOrchestrationMessageTimeout { get; set; } + /// /// Allows attaching additional checkers and debuggers during testing. ///