-
Notifications
You must be signed in to change notification settings - Fork 14.3k
[Parallel] Revert sequential task changes #109084
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,6 @@ | |
#include "llvm/Support/Threading.h" | ||
|
||
#include <atomic> | ||
#include <deque> | ||
#include <future> | ||
#include <thread> | ||
#include <vector> | ||
|
@@ -39,7 +38,7 @@ namespace { | |
class Executor { | ||
public: | ||
virtual ~Executor() = default; | ||
virtual void add(std::function<void()> func, bool Sequential = false) = 0; | ||
virtual void add(std::function<void()> func) = 0; | ||
virtual size_t getThreadCount() const = 0; | ||
|
||
static Executor *getDefaultExecutor(); | ||
|
@@ -98,56 +97,34 @@ class ThreadPoolExecutor : public Executor { | |
static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } | ||
}; | ||
|
||
void add(std::function<void()> F, bool Sequential = false) override { | ||
void add(std::function<void()> F) override { | ||
{ | ||
std::lock_guard<std::mutex> Lock(Mutex); | ||
if (Sequential) | ||
WorkQueueSequential.emplace_front(std::move(F)); | ||
else | ||
WorkQueue.emplace_back(std::move(F)); | ||
WorkStack.push_back(std::move(F)); | ||
} | ||
Cond.notify_one(); | ||
} | ||
|
||
size_t getThreadCount() const override { return ThreadCount; } | ||
|
||
private: | ||
bool hasSequentialTasks() const { | ||
return !WorkQueueSequential.empty() && !SequentialQueueIsLocked; | ||
} | ||
|
||
bool hasGeneralTasks() const { return !WorkQueue.empty(); } | ||
|
||
void work(ThreadPoolStrategy S, unsigned ThreadID) { | ||
threadIndex = ThreadID; | ||
S.apply_thread_strategy(ThreadID); | ||
while (true) { | ||
std::unique_lock<std::mutex> Lock(Mutex); | ||
Cond.wait(Lock, [&] { | ||
return Stop || hasGeneralTasks() || hasSequentialTasks(); | ||
}); | ||
Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); | ||
if (Stop) | ||
break; | ||
bool Sequential = hasSequentialTasks(); | ||
if (Sequential) | ||
SequentialQueueIsLocked = true; | ||
else | ||
assert(hasGeneralTasks()); | ||
|
||
auto &Queue = Sequential ? WorkQueueSequential : WorkQueue; | ||
auto Task = std::move(Queue.back()); | ||
Queue.pop_back(); | ||
auto Task = std::move(WorkStack.back()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was missed in original code, but it seems that the check for WorkStack should be added here: if (WorkStack.empty()) { Otherwise, WorkStack.back() might be called on empty WorkStack in case spurious wakeup? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the code is fine. The condition_variable should prevent spurious wakeup. Also, |
||
WorkStack.pop_back(); | ||
Lock.unlock(); | ||
Task(); | ||
if (Sequential) | ||
SequentialQueueIsLocked = false; | ||
} | ||
} | ||
|
||
std::atomic<bool> Stop{false}; | ||
std::atomic<bool> SequentialQueueIsLocked{false}; | ||
std::deque<std::function<void()>> WorkQueue; | ||
std::deque<std::function<void()>> WorkQueueSequential; | ||
std::vector<std::function<void()>> WorkStack; | ||
std::mutex Mutex; | ||
std::condition_variable Cond; | ||
std::promise<void> ThreadsCreated; | ||
|
@@ -214,16 +191,14 @@ TaskGroup::~TaskGroup() { | |
L.sync(); | ||
} | ||
|
||
void TaskGroup::spawn(std::function<void()> F, bool Sequential) { | ||
void TaskGroup::spawn(std::function<void()> F) { | ||
#if LLVM_ENABLE_THREADS | ||
if (Parallel) { | ||
L.inc(); | ||
detail::Executor::getDefaultExecutor()->add( | ||
[&, F = std::move(F)] { | ||
F(); | ||
L.dec(); | ||
}, | ||
Sequential); | ||
detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] { | ||
F(); | ||
L.dec(); | ||
}); | ||
return; | ||
} | ||
#endif | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to run scanEH(); on main thread here? If it uses threadIndex then it will receive UINT_MAX which is unwanted. On the another hand scanEH contains whole loop and then all scanSection would be done in deterministic order. It looks like we can replace whole "if(serial)" with single "tg.spawn(scanEH);" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to run
scanEH
in a worker thread to avoidUINT_MAX
threadIndex
.The
if (serial) scanEH(); else tg.spawn(scanEH);
pattern is similar toif (serial) fn(); else tg.spawn(fn);
.scanEH
scans.eh_frame
in all input files..eh_frame
relocations are smaller than other sections, so the previous code scans all.eh_frame
in one task.