@@ -979,6 +979,35 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
979
979
}
980
980
};
981
981
982
+ #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
983
+
984
+ // / Given that a job is enqueued normally on a default actor, get/set
985
+ // / the next job in the actor's queue.
986
+ static JobRef getNextJobInQueue (Job *job) {
987
+ return *reinterpret_cast <JobRef *>(job->SchedulerPrivate );
988
+ }
989
+ static void setNextJobInQueue (Job *job, JobRef next) {
990
+ *reinterpret_cast <JobRef *>(job->SchedulerPrivate ) = next;
991
+ }
992
+
993
+ namespace {
994
+
995
+ struct JobQueueTraits {
996
+ static Job *getNext (Job *job) {
997
+ return getNextJobInQueue (job).getAsPreprocessedJob ();
998
+ }
999
+ static void setNext (Job *job, Job *next) {
1000
+ setNextJobInQueue (job, JobRef::getPreprocessed (next));
1001
+ }
1002
+ static int compare (Job *lhs, Job *rhs) {
1003
+ return descendingPriorityOrder (lhs->getPriority (), rhs->getPriority ());
1004
+ }
1005
+ };
1006
+
1007
+ } // end anonymous namespace
1008
+
1009
+ #endif
1010
+
982
1011
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
983
1012
#define ACTIVE_ACTOR_STATUS_SIZE (4 * (sizeof (uintptr_t )))
984
1013
#else
@@ -1050,10 +1079,13 @@ class DefaultActorImpl : public HeapObject {
1050
1079
// enforce alignment. This is space that is available for us to use in
1051
1080
// the future
1052
1081
alignas (sizeof (ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)];
1082
+
1083
+ using ListMerger = swift::ListMerger<Job *, JobQueueTraits>;
1084
+ ListMerger::LastInsertionPoint lastInsertionPoint =
1085
+ ListMerger::LastInsertionPoint ();
1053
1086
#endif
1054
1087
// TODO (rokhinip): Make this a flagset
1055
1088
bool isDistributedRemoteActor;
1056
-
1057
1089
public:
1058
1090
// / Properly construct an actor, except for the heap header.
1059
1091
void initialize (bool isDistributedRemote = false ) {
@@ -1126,6 +1158,10 @@ class DefaultActorImpl : public HeapObject {
1126
1158
// / It can be done when actor transitions from Idle to Scheduled or
1127
1159
// / when actor gets a priority override and we schedule a stealer.
1128
1160
void scheduleActorProcessJob (JobPriority priority);
1161
+
1162
+ Job *preprocessQueue (JobRef start);
1163
+ Job *preprocessQueue (JobRef unprocessedStart, JobRef unprocessedEnd,
1164
+ Job *existingProcessedJobsToMergeInto);
1129
1165
#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
1130
1166
1131
1167
void deallocateUnconditional ();
@@ -1201,31 +1237,6 @@ static NonDefaultDistributedActorImpl *asImpl(NonDefaultDistributedActor *actor)
1201
1237
/* ****************************************************************************/
1202
1238
1203
1239
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
1204
- // / Given that a job is enqueued normally on a default actor, get/set
1205
- // / the next job in the actor's queue.
1206
- static JobRef getNextJobInQueue (Job *job) {
1207
- return *reinterpret_cast <JobRef*>(job->SchedulerPrivate );
1208
- }
1209
- static void setNextJobInQueue (Job *job, JobRef next) {
1210
- *reinterpret_cast <JobRef*>(job->SchedulerPrivate ) = next;
1211
- }
1212
-
1213
- namespace {
1214
-
1215
- struct JobQueueTraits {
1216
- static Job *getNext (Job *job) {
1217
- return getNextJobInQueue (job).getAsPreprocessedJob ();
1218
- }
1219
- static void setNext (Job *job, Job *next) {
1220
- setNextJobInQueue (job, JobRef::getPreprocessed (next));
1221
- }
1222
- static int compare (Job *lhs, Job *rhs) {
1223
- return descendingPriorityOrder (lhs->getPriority (), rhs->getPriority ());
1224
- }
1225
- };
1226
-
1227
- } // end anonymous namespace
1228
-
1229
1240
1230
1241
// Called with the actor drain lock held
1231
1242
//
@@ -1238,15 +1249,14 @@ struct JobQueueTraits {
1238
1249
// and the previous start. We can then process these jobs and merge them into
1239
1250
// the already processed list of jobs from the previous iteration of
1240
1251
// preprocessQueue
1241
- static Job *
1242
- preprocessQueue (JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingProcessedJobsToMergeInto)
1243
- {
1252
+ Job *DefaultActorImpl::preprocessQueue (JobRef unprocessedStart,
1253
+ JobRef unprocessedEnd,
1254
+ Job *existingProcessedJobsToMergeInto) {
1244
1255
assert (existingProcessedJobsToMergeInto != NULL );
1245
1256
assert (unprocessedStart.needsPreprocessing ());
1246
1257
assert (unprocessedStart.getAsJob () != unprocessedEnd.getAsJob ());
1247
1258
1248
1259
// Build up a list of jobs we need to preprocess
1249
- using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
1250
1260
ListMerger jobsToProcess;
1251
1261
1252
1262
// Get just the prefix list of unprocessed jobs
@@ -1261,19 +1271,20 @@ preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingPro
1261
1271
}
1262
1272
1263
1273
// Finish processing the unprocessed jobs
1264
- Job *newProcessedJobs = jobsToProcess.release ();
1274
+ Job *newProcessedJobs = std::get< 0 >( jobsToProcess.release () );
1265
1275
assert (newProcessedJobs);
1266
1276
1267
- ListMerger mergedList (existingProcessedJobsToMergeInto);
1277
+ ListMerger mergedList (existingProcessedJobsToMergeInto, lastInsertionPoint );
1268
1278
mergedList.merge (newProcessedJobs);
1269
- return mergedList.release ();
1279
+ Job *result;
1280
+ std::tie (result, lastInsertionPoint) = mergedList.release ();
1281
+ return result;
1270
1282
}
1271
1283
1272
1284
// Called with the actor drain lock held.
1273
1285
//
1274
1286
// Preprocess the queue starting from the top
1275
- static Job *
1276
- preprocessQueue (JobRef start) {
1287
+ Job *DefaultActorImpl::preprocessQueue (JobRef start) {
1277
1288
if (!start) {
1278
1289
return NULL ;
1279
1290
}
@@ -1286,7 +1297,6 @@ preprocessQueue(JobRef start) {
1286
1297
// There exist some jobs which haven't been preprocessed
1287
1298
1288
1299
// Build up a list of jobs we need to preprocess
1289
- using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
1290
1300
ListMerger jobsToProcess;
1291
1301
1292
1302
Job *wellFormedListStart = NULL ;
@@ -1309,18 +1319,19 @@ preprocessQueue(JobRef start) {
1309
1319
}
1310
1320
1311
1321
// Finish processing the unprocessed jobs
1312
- auto processedJobHead = jobsToProcess.release ();
1322
+ auto processedJobHead = std::get< 0 >( jobsToProcess.release () );
1313
1323
assert (processedJobHead);
1314
1324
1315
1325
Job *firstJob = NULL ;
1316
1326
if (wellFormedListStart) {
1317
1327
// Merge it with already known well formed list if we have one.
1318
- ListMerger mergedList (wellFormedListStart);
1328
+ ListMerger mergedList (wellFormedListStart, lastInsertionPoint );
1319
1329
mergedList.merge (processedJobHead);
1320
- firstJob = mergedList.release ();
1330
+ std::tie ( firstJob, lastInsertionPoint) = mergedList.release ();
1321
1331
} else {
1322
1332
// Nothing to merge with, just return the head we already have
1323
1333
firstJob = processedJobHead;
1334
+ lastInsertionPoint = ListMerger::LastInsertionPoint ();
1324
1335
}
1325
1336
1326
1337
return firstJob;
@@ -1526,6 +1537,7 @@ Job * DefaultActorImpl::drainOne() {
1526
1537
if (_status ().compare_exchange_weak (oldState, newState,
1527
1538
/* success */ std::memory_order_relaxed,
1528
1539
/* failure */ std::memory_order_relaxed)) {
1540
+ lastInsertionPoint.nodeWasRemoved (firstJob);
1529
1541
SWIFT_TASK_DEBUG_LOG (" Drained first job %p from actor %p" , firstJob, this );
1530
1542
traceActorStateTransition (this , oldState, newState, distributedActorIsRemote);
1531
1543
concurrency::trace::actor_dequeue (this , firstJob);
0 commit comments