Skip to content

Commit fa249b4

Browse files
committed
Split the llvm::ThreadPool into an abstract base class and an implementation
This decouples the public API used to enqueue tasks and wait for completion from the actual implementation, and opens up the possibility for clients to set their own thread pool implementation for the pool. https://discourse.llvm.org/t/construct-threadpool-from-vector-of-existing-threads/76883
1 parent 1d5e3b2 commit fa249b4

File tree

16 files changed

+220
-165
lines changed

16 files changed

+220
-165
lines changed

bolt/include/bolt/Core/ParallelUtilities.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ extern cl::opt<unsigned> TaskCount;
2828
} // namespace opts
2929

3030
namespace llvm {
31-
class ThreadPool;
3231

3332
namespace bolt {
3433
class BinaryContext;
@@ -51,7 +50,7 @@ enum SchedulingPolicy {
5150
};
5251

5352
/// Return the managed thread pool and initialize it if not initiliazed.
54-
ThreadPool &getThreadPool();
53+
ThreadPoolInterface &getThreadPool();
5554

5655
/// Perform the work on each BinaryFunction except those that are accepted
5756
/// by SkipPredicate, scheduling heuristic is based on SchedPolicy.

lldb/include/lldb/Core/Debugger.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252

5353
namespace llvm {
5454
class raw_ostream;
55-
class ThreadPool;
55+
class ThreadPoolInterface;
5656
} // namespace llvm
5757

5858
namespace lldb_private {
@@ -499,8 +499,8 @@ class Debugger : public std::enable_shared_from_this<Debugger>,
499499
return m_broadcaster_manager_sp;
500500
}
501501

502-
/// Shared thread poll. Use only with ThreadPoolTaskGroup.
503-
static llvm::ThreadPool &GetThreadPool();
502+
/// Shared thread pool. Use only with ThreadPoolTaskGroup.
503+
static llvm::ThreadPoolInterface &GetThreadPool();
504504

505505
/// Report warning events.
506506
///

llvm/include/llvm/Debuginfod/Debuginfod.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ Expected<std::string> getCachedOrDownloadArtifact(
9797
StringRef UniqueKey, StringRef UrlPath, StringRef CacheDirectoryPath,
9898
ArrayRef<StringRef> DebuginfodUrls, std::chrono::milliseconds Timeout);
9999

100-
class ThreadPool;
100+
class ThreadPoolInterface;
101101

102102
struct DebuginfodLogEntry {
103103
std::string Message;
@@ -135,7 +135,7 @@ class DebuginfodCollection {
135135
// error.
136136
Expected<bool> updateIfStale();
137137
DebuginfodLog &Log;
138-
ThreadPool &Pool;
138+
ThreadPoolInterface &Pool;
139139
Timer UpdateTimer;
140140
sys::Mutex UpdateMutex;
141141

@@ -145,7 +145,7 @@ class DebuginfodCollection {
145145

146146
public:
147147
DebuginfodCollection(ArrayRef<StringRef> Paths, DebuginfodLog &Log,
148-
ThreadPool &Pool, double MinInterval);
148+
ThreadPoolInterface &Pool, double MinInterval);
149149
Error update();
150150
Error updateForever(std::chrono::milliseconds Interval);
151151
Expected<std::string> findDebugBinaryPath(object::BuildIDRef);

llvm/include/llvm/Support/BalancedPartitioning.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
namespace llvm {
5252

53-
class ThreadPool;
53+
class ThreadPoolInterface;
5454
/// A function with a set of utility nodes where it is beneficial to order two
5555
/// functions close together if they have similar utility nodes
5656
class BPFunctionNode {
@@ -115,7 +115,7 @@ class BalancedPartitioning {
115115
/// threads, so we need to track how many active threads that could spawn more
116116
/// threads.
117117
struct BPThreadPool {
118-
ThreadPool &TheThreadPool;
118+
ThreadPoolInterface &TheThreadPool;
119119
std::mutex mtx;
120120
std::condition_variable cv;
121121
/// The number of threads that could spawn more threads
@@ -128,7 +128,8 @@ class BalancedPartitioning {
128128
/// acceptable for other threads to add more tasks while blocking on this
129129
/// call.
130130
void wait();
131-
BPThreadPool(ThreadPool &TheThreadPool) : TheThreadPool(TheThreadPool) {}
131+
BPThreadPool(ThreadPoolInterface &TheThreadPool)
132+
: TheThreadPool(TheThreadPool) {}
132133
};
133134

134135
/// Run a recursive bisection of a given list of FunctionNodes

llvm/include/llvm/Support/ThreadPool.h

Lines changed: 108 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,8 @@ namespace llvm {
3232

3333
class ThreadPoolTaskGroup;
3434

35-
/// A ThreadPool for asynchronous parallel execution on a defined number of
36-
/// threads.
37-
///
38-
/// The pool keeps a vector of threads alive, waiting on a condition variable
39-
/// for some work to become available.
35+
/// This defines the abstract base interface for a ThreadPool allowing
36+
/// asynchronous parallel execution on a defined number of threads.
4037
///
4138
/// It is possible to reuse one thread pool for different groups of tasks
4239
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
@@ -49,16 +46,31 @@ class ThreadPoolTaskGroup;
4946
/// available threads are used up by tasks waiting for a task that has no thread
5047
/// left to run on (this includes waiting on the returned future). It should be
5148
/// generally safe to wait() for a group as long as groups do not form a cycle.
52-
class ThreadPool {
49+
class ThreadPoolInterface {
50+
/// The actual method to enqueue a task to be defined by the concrete
51+
/// implementation.
52+
virtual void asyncEnqueue(std::function<void()> Task,
53+
ThreadPoolTaskGroup *Group) = 0;
54+
5355
public:
54-
/// Construct a pool using the hardware strategy \p S for mapping hardware
55-
/// execution resources (threads, cores, CPUs)
56-
/// Defaults to using the maximum execution resources in the system, but
57-
/// accounting for the affinity mask.
58-
ThreadPool(ThreadPoolStrategy S = hardware_concurrency());
56+
/// Destroying the pool will drain the pending tasks and wait. The current
57+
/// thread may participate in the execution of the pending tasks.
58+
virtual ~ThreadPoolInterface();
5959

60-
/// Blocking destructor: the pool will wait for all the threads to complete.
61-
~ThreadPool();
60+
/// Blocking wait for all the threads to complete and the queue to be empty.
61+
/// It is an error to try to add new tasks while blocking on this call.
62+
/// Calling wait() from a task would deadlock waiting for itself.
63+
virtual void wait() = 0;
64+
65+
/// Blocking wait for only all the threads in the given group to complete.
66+
/// It is possible to wait even inside a task, but waiting (directly or
67+
/// indirectly) on itself will deadlock. If called from a task running on a
68+
/// worker thread, the call may process pending tasks while waiting in order
69+
/// not to waste the thread.
70+
virtual void wait(ThreadPoolTaskGroup &Group) = 0;
71+
72+
/// Returns the maximum number of worker this pool can eventually grow to.
73+
virtual unsigned getMaxConcurrency() const = 0;
6274

6375
/// Asynchronous submission of a task to the pool. The returned future can be
6476
/// used to wait for the task to finish and is *non-blocking* on destruction.
@@ -92,102 +104,84 @@ class ThreadPool {
92104
&Group);
93105
}
94106

107+
private:
108+
/// Asynchronous submission of a task to the pool. The returned future can be
109+
/// used to wait for the task to finish and is *non-blocking* on destruction.
110+
template <typename ResTy>
111+
std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
112+
ThreadPoolTaskGroup *Group) {
113+
auto Future = std::async(std::launch::deferred, std::move(Task)).share();
114+
asyncEnqueue([Future]() { Future.wait(); }, Group);
115+
return Future;
116+
}
117+
};
118+
119+
/// A ThreadPool implementation using std::threads.
120+
///
121+
/// The pool keeps a vector of threads alive, waiting on a condition variable
122+
/// for some work to become available.
123+
class StdThreadPool : public ThreadPoolInterface {
124+
public:
125+
/// Construct a pool using the hardware strategy \p S for mapping hardware
126+
/// execution resources (threads, cores, CPUs)
127+
/// Defaults to using the maximum execution resources in the system, but
128+
/// accounting for the affinity mask.
129+
StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());
130+
131+
/// Blocking destructor: the pool will wait for all the threads to complete.
132+
~StdThreadPool() override;
133+
95134
/// Blocking wait for all the threads to complete and the queue to be empty.
96135
/// It is an error to try to add new tasks while blocking on this call.
97136
/// Calling wait() from a task would deadlock waiting for itself.
98-
void wait();
137+
void wait() override;
99138

100139
/// Blocking wait for only all the threads in the given group to complete.
101140
/// It is possible to wait even inside a task, but waiting (directly or
102141
/// indirectly) on itself will deadlock. If called from a task running on a
103142
/// worker thread, the call may process pending tasks while waiting in order
104143
/// not to waste the thread.
105-
void wait(ThreadPoolTaskGroup &Group);
144+
void wait(ThreadPoolTaskGroup &Group) override;
106145

107-
// Returns the maximum number of worker threads in the pool, not the current
108-
// number of threads!
109-
unsigned getMaxConcurrency() const { return MaxThreadCount; }
146+
/// Returns the maximum number of worker threads in the pool, not the current
147+
/// number of threads!
148+
unsigned getMaxConcurrency() const override { return MaxThreadCount; }
110149

111-
// TODO: misleading legacy name warning!
150+
// TODO: Remove, misleading legacy name warning!
112151
LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
113152
unsigned getThreadCount() const { return MaxThreadCount; }
114153

115154
/// Returns true if the current thread is a worker thread of this thread pool.
116155
bool isWorkerThread() const;
117156

118157
private:
119-
/// Helpers to create a promise and a callable wrapper of \p Task that sets
120-
/// the result of the promise. Returns the callable and a future to access the
121-
/// result.
122-
template <typename ResTy>
123-
static std::pair<std::function<void()>, std::future<ResTy>>
124-
createTaskAndFuture(std::function<ResTy()> Task) {
125-
std::shared_ptr<std::promise<ResTy>> Promise =
126-
std::make_shared<std::promise<ResTy>>();
127-
auto F = Promise->get_future();
128-
return {
129-
[Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
130-
std::move(F)};
131-
}
132-
static std::pair<std::function<void()>, std::future<void>>
133-
createTaskAndFuture(std::function<void()> Task) {
134-
std::shared_ptr<std::promise<void>> Promise =
135-
std::make_shared<std::promise<void>>();
136-
auto F = Promise->get_future();
137-
return {[Promise = std::move(Promise), Task]() {
138-
Task();
139-
Promise->set_value();
140-
},
141-
std::move(F)};
142-
}
143-
144158
/// Returns true if all tasks in the given group have finished (nullptr means
145159
/// all tasks regardless of their group). QueueLock must be locked.
146160
bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
147161

148162
/// Asynchronous submission of a task to the pool. The returned future can be
149163
/// used to wait for the task to finish and is *non-blocking* on destruction.
150-
template <typename ResTy>
151-
std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
152-
ThreadPoolTaskGroup *Group) {
153-
154-
#if LLVM_ENABLE_THREADS
155-
/// Wrap the Task in a std::function<void()> that sets the result of the
156-
/// corresponding future.
157-
auto R = createTaskAndFuture(Task);
158-
164+
void asyncEnqueue(std::function<void()> Task,
165+
ThreadPoolTaskGroup *Group) override {
159166
int requestedThreads;
160167
{
161168
// Lock the queue and push the new task
162169
std::unique_lock<std::mutex> LockGuard(QueueLock);
163170

164171
// Don't allow enqueueing after disabling the pool
165172
assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
166-
Tasks.emplace_back(std::make_pair(std::move(R.first), Group));
173+
Tasks.emplace_back(std::make_pair(std::move(Task), Group));
167174
requestedThreads = ActiveThreads + Tasks.size();
168175
}
169176
QueueCondition.notify_one();
170177
grow(requestedThreads);
171-
return R.second.share();
172-
173-
#else // LLVM_ENABLE_THREADS Disabled
174-
175-
// Get a Future with launch::deferred execution using std::async
176-
auto Future = std::async(std::launch::deferred, std::move(Task)).share();
177-
// Wrap the future so that both ThreadPool::wait() can operate and the
178-
// returned future can be sync'ed on.
179-
Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group));
180-
return Future;
181-
#endif
182178
}
183179

184-
#if LLVM_ENABLE_THREADS
185-
// Grow to ensure that we have at least `requested` Threads, but do not go
186-
// over MaxThreadCount.
180+
/// Grow to ensure that we have at least `requested` Threads, but do not go
181+
/// over MaxThreadCount.
187182
void grow(int requested);
188183

189184
void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
190-
#endif
191185

192186
/// Threads in flight
193187
std::vector<llvm::thread> Threads;
@@ -209,25 +203,66 @@ class ThreadPool {
209203
/// Number of threads active for tasks in the given group (only non-zero).
210204
DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
211205

212-
#if LLVM_ENABLE_THREADS // avoids warning for unused variable
213206
/// Signal for the destruction of the pool, asking thread to exit.
214207
bool EnableFlag = true;
215-
#endif
216208

217209
const ThreadPoolStrategy Strategy;
218210

219211
/// Maximum number of threads to potentially grow this pool to.
220212
const unsigned MaxThreadCount;
221213
};
222214

215+
/// A non-threaded implementation.
216+
class SingleThreadExecutor : public ThreadPoolInterface {
217+
public:
218+
/// Construct a non-threaded pool, ignoring using the hardware strategy.
219+
SingleThreadExecutor(ThreadPoolStrategy ignored = {});
220+
221+
/// Blocking destructor: the pool will first execute the pending tasks.
222+
~SingleThreadExecutor() override;
223+
224+
/// Blocking wait for all the tasks to execute first
225+
void wait() override;
226+
227+
/// Blocking wait for only all the tasks in the given group to complete.
228+
void wait(ThreadPoolTaskGroup &Group) override;
229+
230+
/// Returns always 1: there is no concurrency.
231+
unsigned getMaxConcurrency() const override { return 1; }
232+
233+
// TODO: Remove, misleading legacy name warning!
234+
LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
235+
unsigned getThreadCount() const { return 1; }
236+
237+
/// Returns true if the current thread is a worker thread of this thread pool.
238+
bool isWorkerThread() const;
239+
240+
private:
241+
/// Asynchronous submission of a task to the pool. The returned future can be
242+
/// used to wait for the task to finish and is *non-blocking* on destruction.
243+
void asyncEnqueue(std::function<void()> Task,
244+
ThreadPoolTaskGroup *Group) override {
245+
Tasks.emplace_back(std::make_pair(std::move(Task), Group));
246+
}
247+
248+
/// Tasks waiting for execution in the pool.
249+
std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
250+
};
251+
252+
#if LLVM_ENABLE_THREADS
253+
using ThreadPool = StdThreadPool;
254+
#else
255+
using ThreadPool = SingleThreadExecutor;
256+
#endif
257+
223258
/// A group of tasks to be run on a thread pool. Thread pool tasks in different
224259
/// groups can run on the same threadpool but can be waited for separately.
225260
/// It is even possible for tasks of one group to submit and wait for tasks
226261
/// of another group, as long as this does not form a loop.
227262
class ThreadPoolTaskGroup {
228263
public:
229264
/// The ThreadPool argument is the thread pool to forward calls to.
230-
ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {}
265+
ThreadPoolTaskGroup(ThreadPoolInterface &Pool) : Pool(Pool) {}
231266

232267
/// Blocking destructor: will wait for all the tasks in the group to complete
233268
/// by calling ThreadPool::wait().
@@ -244,7 +279,7 @@ class ThreadPoolTaskGroup {
244279
void wait() { Pool.wait(*this); }
245280

246281
private:
247-
ThreadPool &Pool;
282+
ThreadPoolInterface &Pool;
248283
};
249284

250285
} // namespace llvm

llvm/lib/Debuginfod/Debuginfod.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,8 @@ DebuginfodLogEntry DebuginfodLog::pop() {
348348
}
349349

350350
DebuginfodCollection::DebuginfodCollection(ArrayRef<StringRef> PathsRef,
351-
DebuginfodLog &Log, ThreadPool &Pool,
351+
DebuginfodLog &Log,
352+
ThreadPoolInterface &Pool,
352353
double MinInterval)
353354
: Log(Log), Pool(Pool), MinInterval(MinInterval) {
354355
for (StringRef Path : PathsRef)

0 commit comments

Comments
 (0)