Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/swift/ABI/MetadataValues.h
Original file line number Diff line number Diff line change
Expand Up @@ -2692,6 +2692,7 @@ enum class JobKind : size_t {
DefaultActorOverride,
NullaryContinuation,
IsolatedDeinit,
TaskStealer,
};

/// The priority of a job. Higher priorities are larger values.
Expand Down
56 changes: 44 additions & 12 deletions include/swift/ABI/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -430,21 +430,53 @@ class AsyncTask : public Job {
/// Generally this should be done immediately after updating
/// ActiveTask.
///
/// When Dispatch is used for the default executor:
/// * If the return value is non-zero, it must be passed
/// to swift_dispatch_thread_reset_override_self
/// This function returns two values. The first, boolean, value
/// reports if the Task was successfully marked as running. The second
/// is an opaque value used to later reset some properties of the
/// thread. There are two cases described below for their meaning.
///
/// When Dispatch is used as the default
/// executor and priority escalation is enabled:
/// * If the opaque value is non-zero, it must be passed
/// to swift_dispatch_thread_reset_override_self.
/// before returning to the executor.
/// * If the return value is zero, it may be ignored or passed to
/// * If the opaque value is zero, it may be ignored or passed to
/// the aforementioned function (which will ignore values of zero).
/// The current implementation will always return zero
/// if you call flagAsRunning again before calling
/// swift_dispatch_thread_reset_override_self with the
/// initial value. This supports suspending and immediately
/// resuming a Task without returning up the callstack.
/// * This function is failable. If this function returns
/// false, no more setup of the environment should be done
/// and runInFullyEstablishedContext must not be called.
/// * If this function is being called directly from swift_task_run (i.e.
/// not indirect through an AsyncTaskStealer), then removeEnqueued must
/// be true since the Task must be marked as no longer enqueued on either
/// the success or failure path to allow enqueuing the Task directly
/// again. If this function is being called from an AsyncTaskStealer
/// (we did not dequeue the Task directly), then this argument must
/// be false so that the Task's enqueued status is not changed.
/// * If this function is called via an AsyncTaskStealer, the
/// allowedExclusionValue must be the one encoded in the stealer.
/// * Otherwise, when this function is called from
/// the Task directly, allowedExclusionValue must be
/// the value encoded in the Task's private data.
/// * The exclusion value allows both the Task's intrusive link
/// and one or more AsyncTaskStealers to be enqueued at the same
/// time, with different exclusion values, and all but the one
/// most recently enqueued will have this function return false.
///
/// For all other default executors, flagAsRunning
/// will return zero which may be ignored.
uint32_t flagAsRunning();
/// For all other default executors, or
/// when priority escalation is not enabled:
/// * This function will always return true.
/// * allowedExclusionValue is ignored (and is expected to always be zero).
/// * The opaque return value will always be zero and may be ignored.
std::pair<bool, uint32_t> flagAsRunning(uint8_t allowedExclusionValue, bool removeEnqueued);

/// This variant of flagAsRunning may be called if you are resumming
/// immediately after suspending. That is, you are on the same thread,
/// you have not enqueued onto any executor, and you have not called
/// swift_dispatch_thread_reset_override_self or done any other
/// cleanup work. This is intended for situations such as awaiting
/// where you may mark yourself as suspended but find out during
/// atomic state update that you may actually resume immediately.
void flagAsRunningImmediately();

/// Flag that this task is now suspended with information about what it is
/// waiting on.
Expand Down
85 changes: 67 additions & 18 deletions stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
///===----------------------------------------------------------------------===///

#include "swift/Runtime/Concurrency.h"
#include "swift/Runtime/DispatchShims.h"
#include <atomic>
#include <new>
#if __has_feature(ptrauth_calls)
Expand Down Expand Up @@ -221,26 +222,26 @@ ExecutorTrackingInfo::ActiveInfoInThread;

} // end anonymous namespace

