@@ -981,6 +981,35 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
981
981
}
982
982
};
983
983
984
+ #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
985
+
986
+ // / Given that a job is enqueued normally on a default actor, get/set
987
+ // / the next job in the actor's queue.
988
+ static JobRef getNextJobInQueue (Job *job) {
989
+ return *reinterpret_cast <JobRef *>(job->SchedulerPrivate );
990
+ }
991
+ static void setNextJobInQueue (Job *job, JobRef next) {
992
+ *reinterpret_cast <JobRef *>(job->SchedulerPrivate ) = next;
993
+ }
994
+
995
+ namespace {
996
+
997
+ struct JobQueueTraits {
998
+ static Job *getNext (Job *job) {
999
+ return getNextJobInQueue (job).getAsPreprocessedJob ();
1000
+ }
1001
+ static void setNext (Job *job, Job *next) {
1002
+ setNextJobInQueue (job, JobRef::getPreprocessed (next));
1003
+ }
1004
+ static int compare (Job *lhs, Job *rhs) {
1005
+ return descendingPriorityOrder (lhs->getPriority (), rhs->getPriority ());
1006
+ }
1007
+ };
1008
+
1009
+ } // end anonymous namespace
1010
+
1011
+ #endif
1012
+
984
1013
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
985
1014
#define ACTIVE_ACTOR_STATUS_SIZE (4 * (sizeof (uintptr_t )))
986
1015
#else
@@ -1052,10 +1081,13 @@ class DefaultActorImpl : public HeapObject {
1052
1081
// enforce alignment. This is space that is available for us to use in
1053
1082
// the future
1054
1083
alignas (sizeof (ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)];
1084
+
1085
+ using ListMerger = swift::ListMerger<Job *, JobQueueTraits>;
1086
+ ListMerger::LastInsertionPoint lastInsertionPoint =
1087
+ ListMerger::LastInsertionPoint ();
1055
1088
#endif
1056
1089
// TODO (rokhinip): Make this a flagset
1057
1090
bool isDistributedRemoteActor;
1058
-
1059
1091
public:
1060
1092
// / Properly construct an actor, except for the heap header.
1061
1093
void initialize (bool isDistributedRemote = false ) {
@@ -1128,6 +1160,10 @@ class DefaultActorImpl : public HeapObject {
1128
1160
// / It can be done when actor transitions from Idle to Scheduled or
1129
1161
// / when actor gets a priority override and we schedule a stealer.
1130
1162
void scheduleActorProcessJob (JobPriority priority);
1163
+
1164
+ Job *preprocessQueue (JobRef start);
1165
+ Job *preprocessQueue (JobRef unprocessedStart, JobRef unprocessedEnd,
1166
+ Job *existingProcessedJobsToMergeInto);
1131
1167
#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
1132
1168
1133
1169
void deallocateUnconditional ();
@@ -1203,31 +1239,6 @@ static NonDefaultDistributedActorImpl *asImpl(NonDefaultDistributedActor *actor)
1203
1239
/* ****************************************************************************/
1204
1240
1205
1241
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
1206
- // / Given that a job is enqueued normally on a default actor, get/set
1207
- // / the next job in the actor's queue.
1208
- static JobRef getNextJobInQueue (Job *job) {
1209
- return *reinterpret_cast <JobRef*>(job->SchedulerPrivate );
1210
- }
1211
- static void setNextJobInQueue (Job *job, JobRef next) {
1212
- *reinterpret_cast <JobRef*>(job->SchedulerPrivate ) = next;
1213
- }
1214
-
1215
- namespace {
1216
-
1217
- struct JobQueueTraits {
1218
- static Job *getNext (Job *job) {
1219
- return getNextJobInQueue (job).getAsPreprocessedJob ();
1220
- }
1221
- static void setNext (Job *job, Job *next) {
1222
- setNextJobInQueue (job, JobRef::getPreprocessed (next));
1223
- }
1224
- static int compare (Job *lhs, Job *rhs) {
1225
- return descendingPriorityOrder (lhs->getPriority (), rhs->getPriority ());
1226
- }
1227
- };
1228
-
1229
- } // end anonymous namespace
1230
-
1231
1242
1232
1243
// Called with the actor drain lock held
1233
1244
//
@@ -1240,15 +1251,14 @@ struct JobQueueTraits {
1240
1251
// and the previous start. We can then process these jobs and merge them into
1241
1252
// the already processed list of jobs from the previous iteration of
1242
1253
// preprocessQueue
1243
- static Job *
1244
- preprocessQueue (JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingProcessedJobsToMergeInto)
1245
- {
1254
+ Job *DefaultActorImpl::preprocessQueue (JobRef unprocessedStart,
1255
+ JobRef unprocessedEnd,
1256
+ Job *existingProcessedJobsToMergeInto) {
1246
1257
assert (existingProcessedJobsToMergeInto != NULL );
1247
1258
assert (unprocessedStart.needsPreprocessing ());
1248
1259
assert (unprocessedStart.getAsJob () != unprocessedEnd.getAsJob ());
1249
1260
1250
1261
// Build up a list of jobs we need to preprocess
1251
- using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
1252
1262
ListMerger jobsToProcess;
1253
1263
1254
1264
// Get just the prefix list of unprocessed jobs
@@ -1263,19 +1273,20 @@ preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingPro
1263
1273
}
1264
1274
1265
1275
// Finish processing the unprocessed jobs
1266
- Job *newProcessedJobs = jobsToProcess.release ();
1276
+ Job *newProcessedJobs = std::get< 0 >( jobsToProcess.release () );
1267
1277
assert (newProcessedJobs);
1268
1278
1269
- ListMerger mergedList (existingProcessedJobsToMergeInto);
1279
+ ListMerger mergedList (existingProcessedJobsToMergeInto, lastInsertionPoint );
1270
1280
mergedList.merge (newProcessedJobs);
1271
- return mergedList.release ();
1281
+ Job *result;
1282
+ std::tie (result, lastInsertionPoint) = mergedList.release ();
1283
+ return result;
1272
1284
}
1273
1285
1274
1286
// Called with the actor drain lock held.
1275
1287
//
1276
1288
// Preprocess the queue starting from the top
1277
- static Job *
1278
- preprocessQueue (JobRef start) {
1289
+ Job *DefaultActorImpl::preprocessQueue (JobRef start) {
1279
1290
if (!start) {
1280
1291
return NULL ;
1281
1292
}
@@ -1288,7 +1299,6 @@ preprocessQueue(JobRef start) {
1288
1299
// There exist some jobs which haven't been preprocessed
1289
1300
1290
1301
// Build up a list of jobs we need to preprocess
1291
- using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
1292
1302
ListMerger jobsToProcess;
1293
1303
1294
1304
Job *wellFormedListStart = NULL ;
@@ -1311,18 +1321,19 @@ preprocessQueue(JobRef start) {
1311
1321
}
1312
1322
1313
1323
// Finish processing the unprocessed jobs
1314
- auto processedJobHead = jobsToProcess.release ();
1324
+ auto processedJobHead = std::get< 0 >( jobsToProcess.release () );
1315
1325
assert (processedJobHead);
1316
1326
1317
1327
Job *firstJob = NULL ;
1318
1328
if (wellFormedListStart) {
1319
1329
// Merge it with already known well formed list if we have one.
1320
- ListMerger mergedList (wellFormedListStart);
1330
+ ListMerger mergedList (wellFormedListStart, lastInsertionPoint );
1321
1331
mergedList.merge (processedJobHead);
1322
- firstJob = mergedList.release ();
1332
+ std::tie ( firstJob, lastInsertionPoint) = mergedList.release ();
1323
1333
} else {
1324
1334
// Nothing to merge with, just return the head we already have
1325
1335
firstJob = processedJobHead;
1336
+ lastInsertionPoint = ListMerger::LastInsertionPoint ();
1326
1337
}
1327
1338
1328
1339
return firstJob;
@@ -1528,6 +1539,7 @@ Job * DefaultActorImpl::drainOne() {
1528
1539
if (_status ().compare_exchange_weak (oldState, newState,
1529
1540
/* success */ std::memory_order_relaxed,
1530
1541
/* failure */ std::memory_order_relaxed)) {
1542
+ lastInsertionPoint.nodeWasRemoved (firstJob);
1531
1543
SWIFT_TASK_DEBUG_LOG (" Drained first job %p from actor %p" , firstJob, this );
1532
1544
traceActorStateTransition (this , oldState, newState, distributedActorIsRemote);
1533
1545
concurrency::trace::actor_dequeue (this , firstJob);
0 commit comments