2
2
//
3
3
// This source file is part of the Swift.org open source project
4
4
//
5
- // Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
5
+ // Copyright (c) 2014 - 2021 Apple Inc. and the Swift project authors
6
6
// Licensed under Apache License v2.0 with Runtime Library Exception
7
7
//
8
8
// See https://swift.org/LICENSE.txt for license information
@@ -278,7 +278,7 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
278
278
279
279
private:
280
280
281
- // // TODO: move to lockless via the status atomic
281
+ // TODO: move to lockless via the status atomic (make readyQueue an mpsc_queue_t<ReadyQueueItem>)
282
282
mutable std::mutex mutex;
283
283
284
284
// / Used for queue management, counting number of waiting and ready tasks
@@ -289,7 +289,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
289
289
// / The low bits contain the status, the rest of the pointer is the
290
290
// / AsyncTask.
291
291
NaiveQueue<ReadyQueueItem> readyQueue;
292
- // mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
293
292
294
293
// / Single waiting `AsyncTask` currently waiting on `group.next()`,
295
294
// / or `nullptr` if no task is currently waiting.
@@ -304,10 +303,8 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
304
303
: TaskGroupTaskStatusRecord(),
305
304
status(GroupStatus::initial().status),
306
305
readyQueue(),
307
- // readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
308
306
waitQueue(nullptr ), successType(T) {}
309
307
310
-
311
308
TaskGroupTaskStatusRecord *getTaskRecord () {
312
309
return reinterpret_cast <TaskGroupTaskStatusRecord *>(this );
313
310
}
@@ -471,11 +468,18 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)
471
468
472
469
// =============================================================================
473
470
// ==== add / attachChild ------------------------------------------------------
471
+
474
472
SWIFT_CC (swift)
475
473
static void swift_taskGroup_attachChildImpl(TaskGroup *group,
476
474
AsyncTask *child) {
475
+ SWIFT_TASK_DEBUG_LOG (" attach child task = %p to group = %p\n " ,
476
+ child, group);
477
+
478
+ // The counterpart of this (detachChild) is performed by the group itself,
479
+ // when it offers the completed (child) task's value to a waiting task -
480
+ // during the implementation of `await group.next()`.
477
481
auto groupRecord = asImpl (group)->getTaskRecord ();
478
- return groupRecord->attachChild (child);
482
+ groupRecord->attachChild (child);
479
483
}
480
484
481
485
// =============================================================================
@@ -489,16 +493,8 @@ void TaskGroupImpl::destroy() {
489
493
// First, remove the group from the task and deallocate the record
490
494
swift_task_removeStatusRecord (getTaskRecord ());
491
495
492
- mutex.lock (); // TODO: remove lock, and use status for synchronization
493
- // Release all ready tasks which are kept retained, the group destroyed,
494
- // so no other task will ever await on them anymore;
495
- ReadyQueueItem item;
496
- bool taskDequeued = readyQueue.dequeue (item);
497
- while (taskDequeued) {
498
- swift_release (item.getTask ());
499
- taskDequeued = readyQueue.dequeue (item);
500
- }
501
- mutex.unlock (); // TODO: remove fragment lock, and use status for synchronization
496
+ // By the time we call destroy, all tasks inside the group must have been
497
+ // awaited on already; We handle this on the swift side.
502
498
}
503
499
504
500
// =============================================================================
@@ -520,9 +516,10 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
520
516
assert (false && " filling a waiting status?" );
521
517
return ;
522
518
523
- case PollStatus::Error:
524
- context->fillWithError (reinterpret_cast <SwiftError*>(result.storage ));
519
+ case PollStatus::Error: {
520
+ context->fillWithError (reinterpret_cast <SwiftError *>(result.storage ));
525
521
return ;
522
+ }
526
523
527
524
case PollStatus::Success: {
528
525
// Initialize the result as an Optional<Success>.
@@ -552,16 +549,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
552
549
assert (completedTask->hasChildFragment ());
553
550
assert (completedTask->hasGroupChildFragment ());
554
551
assert (completedTask->groupChildFragment ()->getGroup () == asAbstract (this ));
555
-
556
- // We retain the completed task, because we will either:
557
- // - (a) schedule the waiter to resume on the next() that it is waiting on, or
558
- // - (b) will need to store this task until the group task enters next() and
559
- // picks up this task.
560
- // either way, there is some time between us returning here, and the `completeTask`
561
- // issuing a swift_release on this very task. We need to keep it alive until
562
- // we have the chance to poll it from the queue (via the waiter task entering
563
- // calling next()).
564
- swift_retain (completedTask);
552
+ SWIFT_TASK_DEBUG_LOG (" offer task %p to group %p" , completedTask, this );
565
553
566
554
mutex.lock (); // TODO: remove fragment lock, and use status for synchronization
567
555
@@ -585,6 +573,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
585
573
// ==== a) has waiting task, so let us complete it right away
586
574
if (assumed.hasWaitingTask ()) {
587
575
auto waitingTask = waitQueue.load (std::memory_order_acquire);
576
+ SWIFT_TASK_DEBUG_LOG (" group has waiting task = %p, complete with = %p" ,
577
+ waitingTask, completedTask);
588
578
while (true ) {
589
579
// ==== a) run waiting task directly -------------------------------------
590
580
assert (assumed.hasWaitingTask ());
@@ -606,6 +596,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
606
596
waitingTask->ResumeContext );
607
597
608
598
fillGroupNextResult (waitingContext, result);
599
+ detachChild (result.retainedTask );
609
600
610
601
_swift_tsan_acquire (static_cast <Job *>(waitingTask));
611
602
@@ -614,6 +605,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
614
605
return ;
615
606
} // else, try again
616
607
}
608
+
609
+ llvm_unreachable (" should have enqueued and returned." );
617
610
}
618
611
619
612
// ==== b) enqueue completion ------------------------------------------------
@@ -622,10 +615,12 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
622
615
// queue when a task polls during next() it will notice that we have a value
623
616
// ready for it, and will process it immediately without suspending.
624
617
assert (!waitQueue.load (std::memory_order_relaxed));
625
-
618
+ SWIFT_TASK_DEBUG_LOG (" group has no waiting tasks, RETAIN and store ready task = %p" ,
619
+ completedTask);
626
620
// Retain the task while it is in the queue;
627
621
// it must remain alive until the task group is alive.
628
622
swift_retain (completedTask);
623
+
629
624
auto readyItem = ReadyQueueItem::get (
630
625
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
631
626
completedTask
@@ -667,6 +662,7 @@ SWIFT_CC(swiftasync) static void workaround_function_swift_taskGroup_wait_next_t
667
662
668
663
// =============================================================================
669
664
// ==== group.next() implementation (wait_next and groupPoll) ------------------
665
+
670
666
SWIFT_CC (swiftasync)
671
667
static void swift_taskGroup_wait_next_throwingImpl(
672
668
OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
@@ -690,6 +686,8 @@ static void swift_taskGroup_wait_next_throwingImpl(
690
686
PollResult polled = group->poll (waitingTask);
691
687
switch (polled.status ) {
692
688
case PollStatus::MustWait:
689
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, no ready tasks, waiting task = %p" ,
690
+ group, waitingTask);
693
691
// The waiting task has been queued on the channel,
694
692
// there were pending tasks so it will be woken up eventually.
695
693
#ifdef __ARM_ARCH_7K__
@@ -702,13 +700,22 @@ static void swift_taskGroup_wait_next_throwingImpl(
702
700
case PollStatus::Empty:
703
701
case PollStatus::Error:
704
702
case PollStatus::Success:
703
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, task = %p, ready task available = %p" ,
704
+ group, waitingTask, polled.retainedTask );
705
705
fillGroupNextResult (context, polled);
706
+ if (auto completedTask = polled.retainedTask ) {
707
+ // it would be null for PollStatus::Empty, then we don't need to release
708
+ group->detachChild (polled.retainedTask );
709
+ swift_release (polled.retainedTask );
710
+ }
711
+
706
712
return waitingTask->runInFullyEstablishedContext ();
707
713
}
708
714
}
709
715
710
716
PollResult TaskGroupImpl::poll (AsyncTask *waitingTask) {
711
717
mutex.lock (); // TODO: remove group lock, and use status for synchronization
718
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p" , this );
712
719
auto assumed = statusMarkWaitingAssumeAcquire ();
713
720
714
721
PollResult result;
@@ -718,6 +725,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
718
725
719
726
// ==== 1) bail out early if no tasks are pending ----------------------------
720
727
if (assumed.isEmpty ()) {
728
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, group is empty, no pending tasks" , this );
721
729
// No tasks in flight, we know no tasks were submitted before this poll
722
730
// was issued, and if we parked here we'd potentially never be woken up.
723
731
// Bail out and return `nil` from `group.next()`.
@@ -735,6 +743,9 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
735
743
736
744
// ==== 2) Ready task was polled, return with it immediately -----------------
737
745
if (assumed.readyTasks ()) {
746
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, group has ready tasks = %d" ,
747
+ this , assumed.readyTasks ());
748
+
738
749
auto assumedStatus = assumed.status ;
739
750
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
740
751
if (status.compare_exchange_weak (
@@ -838,13 +849,16 @@ static void swift_taskGroup_cancelAllImpl(TaskGroup *group) {
838
849
}
839
850
840
851
bool TaskGroupImpl::cancelAll () {
852
+ SWIFT_TASK_DEBUG_LOG (" cancel all tasks in group = %p" , this );
853
+
841
854
// store the cancelled bit
842
855
auto old = statusCancel ();
843
856
if (old.isCancelled ()) {
844
857
// already was cancelled previously, nothing to do?
845
858
return false ;
846
859
}
847
860
861
+ // FIXME: must also remove the records!!!!
848
862
// cancel all existing tasks within the group
849
863
swift_task_cancel_group_child_tasks (asAbstract (this ));
850
864
return true ;
0 commit comments