Skip to content

concurrency: alloc an async-let task with the parent's allocator. #36993

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion include/swift/ABI/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ class TaskGroup;
extern FullMetadata<DispatchClassMetadata> jobHeapMetadata;

/// A schedulable job.
class alignas(2 * alignof(void*)) Job : public HeapObject {
class alignas(2 * alignof(void*)) Job :
// For async-let tasks, the refcount bits are initialized as "immortal"
// because such a task is allocated with the parent's stack allocator.
public HeapObject {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, thanks for the comment!

public:
// Indices into SchedulerPrivate, for use by the runtime.
enum {
Expand Down Expand Up @@ -88,6 +91,14 @@ class alignas(2 * alignof(void*)) Job : public HeapObject {
assert(isAsyncTask() && "wrong constructor for a non-task job");
}

/// Create a job with "immortal" reference counts.
/// Used for async let tasks.
Job(JobFlags flags, TaskContinuationFunction *invoke,
const HeapMetadata *metadata, InlineRefCounts::Immortal_t immortal)
: HeapObject(metadata, immortal), Flags(flags), ResumeTask(invoke) {
assert(isAsyncTask() && "wrong constructor for a non-task job");
}

bool isAsyncTask() const {
return Flags.isAsyncTask();
}
Expand Down Expand Up @@ -201,6 +212,21 @@ class AsyncTask : public Job {
assert(flags.isAsyncTask());
}

/// Create a task with "immortal" reference counts.
/// Used for async let tasks.
AsyncTask(const HeapMetadata *metadata, InlineRefCounts::Immortal_t immortal,
JobFlags flags,
TaskContinuationFunction *run,
AsyncContext *initialContext)
: Job(flags, run, metadata, immortal),
ResumeContext(initialContext),
Status(ActiveTaskStatus()),
Local(TaskLocal::Storage()) {
assert(flags.isAsyncTask());
}

~AsyncTask();

/// Given that we've already fully established the job context
/// in the current thread, start running this task. To establish
/// the job context correctly, call swift_job_run or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ OVERRIDE_TASK(task_create_group_future_common, AsyncTaskAndContext, , , ,
(JobFlags flags, TaskGroup *group,
const Metadata *futureResultType,
FutureAsyncSignature::FunctionType *function,
void *closureContext, bool owningClosureContext,
void *closureContext, bool isAsyncLetTask,
size_t initialContextSize),
(flags, group, futureResultType, function, closureContext,
owningClosureContext, initialContextSize))
isAsyncLetTask, initialContextSize))