void swift::runJobInEstablishedExecutorContext(Job *job) {
_swift_tsan_acquire(job);
SWIFT_TASK_DEBUG_LOG("Run job in established context %p", job);

#if SWIFT_OBJC_INTEROP
auto pool = objc_autoreleasePoolPush();
#endif

if (auto task = dyn_cast<AsyncTask>(job)) {
/// This function establishes the Task's context and attempts to invoke
/// it. The invocation may fail and the Task may not be run if the
/// passed in exclusion value is not what is in the ActiveTaskStatus
/// during the cas loop to mark the Task as running. If Task
/// priority escalation is not enabled, this will always succeed.
SWIFT_ALWAYS_INLINE
static inline
void taskInvokeWithExclusionValue(AsyncTask *task,
uint8_t allowedStealerExclusionValue,
bool removeEnqueued) {
// Update the task status to say that it's running on the current
// thread. If the task suspends somewhere, it should update the
// task status appropriately; we don't need to update it afterwards.
[[maybe_unused]]
auto [mayRun, dispatchOpaquePriority] = task->flagAsRunning(allowedStealerExclusionValue, removeEnqueued);
if (mayRun) {
// Update the active task in the current thread.
auto oldTask = ActiveTask::swap(task);

// Update the task status to say that it's running on the
// current thread. If the task suspends somewhere, it should
// update the task status appropriately; we don't need to update
// it afterwards.
[[maybe_unused]]
uint32_t dispatchOpaquePriority = task->flagAsRunning();

auto traceHandle = concurrency::trace::job_run_begin(job);
auto traceHandle = concurrency::trace::job_run_begin(task);
task->runInFullyEstablishedContext();
concurrency::trace::job_run_end(traceHandle);

Expand All @@ -251,6 +252,37 @@ void swift::runJobInEstablishedExecutorContext(Job *job) {
assert(ActiveTask::get() == nullptr &&
"active task wasn't cleared before suspending?");
if (oldTask) ActiveTask::set(oldTask);
}
}

/// Runs the Task embedded in the stealer using the stealer's exclusion value.
/// The stealer holds a reference to the Task which is released here. Stealers
/// are not reference counted so the object is directly destroyed here.
inline void
AsyncTaskStealer::process(Job *_job) {
auto *stealer = cast<AsyncTaskStealer>(_job);

taskInvokeWithExclusionValue(stealer->Task, stealer->ExclusionValue, false);

// We are done with the Task at this point so we can release it
swift_release(stealer->Task);

swift_cxx_deleteObject(stealer);
}

void swift::runJobInEstablishedExecutorContext(Job *job) {
_swift_tsan_acquire(job);
SWIFT_TASK_DEBUG_LOG("Run job in established context %p", job);

#if SWIFT_OBJC_INTEROP
auto pool = objc_autoreleasePoolPush();
#endif

if (auto task = dyn_cast<AsyncTask>(job)) {
// taskRemoveEnqueued modifies the TaskActiveStatus so we could plumb
// through the result and avoid an extra reload later. It could also
// be pulled into the cas to flag as running in the successful case
taskInvokeWithExclusionValue(task, task->_private().LocalStealerExclusionValue, true);
} else {
// There's no extra bookkeeping to do for simple jobs besides swapping in
// the voucher.
Expand Down Expand Up @@ -1504,6 +1536,9 @@ TaskExecutorRef TaskExecutorRef::fromTaskExecutorPreference(Job *job) {
if (auto task = dyn_cast<AsyncTask>(job)) {
return task->getPreferredTaskExecutor();
}
if (auto stealer = dyn_cast<AsyncTaskStealer>(job)) {
return stealer->Task->getPreferredTaskExecutor();
}
return TaskExecutorRef::undefined();
}

Expand Down Expand Up @@ -2741,8 +2776,22 @@ swift_actor_escalate(DefaultActorImpl *actor, AsyncTask *task, JobPriority newPr
SWIFT_CC(swift)
void swift::swift_executor_escalate(SerialExecutorRef executor, AsyncTask *task,
JobPriority newPriority) {
SWIFT_TASK_DEBUG_LOG("Escalating executor %p to %#x",
(void*)executor.getIdentity(), newPriority);
if (executor.isGeneric()) {
// TODO (rokhinip): We'd push a stealer job for the task on the executor.
SWIFT_TASK_DEBUG_LOG("Enqueuing stealer for %p on %p",
(void*)task, (void*)executor.getIdentity());
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
// See the comment in enqueueDirectOrSteal
// for why async let Tasks aren't supported
if (!task->Flags.task_isAsyncLetTask()) {
// Even though we are in the "enqueue stealer" path, this could
// enqueue the original Task if another stealer had previously
// been enqueued and still is but the original Task did manage to
// run at some point (while rare, this wouldn't be unexpected)
taskEnqueueDirectOrSteal(task, executor, true);
}
#endif
return;
}

Expand Down
12 changes: 4 additions & 8 deletions stdlib/public/Concurrency/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,10 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
waitingTask, this);
_swift_tsan_acquire(static_cast<Job *>(this));
if (suspendedWaiter) {
// This will always return zero because we were just
// This is safe to call because we were just
// running this Task so its BasePriority (which is
// immutable) should've already been set on the thread.
[[maybe_unused]]
uint32_t opaque = waitingTask->flagAsRunning();
assert(opaque == 0);
waitingTask->flagAsRunningImmediately();
}
// The task is done; we don't need to wait.
return queueHead.getStatus();
Expand Down Expand Up @@ -1666,11 +1664,9 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
// we try to tail-call.
} while (false);
#else
// This will always return zero because we were just running this Task so its
// This is safe to call because we were just running this Task so its
// BasePriority (which is immutable) should've already been set on the thread.
[[maybe_unused]]
uint32_t opaque = task->flagAsRunning();
assert(opaque == 0);
task->flagAsRunningImmediately();
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */

if (context->isExecutorSwitchForced())
Expand Down
10 changes: 5 additions & 5 deletions stdlib/public/Concurrency/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1817,17 +1817,17 @@ reevaluate_if_taskgroup_has_results:;
assumed = TaskGroupStatus{assumedStatus};
continue; // We raced with something, try again.
}
SWIFT_TASK_DEBUG_LOG("poll, after CAS: %s", status.to_string().c_str());
#if !SWIFT_CONCURRENCY_EMBEDDED
SWIFT_TASK_DEBUG_LOG("poll, after CAS: %s", assumed.to_string(this).c_str());
#endif

// We're going back to running the task, so if we suspended before,
// we need to flag it as running again.
if (hasSuspended) {
// This will always return zero because we were just
// This is safe to call because we were just
// running this Task so its BasePriority (which is
// immutable) should've already been set on the thread.
[[maybe_unused]]
uint32_t opaque = waitingTask->flagAsRunning();
assert(opaque == 0);
waitingTask->flagAsRunningImmediately();
}

// Success! We are allowed to poll.
Expand Down
Loading