[Concurrency] Initial implementation of TaskGroups #34999
Conversation
@swift-ci please smoke test
@swift-ci Please smoke test OS X platform
(force-pushed from ce5a1c2 to c11e1b1)
@swift-ci Please smoke test OS X platform
(force-pushed from c11e1b1 to b115f9c)
Ok, rebased again and given up on the lockless version for now; at least this way we can have things working first. Will remove the debugging and some of the noise and ask for review then...
@swift-ci Please smoke test OS X platform
(force-pushed from 54e980b to 824bbe0)
Actually... not fully sure about the command, is it macOS or OS X? The README has both, hm...
@swift-ci please smoke test
(force-pushed from 082fe29 to ff52070)
/// Drain all remaining tasks by awaiting on them;
/// Their failures are ignored;
await group._tearDown()
So the semantics are:
- run the body; run all tasks spawned in it
- if the body throws, cancel all tasks and just return, without awaiting the remaining ones
- if the body returns and there are remaining pending tasks, await them
- if one of those tasks "ignored in the body" throws, the failure is ignored and we keep waiting <- not sure that's right; perhaps we should bail out whenever at least one of them throws? It wasn't clear what to do in this case in our writeups. I believe other implementations do bail out early.
The logic is based on my reading of Kotlin's coroutineScope, Trio's nurseries, and is also similar to what Loom does: https://github.com/openjdk/loom/blob/fibers/src/java.base/share/classes/java/util/concurrent/ExecutorService.java#L329
assert(isTaskGroup());
auto fragment = groupFragment();
fragment->mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
I've been spending a lot of time on the lockless dance; perhaps we should first get things going with the lock while I try to get the lockless exchanges correct.
One of the things I was unsure about is the queue style to use here -- I'm not completely sure whether we could get away with an MPSC queue for the readyQueue. Initially I assumed yes, but then I thought no, and now I'm actually not sure... Can the group assume it will be run on an actor's exclusive executor? I was assuming "no", but even if it could, would that give enough guarantees for the consuming side of the ready queue? I haven't figured that all out, and since we don't have the actor runtime yet I was not sure.
(So all of the dance with atomics in here is being developed with the assumption that we'll get rid of this lock, but I wanted to offer the PR as some incremental progress for the time being rather than block on this.)
mutable std::mutex mutex;
/// Used for queue management, counting number of waiting and ready tasks
std::atomic<unsigned long> status;
The status is of utmost importance; it allows us to implement isEmpty, properly implement scheduling, and decide whether next() should return nil or suspend.
Every offer and poll first acquires the status and releases it when done processing. Poll processes it as follows: if there are e.g. 2 ready tasks and 5 pending tasks, we know we can decrement both; we then perform a CAS to learn whether that polling task "won" and shall pull. This way, even if we get multiple polls (because they may be issued from async lets -- ???), the polls remain correct and we won't pull/decrement more than we actually have ready.
I am not clear about the concurrent poller case -- I was assuming that, because of async lets, there may be concurrent polls... but perhaps I'm assuming too much? Are async lets all confined to the same exclusive actor executor and thus unable to cause concurrent polls? Clarification here would be helpful - thanks! If we can assume no concurrent polls, we will be able to get away with a simple queue for readyQueue as well, which would be excellent.
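To make the offer/poll accounting concrete, here is a minimal sketch of a packed status counter with a CAS-guarded poll. It is purely illustrative: the bit layout, constants, and function names are assumptions, not the encoding this PR actually uses.

```cpp
#include <atomic>
#include <cstdint>

// Assumed packing for illustration: low 32 bits = ready tasks, high 32 bits =
// pending tasks. The real GroupStatus likely carries more state (waiting task,
// cancellation, ...).
constexpr uint64_t ReadyOne   = 1ull;
constexpr uint64_t PendingOne = 1ull << 32;
constexpr uint64_t ReadyMask  = 0xFFFFFFFFull;

std::atomic<uint64_t> status{0};

// offer(): a child task completed, so publish one more ready task.
void offerCompleted() {
  status.fetch_add(ReadyOne, std::memory_order_release);
}

// poll(): try to claim one ready task. The CAS decides which of several
// concurrent pollers "won"; a loser re-reads the status and retries, so we can
// never dequeue more tasks than the counter says are actually ready.
bool pollClaimOne() {
  auto old = status.load(std::memory_order_acquire);
  do {
    if ((old & ReadyMask) == 0)
      return false;                      // nothing ready: next() should suspend
  } while (!status.compare_exchange_weak(
      old, old - ReadyOne - PendingOne,  // -1 ready and -1 pending, as described
      std::memory_order_acq_rel, std::memory_order_acquire));
  return true;                           // we won the CAS; safe to dequeue one item
}
```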
@DougGregor so all of this dance around the status and the two queues is a remnant of the attempts at the lockless impl.
I'm planning to get back to it and make it happen now that John's runtime has landed -- I did not know whether you'd rather we remove all of it and add the proper thing later, or simply keep the status quo plus the lock, since it works just fine for now. Honestly I'd prefer keeping it as is, as then I can spare myself a cycle of restoring similar infra...
Let me know if you'd rather only merge a perfectly clean locking impl for now. I did remove all the other currently unused MPSC queues and such, so at least we do not pollute the code with unused data structures.
void destroy();
bool isEmpty() {
  auto oldStatus = GroupStatus { status.load(std::memory_order_relaxed) };
This may be my lack of C++ experience showing... or could I model this with the atomic itself being of type GroupStatus?
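On the question above, a small sketch of what storing the wrapper type directly in the atomic would look like; GroupStatus here is a hypothetical stand-in for the PR's struct. This works as long as the type stays trivially copyable, but then only load/store/exchange/compare_exchange are available; the fetch_add/fetch_sub family requires an integral or pointer type, which may be a reason to keep the raw unsigned long and wrap it on load.

```cpp
#include <atomic>
#include <type_traits>

// Hypothetical stand-in for the PR's GroupStatus wrapper.
struct GroupStatus {
  unsigned long rawValue;
  bool isEmpty() const { return rawValue == 0; }
};

static_assert(std::is_trivially_copyable<GroupStatus>::value,
              "std::atomic<T> requires a trivially copyable T");

std::atomic<GroupStatus> status{GroupStatus{0}};

bool isEmpty() {
  // Same effect as wrapping a raw load, but the atomic carries the type itself.
  return status.load(std::memory_order_relaxed).isEmpty();
}
```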
  offset += sizeof(GroupFragment);
}

return reinterpret_cast<FutureFragment *>(offset);
@DougGregor I think I got this right; last time we talked this was an intense if/else dance if you remember.
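For context, a rough sketch of the offset computation being discussed; the fragment types, their sizes, their order, and the flag checks are assumptions reconstructed from the excerpt, and alignment padding is ignored.

```cpp
#include <cstdint>

// Hypothetical stand-ins; the real runtime types differ.
struct TaskHeader     { void *jobState[6]; };   // the AsyncTask object itself
struct ChildFragment  { void *parent; void *nextSibling; };
struct GroupFragment  { void *readyQueue; void *waitQueue; unsigned long status; };
struct FutureFragment { void *resultStorage; void *error; };

// The future fragment must come last (it is dynamically sized), so its address
// is the task's address plus the size of every fragment laid out before it.
FutureFragment *futureFragmentOf(TaskHeader *task, bool isChild, bool isGroup) {
  auto offset = reinterpret_cast<uintptr_t>(task);
  offset += sizeof(TaskHeader);
  if (isChild)
    offset += sizeof(ChildFragment);
  if (isGroup)
    offset += sizeof(GroupFragment);    // the `offset += sizeof(GroupFragment)` above
  return reinterpret_cast<FutureFragment *>(offset);
}
```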
@@ -293,12 +663,19 @@ class AsyncTask : public HeapObject, public Job {
FutureFragment *futureFragment() {
  assert(isFuture());

  auto offset = reinterpret_cast<uintptr_t>(this); // TODO: char* instead?
I'm not sure what style to follow -- other files use char* but this file seems to use uintptr_t for similar things. WDYT / does it matter?
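For what it's worth, a tiny illustration of the two styles side by side, using dummy types; both compute the same address, so the choice is purely stylistic.

```cpp
#include <cstdint>

// Dummy stand-ins just to make the comparison compile.
struct Task     { void *storage[4]; };
struct Fragment { void *slot; };

// Style A: round-trip through an integer type (what this file currently does).
Fragment *fragmentViaUintptr(Task *task) {
  auto offset = reinterpret_cast<uintptr_t>(task) + sizeof(Task);
  return reinterpret_cast<Fragment *>(offset);
}

// Style B: byte-wise pointer arithmetic via char* (what other files reportedly do).
Fragment *fragmentViaCharPtr(Task *task) {
  auto *bytes = reinterpret_cast<char *>(task) + sizeof(Task);
  return reinterpret_cast<Fragment *>(bytes);
}
```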
Okay, I realized we can get back to lockless and vastly simplify the internal queues by just using one linked list like Futures do and doing a CAS over it, since we don't need to guarantee ordering as strongly as I had initially thought, I suppose.
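A minimal sketch of the "single linked list plus CAS" idea mentioned above, in the spirit of the future's waiting-task list; all names and the payload type here are assumptions. Producers push with a CAS on a single head pointer, and the consumer can take the whole batch at once, which relaxes ordering exactly as described.

```cpp
#include <atomic>

// Hypothetical node holding one completed child task (payload type assumed).
struct ReadyItem {
  ReadyItem *next;
  void *completedTask;
};

struct ReadyList {
  std::atomic<ReadyItem *> head{nullptr};

  // offer side: lock-free push of a completed task (classic Treiber-stack push).
  void push(ReadyItem *item) {
    auto *oldHead = head.load(std::memory_order_relaxed);
    do {
      item->next = oldHead;
    } while (!head.compare_exchange_weak(oldHead, item,
                                         std::memory_order_release,
                                         std::memory_order_relaxed));
  }

  // next() side: detach the whole list in one exchange; the batch comes out in
  // reverse push order, which is fine if strict FIFO ordering is not required.
  ReadyItem *takeAll() {
    return head.exchange(nullptr, std::memory_order_acquire);
  }
};
```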
(force-pushed from bf368d2 to 557969a)
@swift-ci please smoke test
  bool taskDequeued = readyQueue.dequeue(item);
}
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
}
Unsure if there is anything else to clean up here; the wait queue never has values in it, just waiting tasks, so I think there is nothing to clean up there.
(force-pushed from 557969a to 25ff6cc)
@swift-ci please smoke test
// directly.
waitingTask->run(executor);
}
(this only moved)
// switch (waitHead.getStatus()) {
// case GroupFragment::WaitStatus::Waiting:
//   assert(false && "destroying a task group that still has waiting tasks");
// }
Not really I guess... since they never store any values, unlike in the Future case 🤔
(force-pushed from 25ff6cc to a0b9d73)
Fixed the SIL issue, introduced new
@swift-ci please smoke test
@@ -190,12 +205,350 @@ class AsyncTask : public HeapObject, public Job {
  }
};

// TODO: rename? all other functions are `is...` rather than `has...Fragment`
Actually, TaskLocals also read as hasLocalValues, so I'll remove this TODO.
- move comments to the wired up continuations
- remove duplicated continuations; keep the wired up ones
- before moving to C++ for queue impl
- trying to next wait via channel_poll
- submitting works; need to impl next()
- before reversing order of fragments; future must be last since dynamic size
- offer fixed before implementing poll
(force-pushed from a0b9d73 to b267778)
@swift-ci please smoke test and merge
I need to do some more cleanup and remove all the println debugging... but I'm still fixing some final things. I fully expect this to still have some bugs.
rdar://70141994