Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ class Client : TransportAbstraction.IClient
readonly WorkItemTraceHelper workItemTraceHelper;
readonly Stopwatch workItemStopwatch;
readonly CancellationTokenSource cts;

static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(5);
readonly TimeSpan defaultTimeout;
readonly TimeSpan createOrchestrationTimeout;
readonly TimeSpan sendMessageTimeout;

public Guid ClientId { get; private set; }
TransportAbstraction.ISender BatchSender { get; set; }
Expand Down Expand Up @@ -62,6 +63,10 @@ public Client(
this.account = host.StorageAccountName;
this.BatchSender = batchSender;
this.cts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken);
this.defaultTimeout = host.Settings.ClientTimeout;
this.createOrchestrationTimeout = host.Settings.CreateOrchestrationTimeout ?? this.defaultTimeout;
this.sendMessageTimeout = host.Settings.SendOrchestrationMessageTimeout ?? this.defaultTimeout;

this.ResponseTimeouts = new BatchTimer<PendingRequest>(this.cts.Token, this.Timeout, this.traceHelper.TraceTimerProgress);
this.ResponseWaiters = new ConcurrentDictionary<long, PendingRequest>();
this.Fragments = new Dictionary<string, (MemoryStream, int)>();
Expand Down Expand Up @@ -244,7 +249,17 @@ void Timeout(List<PendingRequest> pendingRequests)

// we align timeouts into buckets so we can process timeout storms more efficiently
const long ticksPerBucket = 2 * TimeSpan.TicksPerSecond;
DateTime GetTimeoutBucket(TimeSpan timeout) => new DateTime((((DateTime.UtcNow + timeout).Ticks / ticksPerBucket) * ticksPerBucket), DateTimeKind.Utc);
DateTime GetTimeoutBucket(TimeSpan timeout)
{
var timeAtTimeOut = DateTime.UtcNow + timeout;

// Do not align if doing so would have a significant effect on timeout duration
if (timeout.Ticks < ticksPerBucket * 5)
return timeAtTimeOut;

return new DateTime(((timeAtTimeOut.Ticks / ticksPerBucket) * ticksPerBucket), DateTimeKind.Utc);
}


Task<ClientEvent> PerformRequestWithTimeout(IClientRequestEvent request)
{
Expand Down Expand Up @@ -455,7 +470,7 @@ public async Task CreateTaskOrchestrationAsync(uint partitionId, TaskMessage cre
RequestId = Interlocked.Increment(ref this.SequenceNumber),
TaskMessage = creationMessage,
DedupeStatuses = dedupeStatuses,
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
TimeoutUtc = this.GetTimeoutBucket(this.createOrchestrationTimeout),
};

this.workItemTraceHelper.TraceWorkItemStarted(
Expand Down Expand Up @@ -486,7 +501,7 @@ public Task SendTaskOrchestrationMessageBatchAsync(uint partitionId, IEnumerable
ClientId = this.ClientId,
RequestId = Interlocked.Increment(ref this.SequenceNumber),
TaskMessages = messages.ToArray(),
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
TimeoutUtc = this.GetTimeoutBucket(this.sendMessageTimeout),
};

// we number the messages in this batch in order to create unique message ids for tracing purposes
Expand Down Expand Up @@ -562,7 +577,7 @@ public async Task<OrchestrationState> GetOrchestrationStateAsync(uint partitionI
InstanceId = instanceId,
IncludeInput = fetchInput,
IncludeOutput = fetchOutput,
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
TimeoutUtc = this.GetTimeoutBucket(this.defaultTimeout),
};

var response = await this.PerformRequestWithTimeout(request).ConfigureAwait(false);
Expand All @@ -582,7 +597,7 @@ public async Task<OrchestrationState> GetOrchestrationStateAsync(uint partitionI
ClientId = this.ClientId,
RequestId = Interlocked.Increment(ref this.SequenceNumber),
InstanceId = instanceId,
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
TimeoutUtc = this.GetTimeoutBucket(this.defaultTimeout),
};

var response = (HistoryResponseReceived)await this.PerformRequestWithTimeout(request).ConfigureAwait(false);
Expand Down Expand Up @@ -976,7 +991,7 @@ public Task ForceTerminateTaskOrchestrationAsync(uint partitionId, string instan
ClientId = this.ClientId,
RequestId = Interlocked.Increment(ref this.SequenceNumber),
TaskMessages = taskMessages,
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
TimeoutUtc = this.GetTimeoutBucket(this.defaultTimeout),
};

return this.PerformRequestWithTimeout(request);
Expand All @@ -995,7 +1010,7 @@ public async Task<int> DeleteAllDataForOrchestrationInstance(uint partitionId, s
ClientId = this.ClientId,
RequestId = Interlocked.Increment(ref this.SequenceNumber),
InstanceId = instanceId,
TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout),
TimeoutUtc = this.GetTimeoutBucket(this.defaultTimeout),
};

var response = await this.PerformRequestWithTimeout(request).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,21 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public bool DisablePrefetchDuringReplay { get; set; } = false;

/// <summary>
/// Time limit for orchestration client operations.
/// </summary>
public TimeSpan ClientTimeout { get; set; } = TimeSpan.FromMinutes(5);

/// <summary>
/// Time limit for CreateTaskOrchestration client operation.
/// </summary>
public TimeSpan? CreateOrchestrationTimeout { get; set; }

/// <summary>
/// Time limit for SendTaskOrchestrationMessage client operation.
/// </summary>
public TimeSpan? SendOrchestrationMessageTimeout { get; set; }

/// <summary>
/// Allows attaching additional checkers and debuggers during testing.
/// </summary>
Expand Down
Loading