
[Concurrency] Initial implementation of TaskGroups #34999


Merged
merged 8 commits into from
Dec 17, 2020

Conversation

ktoso
Contributor

@ktoso ktoso commented Dec 8, 2020

I still need to do some more cleanups and remove all the println debugging, but I'm fixing some final things first.

I fully expect this to still have some bugs.

rdar://70141994

@ktoso
Contributor Author

ktoso commented Dec 8, 2020

@swift-ci please smoke test

@ktoso
Contributor Author

ktoso commented Dec 10, 2020

@swift-ci Please smoke test OS X platform

@ktoso ktoso force-pushed the wip-task-groups-impl-channel-rebased branch from ce5a1c2 to c11e1b1 Compare December 10, 2020 04:20
@ktoso
Contributor Author

ktoso commented Dec 10, 2020

@swift-ci Please smoke test OS X platform

@ktoso ktoso changed the title [Concurrency]Initial implementation of TaskGroups [Concurrency] Initial implementation of TaskGroups Dec 10, 2020
@ktoso ktoso force-pushed the wip-task-groups-impl-channel-rebased branch from c11e1b1 to b115f9c Compare December 10, 2020 04:35
@ktoso
Contributor Author

ktoso commented Dec 10, 2020

OK, rebased again and given up on the lockless version for now; at least this way we can have things working first.

Will remove debugging and some of the noise and ask for review then...

@swift-ci Please smoke test OS X platform

@ktoso ktoso force-pushed the wip-task-groups-impl-channel-rebased branch from 54e980b to 824bbe0 Compare December 10, 2020 11:16
@ktoso
Contributor Author

ktoso commented Dec 10, 2020

Actually... I'm not fully sure about the command -- is it macOS or OS X? The README has both, hm...

@swift-ci please smoke test

@ktoso ktoso force-pushed the wip-task-groups-impl-channel-rebased branch 3 times, most recently from 082fe29 to ff52070 Compare December 10, 2020 16:02

/// Drain all remaining tasks by awaiting on them;
/// Their failures are ignored;
await group._tearDown()
Contributor Author

So the semantics are:

  • run the body; run all tasks spawned in it
  • if the body throws, cancel all child tasks and return without awaiting the remaining ones
  • if the body returns and there are remaining pending tasks, await them
  • if one of those tasks "ignored in the body" throws, the failure is ignored and we keep waiting <- not sure that's right; perhaps we should bail out whenever at least one of them throws? Our writeups didn't make clear what to do in this case. I believe other implementations do bail out early.

The logic is based on my reading of Kotlin's coroutineScope, Trio's nurseries, and is also similar to what Loom does: https://github.com/openjdk/loom/blob/fibers/src/java.base/share/classes/java/util/concurrent/ExecutorService.java#L329
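To make the bullet points concrete, here is a hedged sketch of that control flow in plain C++ rather than the actual Swift/runtime code; `MockGroup`, `withGroup`, and `tearDown` are invented names for illustration only:

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <vector>

// Hypothetical model of the teardown semantics described above (not the
// actual runtime code): run the body; on a body error, cancel everything
// and return without draining; on normal return, drain the remaining
// pending tasks and swallow their failures.
struct MockGroup {
  std::vector<std::function<void()>> pending; // remaining child tasks
  bool cancelled = false;
  int drained = 0;

  void tearDown() {
    // Drain all remaining tasks; their failures are ignored.
    for (auto &task : pending) {
      try { task(); } catch (...) { /* ignored, per the semantics above */ }
      ++drained;
    }
    pending.clear();
  }
};

template <typename Body>
void withGroup(MockGroup &group, Body body) {
  try {
    body(group);
  } catch (...) {
    group.cancelled = true; // body threw: cancel all, no draining
    throw;
  }
  group.tearDown(); // body returned: await the remaining pending tasks
}
```

Whether a failure during the drain should instead abort the drain early (as other implementations reportedly do) is exactly the open question raised above.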


assert(isTaskGroup());
auto fragment = groupFragment();
fragment->mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
Contributor Author

I've been spending a lot of time on the lockless dance; perhaps we should get things working first while I try to get the lockless exchanges correct.

One thing I was unsure about is the queue style to use here -- I'm not completely sure whether we could get away with an MPSC queue for the readyQueue. Initially I assumed yes, then I thought no, and now I'm actually not sure... Can the group assume it will run on an actor's exclusive executor? I was assuming "no", but even if it could, would that give enough guarantees for the consuming side of the ready queue? I haven't figured all that out, and since we don't have the actor runtime yet I wasn't really sure.

Contributor Author

(All of the dance with atomics in here is being developed on the assumption that we'll get rid of this lock, but I wanted to offer this PR as incremental progress for the time being rather than block on it.)

mutable std::mutex mutex;

/// Used for queue management, counting number of waiting and ready tasks
std::atomic<unsigned long> status;
Contributor Author

The status is of utmost importance; it allows us to implement isEmpty, properly implement scheduling, and decide whether next() should return nil or suspend.

All offer and poll operations initially acquire the status and release it when they're done processing. poll processes it as: if there are e.g. 2 ready tasks and 5 pending tasks, we know we can decrement both -- we then perform a CAS to learn whether that polling task "won" and shall pull. This way, even if we get multiple polls (because they may be done in async lets -- ???), the polls stay correct and we won't pull/decrement more than we actually have ready.

I'm not clear about the concurrent-poller case -- I was assuming that because of async lets there may be concurrent polls issued... but perhaps I'm assuming too much? Are async lets all confined to the same exclusive actor executor, and thus unable to cause concurrent polls? Clarification here would be helpful -- thanks! If we can assume no concurrent polls, we could get away with a simple queue for the readyQueue as well, which would be excellent.
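For illustration, the counter-packing and the CAS "winning poll" described above could look roughly like this sketch; the bit layout and names are invented for the example, not the actual GroupStatus:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical status word: pack the ready and pending counts into one
// atomic 64-bit value, so a poller can claim a ready task with a single
// compare-exchange and concurrent polls can never over-decrement.
struct GroupStatus {
  uint64_t bits;
  uint32_t ready() const   { return uint32_t(bits & 0xFFFFFFFFull); }
  uint32_t pending() const { return uint32_t(bits >> 32); }
  static uint64_t pack(uint32_t pending, uint32_t ready) {
    return (uint64_t(pending) << 32) | ready;
  }
};

// Attempt to claim one ready task: load the status and, if a ready task
// exists, CAS both counters down by one. Returns true if this poll "won".
bool tryPoll(std::atomic<uint64_t> &status) {
  auto old = status.load(std::memory_order_acquire);
  while (true) {
    GroupStatus s{old};
    if (s.ready() == 0)
      return false; // nothing ready: caller must suspend (or return nil)
    uint64_t desired = GroupStatus::pack(s.pending() - 1, s.ready() - 1);
    if (status.compare_exchange_weak(old, desired,
                                     std::memory_order_acq_rel))
      return true; // this poller won the race
    // CAS failed: someone else changed the status; retry with fresh value.
  }
}
```

Even with many concurrent callers, at most `ready()` polls can succeed between offers, which is the invariant the comment above is after.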

Contributor Author

@DougGregor so all of this dance around the status and the two queues is a remnant of attempts at the lockless impl;

I'm planning to get back to it and make it happen now that John's runtime has landed -- I didn't know whether you'd rather we remove all of it and add the proper thing later, or simply keep the status quo plus the lock because it works well enough. Honestly I'd prefer keeping it as is, since that spares me a cycle of restoring similar infrastructure...

Let me know if you'd rather merge only a perfectly clean locking impl for now. I did remove all the other currently unused MPSC queues and such, so at least we don't pollute the code with unused data structures.

void destroy();

bool isEmpty() {
auto oldStatus = GroupStatus { status.load(std::memory_order_relaxed) };
Contributor Author

This may be my lack of C++ experience showing...

Or could I model this as the atomic being of type GroupStatus?
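For what it's worth, that modeling is possible: std::atomic accepts a user-defined value type as long as it is trivially copyable, so the raw-word load plus wrapper construction could become a typed load. A sketch with illustrative names (not the actual runtime types):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <type_traits>

// Hypothetical GroupStatus used directly as the atomic's value type.
struct GroupStatus {
  uint64_t status;
  bool isEmpty() const { return status == 0; }
};
static_assert(std::is_trivially_copyable<GroupStatus>::value,
              "std::atomic<T> requires a trivially copyable T");

std::atomic<GroupStatus> gStatus{GroupStatus{0}};

bool isEmpty() {
  // Typed load replaces: GroupStatus { rawStatus.load(...) }
  return gStatus.load(std::memory_order_relaxed).isEmpty();
}
```

One trade-off: with a typed atomic, compare-exchange operates on whole GroupStatus values, so the raw-word helpers for packing/unpacking would move into the struct itself.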

offset += sizeof(GroupFragment);
}

return reinterpret_cast<FutureFragment *>(offset);
Contributor Author

@DougGregor I think I got this right; last time we talked this was an intense if/else dance if you remember.
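The offset arithmetic being reviewed here can be sketched in isolation; the struct names and sizes below are hypothetical stand-ins, just to show how the optional GroupFragment shifts where the FutureFragment begins:

```cpp
#include <cassert>
#include <cstdint>

// Illustrative trailing-fragment layout (invented sizes, not the real ones):
// [ task header | (GroupFragment, only if a task group) | FutureFragment ]
struct AsyncTaskHeader { uint64_t flags[4]; };
struct GroupFragment   { uint64_t mutexAndStatus[8]; };
struct FutureFragment  { uint64_t resultStorage[2]; };

uintptr_t futureFragmentOffset(bool isTaskGroup) {
  uintptr_t offset = sizeof(AsyncTaskHeader);
  if (isTaskGroup)
    offset += sizeof(GroupFragment); // skip the group fragment when present
  return offset; // the FutureFragment begins here
}
```

Keeping the future fragment last also matters because it is dynamically sized, as one of the commit messages below notes.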

@@ -293,12 +663,19 @@ class AsyncTask : public HeapObject, public Job {

FutureFragment *futureFragment() {
assert(isFuture());

auto offset = reinterpret_cast<uintptr_t>(this); // TODO: char* instead?
Contributor Author

I'm not sure which style to follow -- other files use char*, but this file seems to use uintptr_t for similar things.

WDYT / does it matter?

@ktoso ktoso requested review from DougGregor and rjmccall December 10, 2020 16:05
@ktoso ktoso marked this pull request as ready for review December 11, 2020 00:19
@ktoso
Contributor Author

ktoso commented Dec 11, 2020

Okay, I realized we can get back to lockless and vastly simplify the internal queues by just using one linked queue, like Futures do, and CASing over it, since we don't need to guarantee ordering as strongly as I had initially thought.
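The "one linked queue plus CAS" idea, in the style of the future wait queue, could be sketched like this; it's an illustrative Treiber-style push/drain under invented names, not the runtime's actual implementation:

```cpp
#include <atomic>
#include <cassert>

// Illustrative lock-free linked queue: producers push onto an atomic head
// with compare-exchange; the single consumer drains the whole list at once.
// Ordering is LIFO, which is acceptable when strong ordering isn't needed.
struct Node {
  int value;
  Node *next;
};

struct LinkedQueue {
  std::atomic<Node *> head{nullptr};

  void push(Node *node) {
    Node *old = head.load(std::memory_order_relaxed);
    do {
      node->next = old; // link to the current head
    } while (!head.compare_exchange_weak(old, node,
                                         std::memory_order_release,
                                         std::memory_order_relaxed));
  }

  // Take the entire list in one atomic exchange (consumer side).
  Node *takeAll() {
    return head.exchange(nullptr, std::memory_order_acquire);
  }
};
```

The appeal is that there is no separate dequeue synchronization to get wrong: each offer is a single CAS, and the consumer claims everything outstanding with one exchange.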

@ktoso ktoso force-pushed the wip-task-groups-impl-channel-rebased branch 2 times, most recently from bf368d2 to 557969a Compare December 14, 2020 10:26
@ktoso
Contributor Author

ktoso commented Dec 14, 2020

@swift-ci please smoke test

bool taskDequeued = readyQueue.dequeue(item);
}
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
}
Contributor Author

Unsure if there's anything else to clean up here; the wait queue never has values in it, just waiting tasks, so I think there's nothing to clean up there.

@ktoso ktoso force-pushed the wip-task-groups-impl-channel-rebased branch from 557969a to 25ff6cc Compare December 14, 2020 13:11
@DougGregor
Member

@swift-ci please smoke test

// directly.
waitingTask->run(executor);
}

Contributor Author

(this only moved)

// switch (waitHead.getStatus()) {
// case GroupFragment::WaitStatus::Waiting:
// assert(false && "destroying a task group that still has waiting tasks");
// }
Contributor Author

Not really I guess... since they never store any values, unlike in the Future case 🤔

@ktoso ktoso force-pushed the wip-task-groups-impl-channel-rebased branch from 25ff6cc to a0b9d73 Compare December 14, 2020 22:22
@ktoso
Contributor Author

ktoso commented Dec 14, 2020

Fixed the SIL issue; introduced a new _runGroupChildTask rather than changing _runChildTask.

@swift-ci please smoke test

@@ -190,12 +205,350 @@ class AsyncTask : public HeapObject, public Job {
}
};

// TODO: rename? all other functions are `is...` rather than `has...Fragment`
Contributor Author

Actually, TaskLocals also read as hasLocalValues, so I'll remove this TODO.

DougGregor and others added 7 commits December 17, 2020 06:05
move comments to the wired up continuations

remove duplicated continuations; keep the wired up ones

before moving to C++ for queue impl

trying to next wait via channel_poll

submitting works; need to impl next()
before reversing order of fragments; future must be last since dynamic
size

offer fixed

before implementing poll
@ktoso ktoso force-pushed the wip-task-groups-impl-channel-rebased branch from a0b9d73 to b267778 Compare December 16, 2020 21:36
@ktoso
Contributor Author

ktoso commented Dec 16, 2020

@swift-ci please smoke test and merge

@swift-ci swift-ci merged commit f23f5b8 into swiftlang:main Dec 17, 2020
@ktoso ktoso deleted the wip-task-groups-impl-channel-rebased branch February 10, 2021 07:41