Skip to content

Commit 52084dc

Browse files
committed
[Concurrency] Add Swift Task Stealers
When Swift Tasks are enqueued onto the cooperative pool, the queue they are placed onto is based on their current priority. If the Task is further escalated, it will be marked as such so that when it runs, it will self-escalate the thread to the required priority, but while it is enqueued, nothing will happen. If the Task was originally enqueued at a low priority, it may not be able to run because it is waiting on other items ahead and the system doesn’t have enough room to do low priority work. This is a priority inversion since this now-higher-priority Task is unable to run. This commit implements a "stealer" object for Tasks that is similar to the existing Actor stealer call AsyncTaskStealer. These would be used only when Dispatch is the default global executor and priority escalation is enabled. To an executor of any kind (Dispatch or custom), stealer objects would look like any other Job. This avoids edge cases when a Task is enqueued to multiple types of exectutors through its lifetime (e.g. global, Actor, custom). Stealer objects have to be created any time the Task needs to be enqueued while the Task object itself is still enqueued somewhere (which in Dispatch executors is via an intrusive linked list thus only allowing the Task to be enqueued in one place). If it does eventually become dequeued, it may be enqueued directly the next time it needs to be enqueued. In order to ensure that the Task is only run when it should, it needs to contain a counter that increments each time it finishes an execution. Each stealer object contains a field indicating at which value of the counter is it able to run and the Task itself has one in its private data for the direct enqueued copy. If the Task or stealer is run but the Task’s current counter is not equal to the one that is allowed to be run, the invocation will do nothing. Because flagAsRunning can now fail and a previous commit introduced an opaque return value, there is a split between two cases where flagAsRunning can be called. When the Task is being resumed immediatly after suspention without being enqueued again, flagAsRunning is infailable and will always return zero for the opaque value. When initially running from swift_task_run, it is failable and may return an opaque value. I've refactored flagAsRunning to separate these cases and try my best to reduce some redundant code. rdar://160967177
1 parent 96cd2e1 commit 52084dc

File tree

7 files changed

+608
-141
lines changed

7 files changed

+608
-141
lines changed

include/swift/ABI/MetadataValues.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2692,6 +2692,7 @@ enum class JobKind : size_t {
26922692
DefaultActorOverride,
26932693
NullaryContinuation,
26942694
IsolatedDeinit,
2695+
TaskStealer,
26952696
};
26962697

26972698
/// The priority of a job. Higher priorities are larger values.

include/swift/ABI/Task.h

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -430,21 +430,53 @@ class AsyncTask : public Job {
430430
/// Generally this should be done immediately after updating
431431
/// ActiveTask.
432432
///
433-
/// When Dispatch is used for the default executor:
434-
/// * If the return value is non-zero, it must be passed
435-
/// to swift_dispatch_thread_reset_override_self
433+
/// This function returns two values. The first, boolean, value
434+
/// reports if the Task was successfully marked as running. The second
435+
/// is an opaque value used to later reset some properties of the
436+
/// thread. There are two cases described below for their meaning.
437+
///
438+
/// When Dispatch is used as the default
439+
/// executor and priority escalation is enabled:
440+
/// * If the opaque value is non-zero, it must be passed
441+
/// to swift_dispatch_thread_reset_override_self.
436442
/// before returning to the executor.
437-
/// * If the return value is zero, it may be ignored or passed to
443+
/// * If the opaque value is zero, it may be ignored or passed to
438444
/// the aforementioned function (which will ignore values of zero).
439-
/// The current implementation will always return zero
440-
/// if you call flagAsRunning again before calling
441-
/// swift_dispatch_thread_reset_override_self with the
442-
/// initial value. This supports suspending and immediately
443-
/// resuming a Task without returning up the callstack.
445+
/// * This function is failable. If this function returns
446+
/// false, no more setup of the environment should be done
447+
/// and runInFullyEstablishedContext must not be called.
448+
/// * If this function is being called directly from swift_task_run (i.e.
449+
/// not indirect through an AsyncTaskStealer), then removeEnqueued must
450+
/// be true since the Task must be marked as no longer enqueued on either
451+
/// the success or failure path to allow enqueuing the Task directly
452+
/// again. If this function is being called from an AsyncTaskStealer
453+
/// (we did not dequeue the Task directly), then this argument must
454+
/// be false so that the Task's enqueued status is not changed.
455+
/// * If this function is called via an AsyncTaskStealer, the
456+
/// allowedExclusionValue must be the one encoded in the stealer.
457+
/// * Otherwise, when this function is called from
458+
/// the Task directly, allowedExclusionValue must be
459+
/// the value encoded in the Task's private data.
460+
/// * The exclusion value allows both the Task's intrusive link
461+
/// and one or more AsyncTaskStealers to be enqueued at the same
462+
/// time, with different exclusion values, and all but the one
463+
/// most recently enqueued will have this function return false.
444464
///
445-
/// For all other default executors, flagAsRunning
446-
/// will return zero which may be ignored.
447-
uint32_t flagAsRunning();
465+
/// For all other default executors, or
466+
/// when priority escalation is not enabled:
467+
/// * This function will always return true.
468+
/// * allowedExclusionValue is ignored (and is expected to always be zero).
469+
/// * The opaque return value will always be zero and may be ignored.
470+
std::pair<bool, uint32_t> flagAsRunning(uint8_t allowedExclusionValue, bool removeEnqueued);
471+
472+
/// This variant of flagAsRunning may be called if you are resumming
473+
/// immediately after suspending. That is, you are on the same thread,
474+
/// you have not enqueued onto any executor, and you have not called
475+
/// swift_dispatch_thread_reset_override_self or done any other
476+
/// cleanup work. This is intended for situations such as awaiting
477+
/// where you may mark yourself as suspended but find out during
478+
/// atomic state update that you may actually resume immediately.
479+
void flagAsRunningImmediately();
448480

449481
/// Flag that this task is now suspended with information about what it is
450482
/// waiting on.

stdlib/public/Concurrency/Actor.cpp

Lines changed: 67 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
///===----------------------------------------------------------------------===///
1717

1818
#include "swift/Runtime/Concurrency.h"
19+
#include "swift/Runtime/DispatchShims.h"
1920
#include <atomic>
2021
#include <new>
2122
#if __has_feature(ptrauth_calls)
@@ -221,26 +222,26 @@ ExecutorTrackingInfo::ActiveInfoInThread;
221222

222223
} // end anonymous namespace
223224

