From 3e092dbc008afc6687941e05128d0c9e38e8ecf5 Mon Sep 17 00:00:00 2001
From: Bryce Wilson
Date: Fri, 12 Dec 2025 16:56:57 -0800
Subject: [PATCH] [Concurrency] Add Swift Task Stealers
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When Swift Tasks are enqueued onto the cooperative pool, the queue they
are placed onto is chosen based on their priority at enqueue time. If
the Task is escalated afterwards, it is marked as such so that when it
eventually runs it will self-escalate the thread to the required
priority, but nothing happens while it is still enqueued. If the Task
was originally enqueued at a low priority, it may not be able to run
because it is waiting behind other items and the system does not have
enough room to do low-priority work. This is a priority inversion,
since the now-higher-priority Task is unable to run.

This commit implements a "stealer" object for Tasks, AsyncTaskStealer,
similar to the existing Actor stealer. Stealers are used only when
Dispatch is the default global executor and priority escalation is
enabled. To an executor of any kind (Dispatch or custom), a stealer
looks like any other Job. This avoids edge cases when a Task is
enqueued onto multiple kinds of executors over its lifetime (e.g.
global, Actor, custom).

A stealer has to be created whenever the Task needs to be enqueued
while the Task object itself is still enqueued somewhere else (Dispatch
executors link Tasks through an intrusive list, so a Task can only be
enqueued in one place at a time). Once the Task is eventually dequeued,
it may be enqueued directly the next time it needs to be enqueued.

To ensure that the Task only runs when it should, it carries a counter
that is incremented each time it finishes an execution. Each stealer
records the counter value at which it is allowed to run, and the Task
keeps the corresponding value for the directly enqueued copy in its
private data. If the Task or a stealer is run while the Task's current
counter does not match the recorded value, the invocation does nothing.

Because flagAsRunning can now fail, and a previous commit introduced an
opaque return value, there is a split between the two cases where
flagAsRunning can be called. When the Task is being resumed immediately
after suspension, without being enqueued again, flagAsRunning is
infallible and always returns zero for the opaque value. When the Task
is initially run from swift_task_run, it is failable and may return a
non-zero opaque value. flagAsRunning has been refactored to separate
these cases and reduce redundant code.

rdar://160967177
---
 include/swift/ABI/MetadataValues.h       |   1 +
 include/swift/ABI/Task.h                 |  56 ++-
 stdlib/public/Concurrency/Actor.cpp      |  85 +++-
 stdlib/public/Concurrency/Task.cpp       |  12 +-
 stdlib/public/Concurrency/TaskGroup.cpp  |  10 +-
 stdlib/public/Concurrency/TaskPrivate.h  | 484 ++++++++++++++++++-----
 stdlib/public/Concurrency/TaskStatus.cpp | 100 ++++-
 7 files changed, 607 insertions(+), 141 deletions(-)

diff --git a/include/swift/ABI/MetadataValues.h b/include/swift/ABI/MetadataValues.h
index e3a5630117acd..5eb7c42ad239d 100644
--- a/include/swift/ABI/MetadataValues.h
+++ b/include/swift/ABI/MetadataValues.h
@@ -2692,6 +2692,7 @@ enum class JobKind : size_t {
   DefaultActorOverride,
   NullaryContinuation,
   IsolatedDeinit,
+  TaskStealer,
 };
 
 /// The priority of a job. Higher priorities are larger values.
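A minimal standalone sketch of the exclusion-value handshake described above
(toy types and names invented for illustration; this is not the runtime's
ActiveTaskStatus layout): only the enqueue that captured the Task's current
exclusion value may run it, and any stale enqueue turns into a no-op.

// Illustrative toy, not the runtime implementation.
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdio>

// Stand-in for the StealerExclusion bits carried in the task status.
struct ToyTask {
  std::atomic<uint8_t> exclusion{0};

  // Escalation enqueues a new stealer and bumps the value; only the most
  // recently captured value is allowed to run the task.
  uint8_t bumpExclusionValue() {
    return static_cast<uint8_t>(
        exclusion.fetch_add(1, std::memory_order_relaxed) + 1);
  }

  // The gate that flagAsRunning applies: a stale enqueue does nothing.
  bool tryRun(uint8_t allowedExclusionValue) {
    if (exclusion.load(std::memory_order_relaxed) != allowedExclusionValue)
      return false;
    std::puts("task body runs exactly once");
    return true;
  }
};

int main() {
  ToyTask task;
  uint8_t direct = 0;                          // value held by the direct enqueue
  uint8_t stealer = task.bumpExclusionValue(); // escalation creates a newer stealer

  assert(!task.tryRun(direct));                // stale copy: no-op
  assert(task.tryRun(stealer));                // newest copy: runs the task
  return 0;
}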
diff --git a/include/swift/ABI/Task.h b/include/swift/ABI/Task.h
index a587747658283..538076e1e5c77 100644
--- a/include/swift/ABI/Task.h
+++ b/include/swift/ABI/Task.h
@@ -430,21 +430,53 @@ class AsyncTask : public Job {
   /// Generally this should be done immediately after updating
   /// ActiveTask.
   ///
-  /// When Dispatch is used for the default executor:
-  /// * If the return value is non-zero, it must be passed
-  ///   to swift_dispatch_thread_reset_override_self
+  /// This function returns two values. The first, a boolean, reports
+  /// whether the Task was successfully marked as running. The second
+  /// is an opaque value used to later reset some properties of the
+  /// thread. There are two cases described below for their meaning.
+  ///
+  /// When Dispatch is used as the default
+  /// executor and priority escalation is enabled:
+  /// * If the opaque value is non-zero, it must be passed
+  ///   to swift_dispatch_thread_reset_override_self
   ///   before returning to the executor.
-  /// * If the return value is zero, it may be ignored or passed to
+  /// * If the opaque value is zero, it may be ignored or passed to
   ///   the aforementioned function (which will ignore values of zero).
-  ///   The current implementation will always return zero
-  ///   if you call flagAsRunning again before calling
-  ///   swift_dispatch_thread_reset_override_self with the
-  ///   initial value. This supports suspending and immediately
-  ///   resuming a Task without returning up the callstack.
+  /// * This function is failable: if it returns false, no more setup
+  ///   of the environment should be done and
+  ///   runInFullyEstablishedContext must not be called.
+  /// * If this function is being called directly from swift_task_run (i.e.
+  ///   not indirectly through an AsyncTaskStealer), then removeEnqueued must
+  ///   be true since the Task must be marked as no longer enqueued on either
+  ///   the success or failure path to allow enqueuing the Task directly
+  ///   again. If this function is being called from an AsyncTaskStealer
+  ///   (we did not dequeue the Task directly), then this argument must
+  ///   be false so that the Task's enqueued status is not changed.
+  /// * If this function is called via an AsyncTaskStealer, the
+  ///   allowedExclusionValue must be the one encoded in the stealer.
+  /// * Otherwise, when this function is called from
+  ///   the Task directly, allowedExclusionValue must be
+  ///   the value encoded in the Task's private data.
+  /// * The exclusion value allows both the Task's intrusive link
+  ///   and one or more AsyncTaskStealers to be enqueued at the same
+  ///   time, with different exclusion values, and all but the one
+  ///   most recently enqueued will have this function return false.
   ///
-  /// For all other default executors, flagAsRunning
-  /// will return zero which may be ignored.
-  uint32_t flagAsRunning();
+  /// For all other default executors, or
+  /// when priority escalation is not enabled:
+  /// * This function will always return true.
+  /// * allowedExclusionValue is ignored (and is expected to always be zero).
+  /// * The opaque return value will always be zero and may be ignored.
+  std::pair<bool, uint32_t> flagAsRunning(uint8_t allowedExclusionValue,
+                                          bool removeEnqueued);
+
+  /// This variant of flagAsRunning may be called if you are resuming
+  /// immediately after suspending. That is, you are on the same thread,
+  /// you have not enqueued onto any executor, and you have not called
+  /// swift_dispatch_thread_reset_override_self or done any other
+  /// cleanup work.
This is intended for situations such as awaiting + /// where you may mark yourself as suspended but find out during + /// atomic state update that you may actually resume immediately. + void flagAsRunningImmediately(); /// Flag that this task is now suspended with information about what it is /// waiting on. diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 915a64d273a17..b82e2bb7ba9a4 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -16,6 +16,7 @@ ///===----------------------------------------------------------------------===/// #include "swift/Runtime/Concurrency.h" +#include "swift/Runtime/DispatchShims.h" #include #include #if __has_feature(ptrauth_calls) @@ -221,26 +222,26 @@ ExecutorTrackingInfo::ActiveInfoInThread; } // end anonymous namespace -void swift::runJobInEstablishedExecutorContext(Job *job) { - _swift_tsan_acquire(job); - SWIFT_TASK_DEBUG_LOG("Run job in established context %p", job); - -#if SWIFT_OBJC_INTEROP - auto pool = objc_autoreleasePoolPush(); -#endif - - if (auto task = dyn_cast(job)) { +/// This function establishes the Task's context and attempts to invoke +/// it. The invocation may fail and the Task may not be run if the +/// passed in exclusion value is not what is in the ActiveTaskStatus +/// during the cas loop to mark the Task as running. If Task +/// priority escalation is not enabled, this will always succeed. +SWIFT_ALWAYS_INLINE +static inline +void taskInvokeWithExclusionValue(AsyncTask *task, + uint8_t allowedStealerExclusionValue, + bool removeEnqueued) { + // Update the task status to say that it's running on the current + // thread. If the task suspends somewhere, it should update the + // task status appropriately; we don't need to update it afterwards. + [[maybe_unused]] + auto [mayRun, dispatchOpaquePriority] = task->flagAsRunning(allowedStealerExclusionValue, removeEnqueued); + if (mayRun) { // Update the active task in the current thread. auto oldTask = ActiveTask::swap(task); - // Update the task status to say that it's running on the - // current thread. If the task suspends somewhere, it should - // update the task status appropriately; we don't need to update - // it afterwards. - [[maybe_unused]] - uint32_t dispatchOpaquePriority = task->flagAsRunning(); - - auto traceHandle = concurrency::trace::job_run_begin(job); + auto traceHandle = concurrency::trace::job_run_begin(task); task->runInFullyEstablishedContext(); concurrency::trace::job_run_end(traceHandle); @@ -251,6 +252,37 @@ void swift::runJobInEstablishedExecutorContext(Job *job) { assert(ActiveTask::get() == nullptr && "active task wasn't cleared before suspending?"); if (oldTask) ActiveTask::set(oldTask); + } +} + +/// Runs the Task embedded in the stealer using the stealer's exclusion value. +/// The stealer holds a reference to the Task which is released here. Stealers +/// are not reference counted so the object is directly destroyed here. 
+inline void +AsyncTaskStealer::process(Job *_job) { + auto *stealer = cast(_job); + + taskInvokeWithExclusionValue(stealer->Task, stealer->ExclusionValue, false); + + // We are done with the Task at this point so we can release it + swift_release(stealer->Task); + + swift_cxx_deleteObject(stealer); +} + +void swift::runJobInEstablishedExecutorContext(Job *job) { + _swift_tsan_acquire(job); + SWIFT_TASK_DEBUG_LOG("Run job in established context %p", job); + +#if SWIFT_OBJC_INTEROP + auto pool = objc_autoreleasePoolPush(); +#endif + + if (auto task = dyn_cast(job)) { + // taskRemoveEnqueued modifies the TaskActiveStatus so we could plumb + // through the result and avoid an extra reload later. It could also + // be pulled into the cas to flag as running in the successful case + taskInvokeWithExclusionValue(task, task->_private().LocalStealerExclusionValue, true); } else { // There's no extra bookkeeping to do for simple jobs besides swapping in // the voucher. @@ -1504,6 +1536,9 @@ TaskExecutorRef TaskExecutorRef::fromTaskExecutorPreference(Job *job) { if (auto task = dyn_cast(job)) { return task->getPreferredTaskExecutor(); } + if (auto stealer = dyn_cast(job)) { + return stealer->Task->getPreferredTaskExecutor(); + } return TaskExecutorRef::undefined(); } @@ -2741,8 +2776,22 @@ swift_actor_escalate(DefaultActorImpl *actor, AsyncTask *task, JobPriority newPr SWIFT_CC(swift) void swift::swift_executor_escalate(SerialExecutorRef executor, AsyncTask *task, JobPriority newPriority) { + SWIFT_TASK_DEBUG_LOG("Escalating executor %p to %#x", + (void*)executor.getIdentity(), newPriority); if (executor.isGeneric()) { - // TODO (rokhinip): We'd push a stealer job for the task on the executor. + SWIFT_TASK_DEBUG_LOG("Enqueuing stealer for %p on %p", + (void*)task, (void*)executor.getIdentity()); +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + // See the comment in enqueueDirectOrSteal + // for why async let Tasks aren't supported + if (!task->Flags.task_isAsyncLetTask()) { + // Even though we are in the "enqueue stealer" path, this could + // enqueue the original Task if another stealer had previously + // been enqueued and still is but the original Task did manage to + // run at some point (while rare, this wouldn't be unexpected) + taskEnqueueDirectOrSteal(task, executor, true); + } +#endif return; } diff --git a/stdlib/public/Concurrency/Task.cpp b/stdlib/public/Concurrency/Task.cpp index 8011df6247211..0fecf3698fb23 100644 --- a/stdlib/public/Concurrency/Task.cpp +++ b/stdlib/public/Concurrency/Task.cpp @@ -133,12 +133,10 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask, waitingTask, this); _swift_tsan_acquire(static_cast(this)); if (suspendedWaiter) { - // This will always return zero because we were just + // This is safe to call because we were just // running this Task so its BasePriority (which is // immutable) should've already been set on the thread. - [[maybe_unused]] - uint32_t opaque = waitingTask->flagAsRunning(); - assert(opaque == 0); + waitingTask->flagAsRunningImmediately(); } // The task is done; we don't need to wait. return queueHead.getStatus(); @@ -1666,11 +1664,9 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) { // we try to tail-call. } while (false); #else - // This will always return zero because we were just running this Task so its + // This is safe to call because we were just running this Task so its // BasePriority (which is immutable) should've already been set on the thread. 
- [[maybe_unused]] - uint32_t opaque = task->flagAsRunning(); - assert(opaque == 0); + task->flagAsRunningImmediately(); #endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ if (context->isExecutorSwitchForced()) diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index 2989ffcf89968..239f3c9f1c2d2 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -1817,17 +1817,17 @@ reevaluate_if_taskgroup_has_results:; assumed = TaskGroupStatus{assumedStatus}; continue; // We raced with something, try again. } - SWIFT_TASK_DEBUG_LOG("poll, after CAS: %s", status.to_string().c_str()); +#if !SWIFT_CONCURRENCY_EMBEDDED + SWIFT_TASK_DEBUG_LOG("poll, after CAS: %s", assumed.to_string(this).c_str()); +#endif // We're going back to running the task, so if we suspended before, // we need to flag it as running again. if (hasSuspended) { - // This will always return zero because we were just + // This is safe to call because we were just // running this Task so its BasePriority (which is // immutable) should've already been set on the thread. - [[maybe_unused]] - uint32_t opaque = waitingTask->flagAsRunning(); - assert(opaque == 0); + waitingTask->flagAsRunningImmediately(); } // Success! We are allowed to poll. diff --git a/stdlib/public/Concurrency/TaskPrivate.h b/stdlib/public/Concurrency/TaskPrivate.h index 5e2d353c98a2e..ebfc31cf5a89b 100644 --- a/stdlib/public/Concurrency/TaskPrivate.h +++ b/stdlib/public/Concurrency/TaskPrivate.h @@ -221,6 +221,16 @@ SWIFT_CC(swift) void removeStatusRecord(AsyncTask *task, TaskStatusRecord *record, llvm::function_reffn = nullptr); +/// Similar to the above functions, attempt to remove record from +/// task. If condition returns false, record is considered invalid +/// and won't be attempted to be removed. It may be called multiple +/// times to ensure it is checked atomically with the removal. +SWIFT_CC(swift) +bool removeStatusRecordIf(AsyncTask *task, TaskStatusRecord *record, + ActiveTaskStatus &status, + llvm::function_ref fn, + llvm::function_ref condition); + /// Update the status record by scanning through all records and removing /// those which match the condition. This can also be used to inspect /// "remaining" records. @@ -458,16 +468,13 @@ class alignas(2 * sizeof(void*)) ActiveTaskStatus { /// whether or not it is running. IsRunning = 0x800, #endif - /// Task is intrusively enqueued somewhere - either in the default executor - /// pool, or in an actor. Currently, due to lack of task stealers, this bit - /// is cleared when a task starts running on a thread, suspends or is - /// completed. + /// Task is intrusively enqueued somewhere - either + /// in the default executor pool, or in an actor. /// - /// TODO (rokhinip): Once we have task stealers, this bit refers to the - /// enqueued-ness on the specific queue that the task is linked into and - /// therefore, can only be cleared when the intrusively linkage is cleaned - /// up. The enqueued-ness then becomes orthogonal to the other states of - /// running/suspended/completed. + /// This bit refers to the enqueued-ness on the specific queue that + /// the task is linked into and therefore, can only be cleared when + /// the intrusively linkage is cleaned up. The enqueued-ness is + /// orthogonal to the other states of running/suspended/completed. IsEnqueued = 0x1000, /// Task has been completed. This is purely used to enable an assertion /// that the task is completed when we destroy it. 
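The removeStatusRecordIf contract declared above (the condition is checked
atomically with the removal and may be evaluated more than once) is easiest to
see in isolation. A rough sketch with a single packed word standing in for
ActiveTaskStatus; every name here is invented for illustration:

// Illustrative toy, not the runtime implementation.
#include <atomic>
#include <cstdint>
#include <cstdio>

// Bit 0 is a toy "record present" flag and bits 8..15 are a toy
// exclusion value.
static std::atomic<uint32_t> gStatus{(1u << 8) | 1u};

// Shape of a conditional removal: the predicate is re-evaluated on every
// CAS retry, so it runs atomically with the update and may run more than
// once, which is the same caveat removeStatusRecordIf documents.
template <typename Condition, typename Update>
bool removeIf(Condition condition, Update update) {
  uint32_t old = gStatus.load(std::memory_order_relaxed);
  while (true) {
    if (!condition(old))
      return false;                       // invalidated concurrently: bail out
    uint32_t desired = update(old);       // e.g. pop the record, clear the bit
    if (gStatus.compare_exchange_weak(old, desired,
                                      std::memory_order_relaxed,
                                      std::memory_order_relaxed))
      return true;                        // removed atomically with the check
    // CAS failed: 'old' now holds the fresh status; loop and re-check.
  }
}

int main() {
  bool removed = removeIf(
      [](uint32_t s) { return ((s >> 8) & 0xFF) == 1; },  // still valid?
      [](uint32_t s) { return s & ~1u; });                 // drop the record bit
  std::printf("removed = %d\n", removed);
  return removed ? 0 : 1;
}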
@@ -486,6 +493,20 @@ class alignas(2 * sizeof(void*)) ActiveTaskStatus { /// use the task executor preference when we'd otherwise be running on /// the generic global pool. HasTaskExecutorPreference = 0x8000, + +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + /// The Task's intrusive link or a Stealer may only run if its exclusion + /// value is equal to this value. This number increases only when escalating + /// a Task while it is dependent on the Dispatch default global executor. + /// + /// This value is currently set to 8 bits because JobPriority is + /// 8 bits so in theory, that is the most number of times a Task + /// may be escalated in such a way that this value increases. + /// In reality, there are only 5 values of JobPriority so this + /// could just use 3 bits (along with PriorityMask) if needed. + StealerExclusionShift = 16, + StealerExclusionMask = 0xFF0000, +#endif }; // Note: this structure is mirrored by ActiveTaskStatusWithEscalation and @@ -683,7 +704,6 @@ class alignas(2 * sizeof(void*)) ActiveTaskStatus { #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION assert(ExecutionLock == DLOCK_OWNER_NULL); #endif - assert(!isEnqueued()); return ActiveTaskStatus(Record, (Flags & ~PriorityMask) | uintptr_t(priority)); } @@ -699,6 +719,24 @@ class alignas(2 * sizeof(void*)) ActiveTaskStatus { #endif } + uint8_t getStealerExclusionValue() const { +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + return (Flags & StealerExclusionMask) >> StealerExclusionShift; +#else + return 0; +#endif + } + + ActiveTaskStatus withStealerExclusionValue(uint8_t value) const { +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + auto NewFlags = Flags & ~StealerExclusionMask; + NewFlags |= (value << StealerExclusionShift); + return ActiveTaskStatus(Record, NewFlags, ExecutionLock); +#else + return *this; +#endif + } + ActiveTaskStatus withoutStoredPriorityEscalation() const { #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION return ActiveTaskStatus(Record, Flags & ~IsEscalated, ExecutionLock); @@ -811,11 +849,6 @@ struct AsyncTask::PrivateStorage { /// Currently one word. TaskLocal::Storage Local; - /// The top 32 bits of the task ID. The bottom 32 bits are in Job::Id. - uint32_t Id; - - // Another four bytes of padding here too (on 64-bit) - /// Base priority of Task - set only at creation time of task. /// Current max priority of task is ActiveTaskStatus. /// @@ -831,6 +864,13 @@ struct AsyncTask::PrivateStorage { // The lock used to protect more complicated operations on the task status. RecursiveMutex statusLock; + /// The top 32 bits of the task ID. The bottom 32 bits are in Job::Id. + uint32_t Id; + + uint8_t LocalStealerExclusionValue; + // On 64-bit, there are 3 bytes of padding + // and on 32-bit, there are 3 bytes of padding and 4 available bytes + // Always create an async task with max priority in ActiveTaskStatus = base // priority. It will be updated later if needed. PrivateStorage(JobPriority basePri) @@ -872,7 +912,6 @@ struct AsyncTask::PrivateStorage { // Remove drainer, enqueued and override bit if any auto newStatus = oldStatus.withRunning(false); newStatus = newStatus.withoutStoredPriorityEscalation(); - newStatus = newStatus.withoutEnqueued(); newStatus = newStatus.withComplete(); // This can fail since the task can still get concurrently cancelled or @@ -970,97 +1009,362 @@ inline bool AsyncTask::isCancelled() const { .isCancelled(); } -inline uint32_t AsyncTask::flagAsRunning() { +/// Remove the enqueued bit in the ActiveTaskStatus atomically. 
+/// This must be done when a Task's intrusive link is dequeued
+/// but after reading the local stealer exclusion value.
+/// This should not be done from any concurrent context.
+static inline void taskRemoveEnqueued(AsyncTask *task) {
+  auto oldStatus = task->_private()._status().load(std::memory_order_relaxed);
+  auto newStatus = oldStatus;
+  do {
+    // This could be an atomic AND rather than a CAS loop
+    newStatus = oldStatus.withoutEnqueued();
+  } while (!task->_private()._status().compare_exchange_weak(oldStatus, newStatus,
+                /* success */ std::memory_order_relaxed,
+                /* failure */ std::memory_order_relaxed));
+}
+
 #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
+struct ThreadPriorityManager {
   dispatch_thread_override_info_s threadOverrideInfo;
-  threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor();
-  qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor;
-  qos_class_t basePriorityCeil = overrideFloor;
-  qos_class_t taskBasePriority = (qos_class_t) _private().BasePriority;
+  qos_class_t overrideFloor;
+  qos_class_t basePriorityCeil;
+  qos_class_t taskBasePriority;
+  uint32_t opaquePriority;
+
+  ThreadPriorityManager(AsyncTask const& task) {
+    threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor();
+    overrideFloor = threadOverrideInfo.override_qos_floor;
+    basePriorityCeil = overrideFloor;
+    taskBasePriority = (qos_class_t) task._private().BasePriority;
+    opaquePriority = 0;
+  }
+  ThreadPriorityManager(ThreadPriorityManager&&) = delete;
+
+  void overrideIfNeeded(JobPriority storedPriority) {
+    // If the base priority is not equal to the current override
+    // floor then dispatch may need to apply the base priority
+    // to the thread. If the current priority is higher than
+    // the override floor, then dispatch may need to apply a
+    // self-override. In either case, call into dispatch to do this.
+    qos_class_t maxTaskPriority = (qos_class_t)storedPriority;
+    if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) {
+      SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task's max priority %#x and base priority %#x",
+                           overrideFloor, maxTaskPriority, taskBasePriority);
+
+      uint32_t previousPriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority);
+      // Dispatch will only return a value if the base priority had to
+      // be set, which should only happen the first time. Let's make sure
+      // that we don't overwrite it and that our assumption is correct.
+ if (!opaquePriority) { + opaquePriority = previousPriority; + } else { + assert(previousPriority == 0); + } + overrideFloor = maxTaskPriority; + basePriorityCeil = taskBasePriority; + } + } + + void resetIfNeeded() { + // At this point, we may have run the above update code + // so we might need to undo setting the base priority + if (opaquePriority) { + swift_dispatch_thread_reset_override_self(opaquePriority); + opaquePriority = 0; + } + } +}; +#endif + +static inline uint32_t taskFlagAsRunningWithoutDependency(AsyncTask &task, bool removeEnqueued = false) { +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + auto threadPriorityManager = ThreadPriorityManager(task); +#endif + + auto oldStatus = task._private()._status().load(std::memory_order_relaxed); + assert(!oldStatus.isRunning()); + assert(!oldStatus.isComplete()); + // This function isn't meant to be called if the function has + // been enqueued onto an executor since the last suspension + assert(!oldStatus.hasTaskDependency()); + + SWIFT_TASK_DEBUG_LOG("%p->flagAsRunning() with no task dependency", &task); + assert(task._private().dependencyRecord == nullptr); + + while (true) { +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + threadPriorityManager.overrideIfNeeded(oldStatus.getStoredPriority()); +#endif + // Set self as executor and remove escalation bit if any - the task's + // priority escalation has already been reflected on the thread. + auto newStatus = oldStatus.withRunning(true); + if (removeEnqueued) { + newStatus = newStatus.withoutEnqueued(); + } + newStatus = newStatus.withoutStoredPriorityEscalation(); + + if (task._private()._status().compare_exchange_weak(oldStatus, newStatus, + /* success */ std::memory_order_relaxed, + /* failure */ std::memory_order_relaxed)) { + newStatus.traceStatusChanged(&task, true, oldStatus.isRunning()); + adoptTaskVoucher(&task); + swift_task_enterThreadLocalContext( + (char *)&task._private().ExclusivityAccessSet[0]); + break; + } + } +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + return threadPriorityManager.opaquePriority; +#else + return 0; +#endif +} + +inline void AsyncTask::flagAsRunningImmediately() { + // The intention of this function is to only be called in places where + // the thread is already set up for the correct base priority. If the + // base priority doesn't need to change, dispatch should return zero + +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + auto threadPriorityManager = ThreadPriorityManager(*this); + // It shouldn't be possible for newer stealers to have been enqueued but we + // don't know what the original exclusion value was when this task started + // running so we can't double check in an assert that it's still accurate #endif auto oldStatus = _private()._status().load(std::memory_order_relaxed); + assert(!oldStatus.isRunning()); assert(!oldStatus.isComplete()); - uint32_t dispatchOpaquePriority = 0; + // We asserted above that the exclusion value is correct + // for oldStatus so the dependency record is correct if (!oldStatus.hasTaskDependency()) { - SWIFT_TASK_DEBUG_LOG("%p->flagAsRunning() with no task dependency", this); assert(_private().dependencyRecord == nullptr); + [[maybe_unused]] + uint32_t opaque = taskFlagAsRunningWithoutDependency(*this, true); + // In this function, we should always see zero + assert(opaque == 0); + return; + } - while (true) { + // In this function, the dependency record will always be + // accurate and is always a dependency on something other than an + // executor. 
If it was a dependency on an executor, that would + // make our assumptions about not racing with stealers invalid + auto dependencyRecord = _private().dependencyRecord; + SWIFT_TASK_DEBUG_LOG("[Dependency] %p->flagAsRunning() and remove dependencyRecord %p", + this, dependencyRecord); + // We can't directly assert that dependencyRecord->DependencyKind + // != EnqueuedOnExecutor but that is the expected condition here + + removeStatusRecord(this, dependencyRecord, oldStatus, [&](ActiveTaskStatus oldStatus, + ActiveTaskStatus &newStatus) { #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION - // If the base priority is not equal to the current override floor then - // dispqatch may need to apply the base priority to the thread. If the - // current priority is higher than the override floor, then dispatch may - // need to apply a self-override. In either case, call into dispatch to - // do this. - qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority(); - if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) { - SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x", - overrideFloor, this, maxTaskPriority, taskBasePriority); - - dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority); - overrideFloor = maxTaskPriority; - basePriorityCeil = taskBasePriority; - } + threadPriorityManager.overrideIfNeeded(oldStatus.getStoredPriority()); + assert(threadPriorityManager.opaquePriority == 0); #endif - // Set self as executor and remove escalation bit if any - the task's - // priority escalation has already been reflected on the thread. - auto newStatus = oldStatus.withRunning(true); - newStatus = newStatus.withoutStoredPriorityEscalation(); - newStatus = newStatus.withoutEnqueued(); + // Set self as executor and remove escalation bit if any - the task's + // priority escalation has already been reflected on the thread. 
+ newStatus = newStatus.withRunning(true); + newStatus = newStatus.withoutStoredPriorityEscalation(); + newStatus = newStatus.withoutTaskDependency(); + }); - if (_private()._status().compare_exchange_weak(oldStatus, newStatus, - /* success */ std::memory_order_relaxed, - /* failure */ std::memory_order_relaxed)) { - newStatus.traceStatusChanged(this, true, oldStatus.isRunning()); - adoptTaskVoucher(this); - swift_task_enterThreadLocalContext( - (char *)&_private().ExclusivityAccessSet[0]); - break; - } + this->destroyTaskDependency(dependencyRecord); + + adoptTaskVoucher(this); + swift_task_enterThreadLocalContext( + (char *)&_private().ExclusivityAccessSet[0]); + + return; +} + +inline std::pair AsyncTask::flagAsRunning(uint8_t allowedExclusionValue, bool removeEnqueued) { +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + auto threadPriorityManager = ThreadPriorityManager(*this); + + SWIFT_TASK_DEBUG_LOG("%p run by stealer %d, exclusion value %d", this, !removeEnqueued, allowedExclusionValue); + + // Stealers are only enabled with priority escalation + auto oldStatus = _private()._status().load(std::memory_order_relaxed); + if (oldStatus.getStealerExclusionValue() != allowedExclusionValue) { + if (removeEnqueued) taskRemoveEnqueued(this); + return {false, 0}; + } +#else + auto oldStatus = _private()._status().load(std::memory_order_relaxed); +#endif + + assert(!oldStatus.isRunning()); + assert(!oldStatus.isComplete()); + + // We've already checked that the exclusion value is correct for oldStatus + // so whether we have a task dependency is correct. If we do not have + // one, then we haven't been enqueued so no newer stealer can exist + if (!oldStatus.hasTaskDependency()) { + assert(_private().dependencyRecord == nullptr); +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + assert(oldStatus.getStealerExclusionValue() == allowedExclusionValue); +#endif + return {true, taskFlagAsRunningWithoutDependency(*this, true)}; + } + + // In this case, we were enqueued so we may + // race with a stealer and have to bail out + + // The dependency record could be invalid here. If the exclusion value + // is allowed, then it will be valid and there will be no races. Thus, + // we can't even assert that the record isn't null here (because this + // value isn't atomic, if it was, we could bail out early if it was null). + auto dependencyRecord = _private().dependencyRecord; + SWIFT_TASK_DEBUG_LOG("[Dependency] %p->flagAsRunning() and remove dependencyRecord %p", + this, dependencyRecord); + + if (!removeStatusRecordIf(this, dependencyRecord, oldStatus, [&](ActiveTaskStatus oldStatus, + ActiveTaskStatus &newStatus) { +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + threadPriorityManager.overrideIfNeeded(oldStatus.getStoredPriority()); +#endif + // Set self as executor and remove escalation bit if any - the task's + // priority escalation has already been reflected on the thread. 
+ newStatus = newStatus.withRunning(true); + newStatus = newStatus.withoutStoredPriorityEscalation(); + newStatus = newStatus.withoutTaskDependency(); + if (removeEnqueued) { + newStatus.withoutEnqueued(); } - } else { - auto dependencyRecord = _private().dependencyRecord; - assert(dependencyRecord != nullptr); - SWIFT_TASK_DEBUG_LOG("[Dependency] %p->flagAsRunning() and remove dependencyRecord %p", - this, dependencyRecord); + }, [&](ActiveTaskStatus toCheck) { + return toCheck.getStealerExclusionValue() == allowedExclusionValue; + })) { + // Status record not removed +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + threadPriorityManager.resetIfNeeded(); +#endif + if (removeEnqueued) { + taskRemoveEnqueued(this); + } + return {false, 0}; + } + + // Because we got here, we did not bail out early. Thus, the exclusion + // value is allowed meaning that the dependencyRecord that we + // obtained is correct and has been removed and may now be destroyed + this->destroyTaskDependency(dependencyRecord); - removeStatusRecord(this, dependencyRecord, oldStatus, [&](ActiveTaskStatus unused, - ActiveTaskStatus& newStatus) { + adoptTaskVoucher(this); + swift_task_enterThreadLocalContext( + (char *)&_private().ExclusivityAccessSet[0]); #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION - // If the base priority is not equal to the current override floor then - // dispqatch may need to apply the base priority to the thread. If the - // current priority is higher than the override floor, then dispatch may - // need to apply a self-override. In either case, call into dispatch to - // do this. - qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority(); - if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) { - SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x", - overrideFloor, this, maxTaskPriority, taskBasePriority); - - dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority); - overrideFloor = maxTaskPriority; - basePriorityCeil = taskBasePriority; - } + return {true, threadPriorityManager.opaquePriority}; +#else + return {true, 0}; #endif - // Set self as executor and remove escalation bit if any - the task's - // priority escalation has already been reflected on the thread. - newStatus = newStatus.withRunning(true); - newStatus = newStatus.withoutStoredPriorityEscalation(); - newStatus = newStatus.withoutEnqueued(); - newStatus = newStatus.withoutTaskDependency(); - }); - this->destroyTaskDependency(dependencyRecord); +} + +/// A Job that acts as a proxy for a Task when the Task itself can't be +/// enqueued. Holds a reference to the Task and an exclusion value that causes +/// this Job to do nothing if it doesn't match what's in the Task's Status. 
+class AsyncTaskStealer : public Job { +public: + AsyncTask *Task; + uint8_t ExclusionValue; + + AsyncTaskStealer(AsyncTask *task, JobPriority priority, uint8_t exclusionValue) + : Job({JobKind::TaskStealer, priority}, &process), + Task(static_cast(swift_retain(task))), + ExclusionValue{exclusionValue} {} + + // Implemented in Actor.cpp to make use of taskInvokeWithExclusionValue + SWIFT_CC(swiftasync) + static void process(Job *job); - adoptTaskVoucher(this); - swift_task_enterThreadLocalContext( - (char *)&_private().ExclusivityAccessSet[0]); + static bool classof(const Job *job) { + return job->Flags.getKind() == JobKind::TaskStealer; } - return dispatchOpaquePriority; +}; + +/// Enqueue this task on the provided executor either directly or with a +/// stealer. This must be called rather than swift_task_enqueue in case +/// the Task is already directly enqueued. This may only be called when +/// TaskStatus is locked or otherwise no concurrent calls may be made +static inline void +taskEnqueueDirectOrSteal(AsyncTask *task, SerialExecutorRef newExecutor, + bool updateStealerExclusionValue) { +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + SWIFT_TASK_DEBUG_LOG("Starting enqueue for %p on %p", + (void*)task, (void*)newExecutor.getIdentity()); + auto oldStatus = task->_private()._status().load(std::memory_order_relaxed); + // This is safe to access because we either have the + // status locked or aren't dependent on an executor + auto needsStealer = oldStatus.isEnqueued(); + // Async let tasks allocate within their parent's allocator so they may + // not outlive their parent which is possible if a stealer is introduced + auto allowsStealer = !task->Flags.task_isAsyncLetTask(); + // needsStealer implies allowsStealer + assert(!needsStealer || allowsStealer); + + auto updateStatus = (updateStealerExclusionValue || !needsStealer); + + ActiveTaskStatus newStatus = oldStatus; + // It shouldn't be possible to race here (i.e. none of the values we are + // modifying should be modified elsewhere, only different parts of the + // status could be modified) + do { + newStatus = oldStatus; + if (updateStealerExclusionValue) { + auto currentStealerExclusionValue = oldStatus.getStealerExclusionValue(); + auto newStealerExclusionValue = currentStealerExclusionValue; + if (__builtin_add_overflow(currentStealerExclusionValue, 1, + &newStealerExclusionValue)) { + assert(false && "Somehow overflowed stealer exclusion value"); + } + SWIFT_TASK_DEBUG_LOG("Updating exclusion value from %d to %d", + currentStealerExclusionValue, newStealerExclusionValue); + + newStatus = newStatus.withStealerExclusionValue(newStealerExclusionValue); + } + + if (!oldStatus.isEnqueued()) { + newStatus = newStatus.withEnqueued(); + needsStealer = false; + } else { + needsStealer = true; + } + + SWIFT_TASK_DEBUG_LOG( + "Needs to update based on %d || %d. Needs stealer %d. Exclusion value is %d", + updateStealerExclusionValue, !oldStatus.isEnqueued(), needsStealer, + newStatus.getStealerExclusionValue()); + // This can always be relaxed because it only needs to be read + // either by a thread syncronizing with the status lock or someone + // reading the stealer which we will later publish on this thread. 
+ } while ((updateStealerExclusionValue || !oldStatus.isEnqueued()) && + !task->_private()._status().compare_exchange_weak( + oldStatus, newStatus, + /*success*/ std::memory_order_relaxed, + /*failure*/ std::memory_order_relaxed)); + + SWIFT_TASK_DEBUG_LOG("Update value: %d, needsStealer: %d", + updateStealerExclusionValue, needsStealer); + + if (needsStealer) { + auto *stealer = swift_cxx_newObject( + task, static_cast(newStatus.getStoredPriority()), + newStatus.getStealerExclusionValue()); + SWIFT_TASK_DEBUG_LOG( + "Enqueuing stealer %p at priority %#x with exclusion value %d", + (void*)stealer, newStatus.getStoredPriority(), stealer->ExclusionValue); + swift_task_enqueue(stealer, newExecutor); + } else { + task->_private().LocalStealerExclusionValue = newStatus.getStealerExclusionValue(); + swift_task_enqueue(task, newExecutor); + } +#else + swift_task_enqueue(task, newExecutor); +#endif } /// TODO (rokhinip): We need the handoff of the thread to the next executor to @@ -1086,7 +1390,6 @@ AsyncTask::flagAsAndEnqueueOnExecutor(SerialExecutorRef newExecutor) { assert(false && "Should not enqueue any tasks to execute in task-to-thread model"); #else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ auto oldStatus = _private()._status().load(std::memory_order_relaxed); - assert(!oldStatus.isEnqueued()); if (!oldStatus.isRunning() && oldStatus.hasTaskDependency()) { // Task went from suspended --> enqueued and has a previous @@ -1109,7 +1412,6 @@ AsyncTask::flagAsAndEnqueueOnExecutor(SerialExecutorRef newExecutor) { // Remove escalation bits + set enqueued bit newStatus = newStatus.withoutStoredPriorityEscalation(); - newStatus = newStatus.withEnqueued(); assert(newStatus.hasTaskDependency()); }); } else { @@ -1131,7 +1433,6 @@ AsyncTask::flagAsAndEnqueueOnExecutor(SerialExecutorRef newExecutor) { newStatus = newStatus.withRunning(false); newStatus = newStatus.withoutStoredPriorityEscalation(); - newStatus = newStatus.withEnqueued(); newStatus = newStatus.withTaskDependency(); return true; @@ -1160,7 +1461,10 @@ AsyncTask::flagAsAndEnqueueOnExecutor(SerialExecutorRef newExecutor) { Flags.task_isFuture(), Flags.task_isGroupChildTask(), Flags.task_isAsyncLetTask()); - swift_task_enqueue(this, newExecutor); + // Even though we are not in the "enqueue stealer" path, this + // may still enqueue a stealer because we may have previously + // run from a stealer and the original Task is still enqueued. + taskEnqueueDirectOrSteal(this, newExecutor, false); #endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ } @@ -1174,7 +1478,7 @@ void AsyncTask::flagAsSuspended(TaskDependencyStatusRecord *dependencyStatusReco auto oldStatus = _private()._status().load(std::memory_order_relaxed); // We can only be suspended if we were previously running. 
See state // transitions listed out in Task.h - assert(oldStatus.isRunning() && !oldStatus.isEnqueued()); + assert(oldStatus.isRunning()); addStatusRecord(this, dependencyStatusRecord, oldStatus, [&](ActiveTaskStatus unused, ActiveTaskStatus &newStatus) { diff --git a/stdlib/public/Concurrency/TaskStatus.cpp b/stdlib/public/Concurrency/TaskStatus.cpp index c4c5d849161c9..cfc21a48b3d4c 100644 --- a/stdlib/public/Concurrency/TaskStatus.cpp +++ b/stdlib/public/Concurrency/TaskStatus.cpp @@ -312,6 +312,90 @@ void swift::removeStatusRecord(AsyncTask *task, TaskStatusRecord *record, } } +// For when we are trying to remove a record but can only do +// so if a certain condition of the active task status is true +SWIFT_CC(swift) +bool swift::removeStatusRecordIf(AsyncTask *task, TaskStatusRecord *record, + ActiveTaskStatus &oldStatus, + llvm::function_ref fn, + llvm::function_ref condition) { + + SWIFT_TASK_DEBUG_LOG("remove status record = %p, from task = %p", + record, task); + + bool didRemove = false; + while (true) { + // If the record is locked, then either we wait for the lock or we own the + // lock. Either way, acquire the status record lock and perform the removal. + // If the record to be removed is not the innermost record, then we need to + // acquire the lock to safely remove it. + if (oldStatus.isStatusRecordLocked() || + oldStatus.getInnermostRecord() != record) { + withStatusRecordLock( + task, oldStatus, + [&](ActiveTaskStatus lockedStatus) { + // Now that we have locked the status, evaluate the condition + if (!condition(lockedStatus)) { + return; + } + didRemove = true; + // If the record is the innermost (always was, or became that way + // while we waited) then we have to remove it in the status change + // function, since changing the head of the list requires changing + // the status. If it's not the innermost then we can remove it here. + if (lockedStatus.getInnermostRecord() != record) { + removeNonInnermostStatusRecordLocked(lockedStatus, record); + } + }, + [&](ActiveTaskStatus oldStatus, ActiveTaskStatus &newStatus) { + // If the condition allows us to remove + // the record, update the ActiveTaskStatus + if (didRemove) { + // If this record is the innermost record, + // set a new status with the record removed. + if (newStatus.getInnermostRecord() == record) + newStatus = newStatus.withInnermostRecord(record->getParent()); + + // Requested status updates + if (fn) { + fn(oldStatus, newStatus); + } + } + }); + // Taking the lock never requires a retry + break; + } + + // Nobody holds the lock, and the record is the + // innermost record. Attempt to remove it locklessly. + if (!condition(oldStatus)) { + // Condition has decided we no longer want to attempt removal + break; + } + + // Remove the record + auto newStatus = oldStatus.withInnermostRecord(record->getParent()); + + // Requested status updates + if (fn) { + fn(oldStatus, newStatus); + } + + if (task->_private()._status().compare_exchange_weak( + oldStatus, newStatus, + /*success*/ std::memory_order_relaxed, + /*failure*/ std::memory_order_relaxed)) { + newStatus.traceStatusChanged(task, false, oldStatus.isRunning()); + didRemove = true; + break; + } + + // We failed to remove the record locklessly. Go back to the top and retry + // removing it. + } + return didRemove; +} + // For when we are trying to remove a record and also optionally trying to // modify some flags in the ActiveTaskStatus at the same time. 
SWIFT_CC(swift) @@ -1000,7 +1084,7 @@ static swift_task_escalateImpl(AsyncTask *task, JobPriority newPriority) { return oldPriority; } - if (oldStatus.isRunning() || oldStatus.isEnqueued()) { + if (oldStatus.isRunning()) { // Regardless of whether status record is locked or not, update the // priority and RO bit on the task status newStatus = oldStatus.withEscalatedPriority(newPriority); @@ -1040,14 +1124,14 @@ static swift_task_escalateImpl(AsyncTask *task, JobPriority newPriority) { } else if (newStatus.isEnqueued()) { // Task is not running, it's enqueued somewhere waiting to be run // - // TODO (rokhinip): Add a stealer to escalate the thread request for - // the task. Still mark the task has having been escalated so that the - // thread will self override when it starts draining the task + // When tasks are enqueued on any executor, they will have an + // EnqueuedOnExecutor TaskDependencyStatusRecord which will have + // performEscalationAction called on it below. This will call + // swift_executor_escalate which will enqueue a stealer if needed. // - // TODO (rokhinip): Add a signpost to flag that this is a potential - // priority inversion + // Still mark the task has having been escalated so that the + // thread will self override when it starts draining the task SWIFT_TASK_DEBUG_LOG("[Override] Escalating %p which is enqueued", task); - } if (newStatus.getInnermostRecord() == NULL) { @@ -1094,7 +1178,7 @@ void TaskDependencyStatusRecord::performEscalationAction( break; case EnqueuedOnExecutor: SWIFT_TASK_DEBUG_LOG("[Dependency] Escalate dependent executor %p noted in %p record from %#x to %#x", - this->DependentOn.Executor, this, oldPriority, newPriority); + (void*)this->DependentOn.Executor.getIdentity(), (void*)this, oldPriority, newPriority); swift_executor_escalate(this->DependentOn.Executor, this->WaitingTask, newPriority); break; }
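A self-contained sketch of the caller-side contract that the new flagAsRunning
documentation describes (stand-in types only; the real entry points are
swift_task_run and runJobInEstablishedExecutorContext above): run the body only
when the pair's boolean is true, and always hand the opaque value back to the
reset hook, which treats zero as a no-op.

// Illustrative toy, not the runtime implementation.
#include <cstdint>
#include <cstdio>
#include <utility>

struct ToyTask {
  uint8_t currentExclusion = 1;

  // Mirrors the documented shape of AsyncTask::flagAsRunning: a success flag
  // plus an opaque value to hand back to the thread-priority machinery.
  std::pair<bool, uint32_t> flagAsRunning(uint8_t allowedExclusionValue,
                                          bool /*removeEnqueued*/) {
    if (allowedExclusionValue != currentExclusion)
      return {false, 0};        // a newer enqueue owns the right to run
    return {true, 42};          // pretend an override token was applied
  }

  void runInFullyEstablishedContext() { std::puts("running task body"); }
};

// Stand-in for swift_dispatch_thread_reset_override_self: zero is ignored.
static void resetOverride(uint32_t opaque) {
  if (opaque)
    std::printf("resetting override token %u\n", opaque);
}

// Caller-side contract: run only on success, always return the opaque value.
static void drain(ToyTask &task, uint8_t allowedExclusionValue) {
  auto [mayRun, opaque] = task.flagAsRunning(allowedExclusionValue,
                                             /*removeEnqueued=*/true);
  if (mayRun)
    task.runInFullyEstablishedContext();
  resetOverride(opaque);
}

int main() {
  ToyTask task;
  drain(task, /*allowedExclusionValue=*/0);  // stale: body does not run
  drain(task, /*allowedExclusionValue=*/1);  // current: body runs, token reset
  return 0;
}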