OVERRIDE_TASK(task_future_wait, void, SWIFT_EXPORT_FROM(swift_Concurrency),
SWIFT_CC(swiftasync), swift::,
Expand Down
11 changes: 8 additions & 3 deletions stdlib/public/Concurrency/AsyncLet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,13 @@ static void swift_asyncLet_startImpl(AsyncLet *alet,
flags.task_setIsFuture(true);
flags.task_setIsChildTask(true);

auto childTaskAndContext = swift_task_create_future_no_escaping(
auto childTaskAndContext = swift_task_create_async_let_future(
flags,
futureResultType,
closureEntryPoint,
closureContext);

AsyncTask *childTask = childTaskAndContext.Task;
swift_retain(childTask);

assert(childTask->isFuture());
assert(childTask->hasChildFragment());
Expand Down Expand Up @@ -165,7 +164,13 @@ static void swift_asyncLet_endImpl(AsyncLet *alet) {
// TODO: we need to implicitly await either before the end or here somehow.

// and finally, release the task and free the async-let
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// and finally, release the task and free the async-let
// and finally deallocate the async-let

no releasing anymore (yay!)

swift_release(task);
AsyncTask *parent = swift_task_getCurrent();
assert(parent && "async-let must have a parent task");

#if SWIFT_TASK_PRINTF_DEBUG
fprintf(stderr, "[%p] async let end of task %p, parent: %p\n", pthread_self(), task, parent);
#endif
_swift_task_dealloc_specific(parent, task);
}

// =============================================================================
Expand Down
128 changes: 93 additions & 35 deletions stdlib/public/Concurrency/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,31 @@ static void destroyJob(SWIFT_CONTEXT HeapObject *obj) {
assert(false && "A non-task job should never be destroyed as heap metadata.");
}

SWIFT_CC(swift)
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
auto task = static_cast<AsyncTask*>(obj);
AsyncTask::~AsyncTask() {
// For a future, destroy the result.
if (task->isFuture()) {
task->futureFragment()->destroy();
if (isFuture()) {
futureFragment()->destroy();
}

// Release any objects potentially held as task local values.
task->Local.destroy(task);
Local.destroy(this);
}

SWIFT_CC(swift)
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
auto task = static_cast<AsyncTask*>(obj);

task->~AsyncTask();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no idea if doing a destructor is nicer... am I missing something?
so far we've mostly had all destroyX things explicitly (but I'm by no means C++ or this codebase guru...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a matter of taste


// The task execution itself should always hold a reference to it, so
// if we get here, we know the task has finished running, which means
// swift_task_complete should have been run, which will have torn down
// the task-local allocator. There's actually nothing else to clean up
// here.

#if SWIFT_TASK_PRINTF_DEBUG
fprintf(stderr, "[%p] destroy task %p\n", pthread_self(), task);
#endif
free(task);
}

Expand Down Expand Up @@ -250,13 +258,9 @@ static FullMetadata<DispatchClassMetadata> taskHeapMetadata = {
const void *const swift::_swift_concurrency_debug_asyncTaskMetadata =
static_cast<Metadata *>(&taskHeapMetadata);

/// The function that we put in the context of a simple task
/// to handle the final return.
SWIFT_CC(swiftasync)
static void completeTask(SWIFT_ASYNC_CONTEXT AsyncContext *context,
SWIFT_CONTEXT SwiftError *error) {
// Set that there's no longer a running task in the current thread.
auto task = _swift_task_clearCurrent();
static void completeTaskImpl(AsyncTask *task,
AsyncContext *context,
SwiftError *error) {
assert(task && "completing task, but there is no active task registered");

// Store the error result.
Expand All @@ -277,15 +281,41 @@ static void completeTask(SWIFT_ASYNC_CONTEXT AsyncContext *context,
#endif

// Complete the future.
// Warning: This deallocates the task in case it's an async let task.
// The task must not be accessed afterwards.
if (task->isFuture()) {
task->completeFuture(context);
}

// TODO: set something in the status?
if (task->hasChildFragment()) {
// if (task->hasChildFragment()) {
// TODO: notify the parent somehow?
// TODO: remove this task from the child-task chain?
}
// }
}

/// The function that we put in the context of a simple task
/// to handle the final return.
SWIFT_CC(swiftasync)
static void completeTask(SWIFT_ASYNC_CONTEXT AsyncContext *context,
SWIFT_CONTEXT SwiftError *error) {
// Set that there's no longer a running task in the current thread.
auto task = _swift_task_clearCurrent();
assert(task && "completing task, but there is no active task registered");

completeTaskImpl(task, context, error);
}

/// The function that we put in the context of a simple task
/// to handle the final return.
SWIFT_CC(swiftasync)
static void completeTaskAndRelease(SWIFT_ASYNC_CONTEXT AsyncContext *context,
SWIFT_CONTEXT SwiftError *error) {
// Set that there's no longer a running task in the current thread.
auto task = _swift_task_clearCurrent();
assert(task && "completing task, but there is no active task registered");

completeTaskImpl(task, context, error);

// Release the task, balancing the retain that a running task has on itself.
// If it was a group child task, it will remain until the group returns it.
Expand All @@ -304,7 +334,7 @@ static void completeTaskWithClosure(SWIFT_ASYNC_CONTEXT AsyncContext *context,
swift_release((HeapObject *)asyncContextPrefix->closureContext);

// Clean up the rest of the task.
return completeTask(context, error);
return completeTaskAndRelease(context, error);
}

SWIFT_CC(swiftasync)
Expand Down Expand Up @@ -332,11 +362,16 @@ static void task_wait_throwing_resume_adapter(SWIFT_ASYNC_CONTEXT AsyncContext *
}

/// All `swift_task_create*` variants funnel into this common implementation.
///
/// If \p isAsyncLetTask is true, the \p closureContext is not heap allocated,
/// but stack-allocated (and must not be ref-counted).
/// Also, async-let tasks are not heap allcoated, but allcoated with the parent
/// task's stack allocator.
static AsyncTaskAndContext swift_task_create_group_future_commonImpl(
JobFlags flags, TaskGroup *group,
const Metadata *futureResultType,
FutureAsyncSignature::FunctionType *function,
void *closureContext, bool owningClosureContext,
void *closureContext, bool isAsyncLetTask,
size_t initialContextSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ABI wise my impression was we wanted something like [TaskCreateRecords] to be passed in and e.g. check their .kind == .isAsyncLet or similar to be future proof about adding more smarts to creating tasks.

But maybe we want to do that in a separate PR? (totally find by me to do later on!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, let's do that in a separate PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okey 👍

assert((futureResultType != nullptr) == flags.task_isFuture());
assert(!flags.task_isFuture() ||
Expand Down Expand Up @@ -378,10 +413,19 @@ static AsyncTaskAndContext swift_task_create_group_future_commonImpl(

assert(amountToAllocate % MaximumAlignment == 0);

// TODO: allow optionally passing in an allocation+sizeOfIt to reuse for the task
// if the necessary space is enough, we can initialize into it rather than malloc.
// this would allow us to stack-allocate async-let related tasks.
void *allocation = malloc(amountToAllocate);
constexpr unsigned initialSlabSize = 512;

void *allocation = nullptr;
if (isAsyncLetTask) {
assert(parent);
allocation = _swift_task_alloc_specific(parent,
amountToAllocate + initialSlabSize);
} else {
allocation = malloc(amountToAllocate);
}
#if SWIFT_TASK_PRINTF_DEBUG
fprintf(stderr, "[%p] allocate task %p, parent = %p\n", pthread_self(), allocation, parent);
#endif

AsyncContext *initialContext =
reinterpret_cast<AsyncContext*>(
Expand Down Expand Up @@ -417,9 +461,17 @@ static AsyncTaskAndContext swift_task_create_group_future_commonImpl(

// Initialize the task so that resuming it will run the given
// function on the initial context.
AsyncTask *task =
new(allocation) AsyncTask(&taskHeapMetadata, flags,
function, initialContext);
AsyncTask *task = nullptr;
if (isAsyncLetTask) {
// Initialize the refcount bits to "immortal", so that
// ARC operations don't have any effect on the task.
task = new(allocation) AsyncTask(&taskHeapMetadata,
InlineRefCounts::Immortal, flags,
function, initialContext);
} else {
task = new(allocation) AsyncTask(&taskHeapMetadata, flags,
function, initialContext);
}

// Initialize the child fragment if applicable.
if (parent) {
Expand Down Expand Up @@ -471,15 +523,21 @@ static AsyncTaskAndContext swift_task_create_group_future_commonImpl(
// as if they might be null, even though the only time they ever might
// be is the final hop. Store a signed null instead.
initialContext->Parent = nullptr;
initialContext->ResumeParent = reinterpret_cast<TaskContinuationFunction *>(
(closureContext && owningClosureContext) ? &completeTaskWithClosure :
&completeTask);
initialContext->Flags = AsyncContextKind::Ordinary;
initialContext->Flags.setShouldNotDeallocateInCallee(true);

// Initialize the task-local allocator.
// TODO: consider providing an initial pre-allocated first slab to the allocator.
_swift_task_alloc_initialize(task);
if (isAsyncLetTask) {
initialContext->ResumeParent = reinterpret_cast<TaskContinuationFunction *>(
&completeTask);
assert(parent);
void *initialSlab = (char*)allocation + amountToAllocate;
_swift_task_alloc_initialize_with_slab(task, initialSlab, initialSlabSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool! so even a bunch of tiny task-local allocs inside the child have a chance to stay in the initial slab... <3

} else {
initialContext->ResumeParent = reinterpret_cast<TaskContinuationFunction *>(
closureContext ? &completeTaskWithClosure : &completeTaskAndRelease);
_swift_task_alloc_initialize(task);
}

// TODO: if the allocator would be prepared earlier we could do this in some
// other existing if-parent if rather than adding another one here.
Expand All @@ -494,7 +552,7 @@ static AsyncTaskAndContext swift_task_create_group_future_commonImpl(
static AsyncTaskAndContext swift_task_create_group_future_common(
JobFlags flags, TaskGroup *group, const Metadata *futureResultType,
FutureAsyncSignature::FunctionType *function,
void *closureContext, bool owningClosureContext,
void *closureContext, bool isAsyncLetTask,
size_t initialContextSize);

AsyncTaskAndContext
Expand Down Expand Up @@ -524,7 +582,7 @@ AsyncTaskAndContext swift::swift_task_create_group_future_f(
return swift_task_create_group_future_common(flags, group,
futureResultType,
function, nullptr,
/*owningClosureContext*/ false,
/*isAsyncLetTask*/ false,
initialContextSize);
}

Expand Down Expand Up @@ -560,11 +618,11 @@ AsyncTaskAndContext swift::swift_task_create_future(JobFlags flags,
return swift_task_create_group_future_common(
flags, nullptr, futureResultType,
taskEntry, closureContext,
/*owningClosureContext*/ true,
/*isAsyncLetTask*/ false,
initialContextSize);
}

AsyncTaskAndContext swift::swift_task_create_future_no_escaping(JobFlags flags,
AsyncTaskAndContext swift::swift_task_create_async_let_future(JobFlags flags,
const Metadata *futureResultType,
void *closureEntry,
void *closureContext) {
Expand All @@ -579,7 +637,7 @@ AsyncTaskAndContext swift::swift_task_create_future_no_escaping(JobFlags flags,
return swift_task_create_group_future_common(
flags, nullptr, futureResultType,
taskEntry, closureContext,
/*owningClosureContext*/ false,
/*isAsyncLetTask*/ true,
initialContextSize);
}

Expand All @@ -599,7 +657,7 @@ swift::swift_task_create_group_future(
return swift_task_create_group_future_common(
flags, group, futureResultType,
taskEntry, closureContext,
/*owningClosureContext*/ true,
/*isAsyncLetTask*/ false,
initialContextSize);
}

Expand Down
6 changes: 6 additions & 0 deletions stdlib/public/Concurrency/TaskAlloc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ void swift::_swift_task_alloc_initialize(AsyncTask *task) {
new (task->AllocatorPrivate) TaskAllocator();
}

void swift::_swift_task_alloc_initialize_with_slab(AsyncTask *task,
void *firstSlabBuffer,
size_t bufferCapacity) {
new (task->AllocatorPrivate) TaskAllocator(firstSlabBuffer, bufferCapacity);
}

static TaskAllocator &allocator(AsyncTask *task) {
if (task)
return reinterpret_cast<TaskAllocator &>(task->AllocatorPrivate);
Expand Down
6 changes: 5 additions & 1 deletion stdlib/public/Concurrency/TaskPrivate.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class TaskGroup;
/// Initialize the task-local allocator in the given task.
void _swift_task_alloc_initialize(AsyncTask *task);

void _swift_task_alloc_initialize_with_slab(AsyncTask *task,
void *firstSlabBuffer,
size_t bufferCapacity);

/// Destroy the task-local allocator in the given task.
void _swift_task_alloc_destroy(AsyncTask *task);

Expand All @@ -55,7 +59,7 @@ void runJobInEstablishedExecutorContext(Job *job);
/// Clear the active task reference for the current thread.
AsyncTask *_swift_task_clearCurrent();

AsyncTaskAndContext swift_task_create_future_no_escaping(JobFlags flags,
AsyncTaskAndContext swift_task_create_async_let_future(JobFlags flags,
const Metadata *futureResultType,
void *closureEntry,
void *closureContext);
Expand Down