224-
void swift::runJobInEstablishedExecutorContext(Job *job) {
225-
_swift_tsan_acquire(job);
226-
SWIFT_TASK_DEBUG_LOG("Run job in established context %p", job);
227-
228-
#if SWIFT_OBJC_INTEROP
229-
auto pool = objc_autoreleasePoolPush();
230-
#endif
231-
232-
if (auto task = dyn_cast<AsyncTask>(job)) {
225+
/// This function establishes the Task's context and attempts to invoke
226+
/// it. The invocation may fail and the Task may not be run if the
227+
/// passed in exclusion value is not what is in the ActiveTaskStatus
228+
/// during the cas loop to mark the Task as running. If Task
229+
/// priority escalation is not enabled, this will always succeed.
230+
SWIFT_ALWAYS_INLINE
231+
static inline
232+
void taskInvokeWithExclusionValue(AsyncTask *task,
233+
uint8_t allowedStealerExclusionValue,
234+
bool removeEnqueued) {
235+
// Update the task status to say that it's running on the current
236+
// thread. If the task suspends somewhere, it should update the
237+
// task status appropriately; we don't need to update it afterwards.
238+
[[maybe_unused]]
239+
auto [mayRun, dispatchOpaquePriority] = task->flagAsRunning(allowedStealerExclusionValue, removeEnqueued);
240+
if (mayRun) {
233241
// Update the active task in the current thread.
234242
auto oldTask = ActiveTask::swap(task);
235243

236-
// Update the task status to say that it's running on the
237-
// current thread. If the task suspends somewhere, it should
238-
// update the task status appropriately; we don't need to update
239-
// it afterwards.
240-
[[maybe_unused]]
241-
uint32_t dispatchOpaquePriority = task->flagAsRunning();
242-
243-
auto traceHandle = concurrency::trace::job_run_begin(job);
244+
auto traceHandle = concurrency::trace::job_run_begin(task);
244245
task->runInFullyEstablishedContext();
245246
concurrency::trace::job_run_end(traceHandle);
246247

@@ -251,6 +252,37 @@ void swift::runJobInEstablishedExecutorContext(Job *job) {
251252
assert(ActiveTask::get() == nullptr &&
252253
"active task wasn't cleared before suspending?");
253254
if (oldTask) ActiveTask::set(oldTask);
255+
}
256+
}
257+
258+
/// Runs the Task embedded in the stealer using the stealer's exclusion value.
259+
/// The stealer holds a reference to the Task which is released here. Stealers
260+
/// are not reference counted so the object is directly destroyed here.
261+
inline void
262+
AsyncTaskStealer::process(Job *_job) {
263+
auto *stealer = cast<AsyncTaskStealer>(_job);
264+
265+
taskInvokeWithExclusionValue(stealer->Task, stealer->ExclusionValue, false);
266+
267+
// We are done with the Task at this point so we can release it
268+
swift_release(stealer->Task);
269+
270+
swift_cxx_deleteObject(stealer);
271+
}
272+
273+
void swift::runJobInEstablishedExecutorContext(Job *job) {
274+
_swift_tsan_acquire(job);
275+
SWIFT_TASK_DEBUG_LOG("Run job in established context %p", job);
276+
277+
#if SWIFT_OBJC_INTEROP
278+
auto pool = objc_autoreleasePoolPush();
279+
#endif
280+
281+
if (auto task = dyn_cast<AsyncTask>(job)) {
282+
// taskRemoveEnqueued modifies the TaskActiveStatus so we could plumb
283+
// through the result and avoid an extra reload later. It could also
284+
// be pulled into the cas to flag as running in the successful case
285+
taskInvokeWithExclusionValue(task, task->_private().LocalStealerExclusionValue, true);
254286
} else {
255287
// There's no extra bookkeeping to do for simple jobs besides swapping in
256288
// the voucher.
@@ -1504,6 +1536,9 @@ TaskExecutorRef TaskExecutorRef::fromTaskExecutorPreference(Job *job) {
15041536
if (auto task = dyn_cast<AsyncTask>(job)) {
15051537
return task->getPreferredTaskExecutor();
15061538
}
1539+
if (auto stealer = dyn_cast<AsyncTaskStealer>(job)) {
1540+
return stealer->Task->getPreferredTaskExecutor();
1541+
}
15071542
return TaskExecutorRef::undefined();
15081543
}
15091544

@@ -2741,8 +2776,22 @@ swift_actor_escalate(DefaultActorImpl *actor, AsyncTask *task, JobPriority newPr
27412776
SWIFT_CC(swift)
27422777
void swift::swift_executor_escalate(SerialExecutorRef executor, AsyncTask *task,
27432778
JobPriority newPriority) {
2779+
SWIFT_TASK_DEBUG_LOG("Escalating executor %p to %#x",
2780+
(void*)executor.getIdentity(), newPriority);
27442781
if (executor.isGeneric()) {
2745-
// TODO (rokhinip): We'd push a stealer job for the task on the executor.
2782+
SWIFT_TASK_DEBUG_LOG("Enqueuing stealer for %p on %p",
2783+
(void*)task, (void*)executor.getIdentity());
2784+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
2785+
// See the comment in enqueueDirectOrSteal
2786+
// for why async let Tasks aren't supported
2787+
if (!task->Flags.task_isAsyncLetTask()) {
2788+
// Even though we are in the "enqueue stealer" path, this could
2789+
// enqueue the original Task if another stealer had previously
2790+
// been enqueued and still is but the original Task did manage to
2791+
// run at some point (while rare, this wouldn't be unexpected)
2792+
taskEnqueueDirectOrSteal(task, executor, true);
2793+
}
2794+
#endif
27462795
return;
27472796
}
27482797

stdlib/public/Concurrency/Task.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,10 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
133133
waitingTask, this);
134134
_swift_tsan_acquire(static_cast<Job *>(this));
135135
if (suspendedWaiter) {
136-
// This will always return zero because we were just
136+
// This is safe to call because we were just
137137
// running this Task so its BasePriority (which is
138138
// immutable) should've already been set on the thread.
139-
[[maybe_unused]]
140-
uint32_t opaque = waitingTask->flagAsRunning();
141-
assert(opaque == 0);
139+
waitingTask->flagAsRunningImmediately();
142140
}
143141
// The task is done; we don't need to wait.
144142
return queueHead.getStatus();
@@ -1666,11 +1664,9 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
16661664
// we try to tail-call.
16671665
} while (false);
16681666
#else
1669-
// This will always return zero because we were just running this Task so its
1667+
// This is safe to call because we were just running this Task so its
16701668
// BasePriority (which is immutable) should've already been set on the thread.
1671-
[[maybe_unused]]
1672-
uint32_t opaque = task->flagAsRunning();
1673-
assert(opaque == 0);
1669+
task->flagAsRunningImmediately();
16741670
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
16751671

16761672
if (context->isExecutorSwitchForced())

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,17 +1817,17 @@ reevaluate_if_taskgroup_has_results:;
18171817
assumed = TaskGroupStatus{assumedStatus};
18181818
continue; // We raced with something, try again.
18191819
}
1820-
SWIFT_TASK_DEBUG_LOG("poll, after CAS: %s", status.to_string().c_str());
1820+
#if !SWIFT_CONCURRENCY_EMBEDDED
1821+
SWIFT_TASK_DEBUG_LOG("poll, after CAS: %s", assumed.to_string(this).c_str());
1822+
#endif
18211823

18221824
// We're going back to running the task, so if we suspended before,
18231825
// we need to flag it as running again.
18241826
if (hasSuspended) {
1825-
// This will always return zero because we were just
1827+
// This is safe to call because we were just
18261828
// running this Task so its BasePriority (which is
18271829
// immutable) should've already been set on the thread.
1828-
[[maybe_unused]]
1829-
uint32_t opaque = waitingTask->flagAsRunning();
1830-
assert(opaque == 0);
1830+
waitingTask->flagAsRunningImmediately();
18311831
}
18321832

18331833
// Success! We are allowed to poll.

0 commit comments

Comments
 (0)