@@ -1085,9 +1085,13 @@ class DefaultActorImpl
1085
1085
// / new priority
1086
1086
void enqueueStealer (Job *job, JobPriority priority);
1087
1087
1088
- // / Processes incoming jobs if needed and dequeues one job of the highest priority .
1089
- // / The calling thread must be holding the actor lock while calling this.
1088
+ // / Dequeues one job from `prioritisedJobs` .
1089
+ // / The calling thread must be holding the actor lock while calling this
1090
1090
Job *drainOne ();
1091
+
1092
+ // / Atomically claims incoming jobs from ActiveActorStatus, and calls `handleUnprioritizedJobs()`.
1093
+ // / Called with actor lock held on current thread.
1094
+ void processIncomingQueue ();
1091
1095
#endif
1092
1096
1093
1097
// / Check if the actor is actually a distributed *remote* actor.
@@ -1124,10 +1128,6 @@ class DefaultActorImpl
1124
1128
// / when actor gets a priority override and we schedule a stealer.
1125
1129
void scheduleActorProcessJob (JobPriority priority);
1126
1130
1127
- // / Atomically claims incoming jobs from ActiveActorStatus, and calls `handleUnprioritizedJobs()`.
1128
- // / Called with actor lock held on current thread.
1129
- void processIncomingQueue ();
1130
-
1131
1131
// / Processes claimed incoming jobs into `prioritizedJobs`.
1132
1132
// / Incoming jobs are of mixed priorities and in LIFO order.
1133
1133
// / Called with actor lock held on current thread.
@@ -1434,7 +1434,6 @@ void DefaultActorImpl::handleUnprioritizedJobs(Job *head) {
1434
1434
Job *DefaultActorImpl::drainOne () {
1435
1435
SWIFT_TASK_DEBUG_LOG (" Draining one job from default actor %p" , this );
1436
1436
1437
- processIncomingQueue ();
1438
1437
traceJobQueue (this , prioritizedJobs.peek ());
1439
1438
auto firstJob = prioritizedJobs.dequeue ();
1440
1439
if (!firstJob) {
@@ -1499,39 +1498,40 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
1499
1498
TaskExecutorRef::undefined ());
1500
1499
1501
1500
while (true ) {
1502
- if (shouldYieldThread ()) {
1503
- currentActor->unlock (true );
1504
- break ;
1505
- }
1506
-
1507
1501
Job *job = currentActor->drainOne ();
1508
1502
if (job == NULL ) {
1509
1503
// No work left to do, try unlocking the actor. This may fail if there is
1510
1504
// work concurrently enqueued in which case, we'd try again in the loop
1511
- if (!currentActor->unlock (false )) {
1512
- continue ;
1505
+ if (currentActor->unlock (false )) {
1506
+ break ;
1507
+ }
1508
+ } else {
1509
+ if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1510
+ auto taskExecutor = task->getPreferredTaskExecutor ();
1511
+ trackingInfo.setTaskExecutor (taskExecutor);
1513
1512
}
1514
- break ;
1515
- }
1516
1513
1517
- if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1518
- auto taskExecutor = task->getPreferredTaskExecutor ();
1519
- trackingInfo.setTaskExecutor (taskExecutor);
1514
+ // This thread is now going to follow the task on this actor. It may hop off
1515
+ // the actor
1516
+ runJobInEstablishedExecutorContext (job);
1517
+
1518
+ // We could have come back from the job on a generic executor and not as
1519
+ // part of a default actor. If so, there is no more work left for us to do
1520
+ // here.
1521
+ auto currentExecutor = trackingInfo.getActiveExecutor ();
1522
+ if (!currentExecutor.isDefaultActor ()) {
1523
+ currentActor = nullptr ;
1524
+ break ;
1525
+ }
1526
+ currentActor = asImpl (currentExecutor.getDefaultActor ());
1520
1527
}
1521
1528
1522
- // This thread is now going to follow the task on this actor. It may hop off
1523
- // the actor
1524
- runJobInEstablishedExecutorContext (job);
1525
-
1526
- // We could have come back from the job on a generic executor and not as
1527
- // part of a default actor. If so, there is no more work left for us to do
1528
- // here.
1529
- auto currentExecutor = trackingInfo.getActiveExecutor ();
1530
- if (!currentExecutor.isDefaultActor ()) {
1531
- currentActor = nullptr ;
1529
+ if (shouldYieldThread ()) {
1530
+ currentActor->unlock (true );
1532
1531
break ;
1533
1532
}
1534
- currentActor = asImpl (currentExecutor.getDefaultActor ());
1533
+
1534
+ currentActor->processIncomingQueue ();
1535
1535
}
1536
1536
1537
1537
// Leave the tracking info.
0 commit comments