2
2
//
3
3
// This source file is part of the Swift.org open source project
4
4
//
5
- // Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
5
+ // Copyright (c) 2014 - 2021 Apple Inc. and the Swift project authors
6
6
// Licensed under Apache License v2.0 with Runtime Library Exception
7
7
//
8
8
// See https://swift.org/LICENSE.txt for license information
@@ -278,7 +278,7 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
278
278
279
279
private:
280
280
281
- // // TODO: move to lockless via the status atomic
281
+ // TODO: move to lockless via the status atomic (make readyQueue an mpsc_queue_t<ReadyQueueItem>)
282
282
mutable std::mutex mutex;
283
283
284
284
// / Used for queue management, counting number of waiting and ready tasks
@@ -289,7 +289,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
289
289
// / The low bits contain the status, the rest of the pointer is the
290
290
// / AsyncTask.
291
291
NaiveQueue<ReadyQueueItem> readyQueue;
292
- // mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
293
292
294
293
// / Single waiting `AsyncTask` currently waiting on `group.next()`,
295
294
// / or `nullptr` if no task is currently waiting.
@@ -304,10 +303,8 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
304
303
: TaskGroupTaskStatusRecord(),
305
304
status(GroupStatus::initial().status),
306
305
readyQueue(),
307
- // readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
308
306
waitQueue(nullptr ), successType(T) {}
309
307
310
-
311
308
TaskGroupTaskStatusRecord *getTaskRecord () {
312
309
return reinterpret_cast <TaskGroupTaskStatusRecord *>(this );
313
310
}
@@ -471,11 +468,18 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)
471
468
472
469
// =============================================================================
473
470
// ==== add / attachChild ------------------------------------------------------
471
+
474
472
SWIFT_CC (swift)
475
473
static void swift_taskGroup_attachChildImpl(TaskGroup *group,
476
474
AsyncTask *child) {
475
+ SWIFT_TASK_DEBUG_LOG (" attach child task = %p to group = %p\n " ,
476
+ child, group);
477
+
478
+ // The counterpart of this (detachChild) is performed by the group itself,
479
+ // when it offers the completed (child) task's value to a waiting task -
480
+ // during the implementation of `await group.next()`.
477
481
auto groupRecord = asImpl (group)->getTaskRecord ();
478
- return groupRecord->attachChild (child);
482
+ groupRecord->attachChild (child);
479
483
}
480
484
481
485
// =============================================================================
@@ -505,8 +509,7 @@ bool TaskGroup::isCancelled() {
505
509
}
506
510
507
511
static void fillGroupNextResult (TaskFutureWaitAsyncContext *context,
508
- PollResult result,
509
- bool releaseResultRetainedTask) {
512
+ PollResult result) {
510
513
// / Fill in the result value
511
514
switch (result.status ) {
512
515
case PollStatus::MustWait:
@@ -515,7 +518,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
515
518
516
519
case PollStatus::Error: {
517
520
context->fillWithError (reinterpret_cast <SwiftError *>(result.storage ));
518
- break ;
521
+ return ;
519
522
}
520
523
521
524
case PollStatus::Success: {
@@ -527,28 +530,17 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
527
530
// remaining references to it.
528
531
successType->vw_initializeWithCopy (destPtr, result.storage );
529
532
successType->vw_storeEnumTagSinglePayload (destPtr, 0 , 1 );
530
- break ;
533
+ return ;
531
534
}
532
535
533
536
case PollStatus::Empty: {
534
537
// Initialize the result as a nil Optional<Success>.
535
538
const Metadata *successType = result.successType ;
536
539
OpaqueValue *destPtr = context->successResultPointer ;
537
540
successType->vw_storeEnumTagSinglePayload (destPtr, 1 , 1 );
538
- break ;
541
+ return ;
539
542
}
540
543
}
541
-
542
- // We only release if asked to; This is because this function is called in two
543
- // cases "immediately":
544
- // a) when a completed task arrives and a waiting one existed then we don't
545
- // need to retain the completed task at all, thus we also don't release it.
546
- // b) when the task was stored in the readyQueue it was retained. As a
547
- // waitingTask arrives we will fill-in with the value from the retained
548
- // task. In this situation we must release the ready task, to allow it to
549
- // be destroyed.
550
- if (releaseResultRetainedTask)
551
- swift_release (result.retainedTask );
552
544
}
553
545
554
546
void TaskGroupImpl::offer (AsyncTask *completedTask, AsyncContext *context) {
@@ -557,7 +549,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
557
549
assert (completedTask->hasChildFragment ());
558
550
assert (completedTask->hasGroupChildFragment ());
559
551
assert (completedTask->groupChildFragment ()->getGroup () == asAbstract (this ));
560
- SWIFT_TASK_DEBUG_LOG (" offer task %p to group %p\n " , completedTask, group );
552
+ SWIFT_TASK_DEBUG_LOG (" offer task %p to group %p" , completedTask, this );
561
553
562
554
mutex.lock (); // TODO: remove fragment lock, and use status for synchronization
563
555
@@ -603,7 +595,9 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
603
595
static_cast <TaskFutureWaitAsyncContext *>(
604
596
waitingTask->ResumeContext );
605
597
606
- fillGroupNextResult (waitingContext, result, /* release*/ false );
598
+ fillGroupNextResult (waitingContext, result);
599
+ detachChild (result.retainedTask );
600
+
607
601
_swift_tsan_acquire (static_cast <Job *>(waitingTask));
608
602
609
603
// TODO: allow the caller to suggest an executor
@@ -621,11 +615,12 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
621
615
// queue when a task polls during next() it will notice that we have a value
622
616
// ready for it, and will process it immediately without suspending.
623
617
assert (!waitQueue.load (std::memory_order_relaxed));
624
- SWIFT_TASK_DEBUG_LOG (" group has no waiting tasks, store ready task = %p" ,
618
+ SWIFT_TASK_DEBUG_LOG (" group has no waiting tasks, RETAIN and store ready task = %p" ,
625
619
completedTask);
626
620
// Retain the task while it is in the queue;
627
621
// it must remain alive until the task group is alive.
628
622
swift_retain (completedTask);
623
+
629
624
auto readyItem = ReadyQueueItem::get (
630
625
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
631
626
completedTask
@@ -691,7 +686,7 @@ static void swift_taskGroup_wait_next_throwingImpl(
691
686
PollResult polled = group->poll (waitingTask);
692
687
switch (polled.status ) {
693
688
case PollStatus::MustWait:
694
- SWIFT_TASK_DEBUG_LOG (" poll group = %p, no ready tasks, waiting task = %p\n " ,
689
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, no ready tasks, waiting task = %p" ,
695
690
group, waitingTask);
696
691
// The waiting task has been queued on the channel,
697
692
// there were pending tasks so it will be woken up eventually.
@@ -705,15 +700,22 @@ static void swift_taskGroup_wait_next_throwingImpl(
705
700
case PollStatus::Empty:
706
701
case PollStatus::Error:
707
702
case PollStatus::Success:
708
- SWIFT_TASK_DEBUG_LOG (" poll group = %p, task = %p, ready task available = %p\n " ,
703
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, task = %p, ready task available = %p" ,
709
704
group, waitingTask, polled.retainedTask );
710
- fillGroupNextResult (context, polled, /* release*/ true );
705
+ fillGroupNextResult (context, polled);
706
+ if (auto completedTask = polled.retainedTask ) {
707
+ // it would be null for PollStatus::Empty, then we don't need to release
708
+ group->detachChild (polled.retainedTask );
709
+ swift_release (polled.retainedTask );
710
+ }
711
+
711
712
return waitingTask->runInFullyEstablishedContext ();
712
713
}
713
714
}
714
715
715
716
PollResult TaskGroupImpl::poll (AsyncTask *waitingTask) {
716
717
mutex.lock (); // TODO: remove group lock, and use status for synchronization
718
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p" , this );
717
719
auto assumed = statusMarkWaitingAssumeAcquire ();
718
720
719
721
PollResult result;
@@ -723,6 +725,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
723
725
724
726
// ==== 1) bail out early if no tasks are pending ----------------------------
725
727
if (assumed.isEmpty ()) {
728
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, group is empty, no pending tasks" , this );
726
729
// No tasks in flight, we know no tasks were submitted before this poll
727
730
// was issued, and if we parked here we'd potentially never be woken up.
728
731
// Bail out and return `nil` from `group.next()`.
@@ -740,6 +743,9 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
740
743
741
744
// ==== 2) Ready task was polled, return with it immediately -----------------
742
745
if (assumed.readyTasks ()) {
746
+ SWIFT_TASK_DEBUG_LOG (" poll group = %p, group has ready tasks = %d" ,
747
+ this , assumed.readyTasks ());
748
+
743
749
auto assumedStatus = assumed.status ;
744
750
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
745
751
if (status.compare_exchange_weak (
@@ -843,13 +849,16 @@ static void swift_taskGroup_cancelAllImpl(TaskGroup *group) {
843
849
}
844
850
845
851
bool TaskGroupImpl::cancelAll () {
852
+ SWIFT_TASK_DEBUG_LOG (" cancel all tasks in group = %p" , this );
853
+
846
854
// store the cancelled bit
847
855
auto old = statusCancel ();
848
856
if (old.isCancelled ()) {
849
857
// already was cancelled previously, nothing to do?
850
858
return false ;
851
859
}
852
860
861
+ // FIXME: must also remove the records!!!!
853
862
// cancel all existing tasks within the group
854
863
swift_task_cancel_group_child_tasks (asAbstract (this ));
855
864
return true ;
0 commit comments