@@ -984,13 +984,6 @@ class DefaultActorImplFooter {
984
984
// prioritizedJobs in the appropriate priority bucket.
985
985
//
986
986
PriorityQueue prioritizedJobs;
987
-
988
- // As an optimisation, we handle unprioritized jobs when obtaining actor lock.
989
- // This flag is used to skip `processIncomingQueue()` on the iteration of the
990
- // drainer loop right after taking the lock.
991
- // This applies both to obtaining lock when entering the drainer loop, and
992
- // when assuming thread for switching to a new actor.
993
- bool shouldProcessIncomingQueue;
994
987
#endif
995
988
};
996
989
@@ -1057,7 +1050,6 @@ class DefaultActorImpl
1057
1050
#else
1058
1051
_status ().store (ActiveActorStatus (), std::memory_order_relaxed);
1059
1052
new (&this ->prioritizedJobs ) PriorityQueue ();
1060
- this ->shouldProcessIncomingQueue = true ;
1061
1053
#endif
1062
1054
SWIFT_TASK_DEBUG_LOG (" Creating default actor %p" , this );
1063
1055
concurrency::trace::actor_create (this );
@@ -1374,14 +1366,10 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
1374
1366
#endif
1375
1367
}
1376
1368
}
1369
+
1377
1370
}
1378
1371
1379
1372
void DefaultActorImpl::processIncomingQueue () {
1380
- if (!shouldProcessIncomingQueue) {
1381
- SWIFT_TASK_DEBUG_LOG (" Skip processing incoming queue of default actor %p" , this );
1382
- return ;
1383
- }
1384
-
1385
1373
// Pairs with the store release in DefaultActorImpl::enqueue
1386
1374
bool distributedActorIsRemote = swift_distributed_actor_is_remote (this );
1387
1375
auto oldState = _status ().load (SWIFT_MEMORY_ORDER_CONSUME);
@@ -1427,7 +1415,6 @@ void DefaultActorImpl::handleUnprioritizedJobs(Job *head) {
1427
1415
head = next;
1428
1416
}
1429
1417
prioritizedJobs.enqueueContentsOf (reversed);
1430
- shouldProcessIncomingQueue = false ;
1431
1418
}
1432
1419
1433
1420
// Called with actor lock held on current thread
@@ -1442,7 +1429,6 @@ Job *DefaultActorImpl::drainOne() {
1442
1429
SWIFT_TASK_DEBUG_LOG (" Drained first job %p from actor %p" , firstJob, this );
1443
1430
concurrency::trace::actor_dequeue (this , firstJob);
1444
1431
}
1445
- shouldProcessIncomingQueue = true ;
1446
1432
return firstJob;
1447
1433
}
1448
1434
@@ -1677,9 +1663,11 @@ retry:;
1677
1663
auto newState = oldState.withRunning ();
1678
1664
newState = newState.withoutEscalatedPriority ();
1679
1665
1680
- // Claim incoming jobs when obtaining lock, to save one
1666
+ // Claim incoming jobs when obtaining lock as a drainer , to save one
1681
1667
// round of atomic load and compare-exchange.
1682
- newState = newState.withFirstUnprioritisedJob (nullptr );
1668
+ if (asDrainer) {
1669
+ newState = newState.withFirstUnprioritisedJob (nullptr );
1670
+ }
1683
1671
1684
1672
// This needs an acquire since we are taking a lock
1685
1673
if (_status ().compare_exchange_weak (oldState, newState,
@@ -1690,7 +1678,9 @@ retry:;
1690
1678
assert (prioritizedJobs.empty ());
1691
1679
}
1692
1680
traceActorStateTransition (this , oldState, newState, distributedActorIsRemote);
1693
- handleUnprioritizedJobs (oldState.getFirstUnprioritisedJob ());
1681
+ if (asDrainer) {
1682
+ handleUnprioritizedJobs (oldState.getFirstUnprioritisedJob ());
1683
+ }
1694
1684
return true ;
1695
1685
}
1696
1686
}
0 commit comments