@@ -1086,13 +1086,9 @@ class DefaultActorImpl
1086
1086
// / new priority
1087
1087
void enqueueStealer (Job *job, JobPriority priority);
1088
1088
1089
- // / Dequeues one job from `prioritisedJobs` .
1090
- // / The calling thread must be holding the actor lock while calling this
1089
+ // / Processes incoming jobs if needed and dequeues one job of the highest priority .
1090
+ // / The calling thread must be holding the actor lock while calling this.
1091
1091
Job *drainOne ();
1092
-
1093
- // / Atomically claims incoming jobs from ActiveActorStatus, and calls `handleUnprioritizedJobs()`.
1094
- // / Called with actor lock held on current thread.
1095
- void processIncomingQueue ();
1096
1092
#endif
1097
1093
1098
1094
// / Check if the actor is actually a distributed *remote* actor.
@@ -1129,6 +1125,10 @@ class DefaultActorImpl
1129
1125
// / when actor gets a priority override and we schedule a stealer.
1130
1126
void scheduleActorProcessJob (JobPriority priority);
1131
1127
1128
+ // / Atomically claims incoming jobs from ActiveActorStatus, and calls `handleUnprioritizedJobs()`.
1129
+ // / Called with actor lock held on current thread.
1130
+ void processIncomingQueue ();
1131
+
1132
1132
// / Processes claimed incoming jobs into `prioritisedJobs`.
1133
1133
// / Incoming jobs are of mixed priorities and in LIFO order.
1134
1134
// / Called with actor lock held on current thread.
@@ -1435,6 +1435,7 @@ void DefaultActorImpl::handleUnprioritizedJobs(Job *head) {
1435
1435
Job *DefaultActorImpl::drainOne () {
1436
1436
SWIFT_TASK_DEBUG_LOG (" Draining one job from default actor %p" , this );
1437
1437
1438
+ processIncomingQueue ();
1438
1439
traceJobQueue (this , prioritisedJobs.peek ());
1439
1440
auto firstJob = prioritisedJobs.dequeue ();
1440
1441
if (!firstJob) {
@@ -1499,40 +1500,39 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
1499
1500
TaskExecutorRef::undefined ());
1500
1501
1501
1502
while (true ) {
1503
+ if (shouldYieldThread ()) {
1504
+ currentActor->unlock (true );
1505
+ break ;
1506
+ }
1507
+
1502
1508
Job *job = currentActor->drainOne ();
1503
1509
if (job == NULL ) {
1504
1510
// No work left to do, try unlocking the actor. This may fail if there is
1505
1511
// work concurrently enqueued in which case, we'd try again in the loop
1506
- if (currentActor->unlock (false )) {
1507
- break ;
1508
- }
1509
- } else {
1510
- if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1511
- auto taskExecutor = task->getPreferredTaskExecutor ();
1512
- trackingInfo.setTaskExecutor (taskExecutor);
1512
+ if (!currentActor->unlock (false )) {
1513
+ continue ;
1513
1514
}
1515
+ break ;
1516
+ }
1514
1517
1515
- // This thread is now going to follow the task on this actor. It may hop off
1516
- // the actor
1517
- runJobInEstablishedExecutorContext (job);
1518
-
1519
- // We could have come back from the job on a generic executor and not as
1520
- // part of a default actor. If so, there is no more work left for us to do
1521
- // here.
1522
- auto currentExecutor = trackingInfo.getActiveExecutor ();
1523
- if (!currentExecutor.isDefaultActor ()) {
1524
- currentActor = nullptr ;
1525
- break ;
1526
- }
1527
- currentActor = asImpl (currentExecutor.getDefaultActor ());
1518
+ if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1519
+ auto taskExecutor = task->getPreferredTaskExecutor ();
1520
+ trackingInfo.setTaskExecutor (taskExecutor);
1528
1521
}
1529
1522
1530
- if (shouldYieldThread ()) {
1531
- currentActor->unlock (true );
1523
+ // This thread is now going to follow the task on this actor. It may hop off
1524
+ // the actor
1525
+ runJobInEstablishedExecutorContext (job);
1526
+
1527
+ // We could have come back from the job on a generic executor and not as
1528
+ // part of a default actor. If so, there is no more work left for us to do
1529
+ // here.
1530
+ auto currentExecutor = trackingInfo.getActiveExecutor ();
1531
+ if (!currentExecutor.isDefaultActor ()) {
1532
+ currentActor = nullptr ;
1532
1533
break ;
1533
1534
}
1534
-
1535
- currentActor->processIncomingQueue ();
1535
+ currentActor = asImpl (currentExecutor.getDefaultActor ());
1536
1536
}
1537
1537
1538
1538
// Leave the tracking info.
0 commit comments