Skip to content

Commit f9f2e93

Browse files
committed
Split the llvm::ThreadPool into an abstract base class and an implementation
This decouples the public API used to enqueue tasks and wait for completion from the actual implementation, and opens up the possibility for clients to set their own thread pool implementation for the pool. https://discourse.llvm.org/t/construct-threadpool-from-vector-of-existing-threads/76883
1 parent 1d5e3b2 commit f9f2e93

File tree

15 files changed

+174
-114
lines changed

15 files changed

+174
-114
lines changed

bolt/include/bolt/Core/ParallelUtilities.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ extern cl::opt<unsigned> TaskCount;
2828
} // namespace opts
2929

3030
namespace llvm {
31-
class ThreadPool;
3231

3332
namespace bolt {
3433
class BinaryContext;
@@ -51,7 +50,7 @@ enum SchedulingPolicy {
5150
};
5251

5352
/// Return the managed thread pool and initialize it if not initiliazed.
54-
ThreadPool &getThreadPool();
53+
ThreadPoolInterface &getThreadPool();
5554

5655
/// Perform the work on each BinaryFunction except those that are accepted
5756
/// by SkipPredicate, scheduling heuristic is based on SchedPolicy.

lldb/include/lldb/Core/Debugger.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252

5353
namespace llvm {
5454
class raw_ostream;
55-
class ThreadPool;
55+
class ThreadPoolInterface;
5656
} // namespace llvm
5757

5858
namespace lldb_private {
@@ -499,8 +499,8 @@ class Debugger : public std::enable_shared_from_this<Debugger>,
499499
return m_broadcaster_manager_sp;
500500
}
501501

502-
/// Shared thread poll. Use only with ThreadPoolTaskGroup.
503-
static llvm::ThreadPool &GetThreadPool();
502+
/// Shared thread pool. Use only with ThreadPoolTaskGroup.
503+
static llvm::ThreadPoolInterface &GetThreadPool();
504504

505505
/// Report warning events.
506506
///

llvm/include/llvm/Debuginfod/Debuginfod.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ Expected<std::string> getCachedOrDownloadArtifact(
9797
StringRef UniqueKey, StringRef UrlPath, StringRef CacheDirectoryPath,
9898
ArrayRef<StringRef> DebuginfodUrls, std::chrono::milliseconds Timeout);
9999

100-
class ThreadPool;
100+
class ThreadPoolInterface;
101101

102102
struct DebuginfodLogEntry {
103103
std::string Message;
@@ -135,7 +135,7 @@ class DebuginfodCollection {
135135
// error.
136136
Expected<bool> updateIfStale();
137137
DebuginfodLog &Log;
138-
ThreadPool &Pool;
138+
ThreadPoolInterface &Pool;
139139
Timer UpdateTimer;
140140
sys::Mutex UpdateMutex;
141141

@@ -145,7 +145,7 @@ class DebuginfodCollection {
145145

146146
public:
147147
DebuginfodCollection(ArrayRef<StringRef> Paths, DebuginfodLog &Log,
148-
ThreadPool &Pool, double MinInterval);
148+
ThreadPoolInterface &Pool, double MinInterval);
149149
Error update();
150150
Error updateForever(std::chrono::milliseconds Interval);
151151
Expected<std::string> findDebugBinaryPath(object::BuildIDRef);

llvm/include/llvm/Support/BalancedPartitioning.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
namespace llvm {
5252

53-
class ThreadPool;
53+
class ThreadPoolInterface;
5454
/// A function with a set of utility nodes where it is beneficial to order two
5555
/// functions close together if they have similar utility nodes
5656
class BPFunctionNode {
@@ -115,7 +115,7 @@ class BalancedPartitioning {
115115
/// threads, so we need to track how many active threads that could spawn more
116116
/// threads.
117117
struct BPThreadPool {
118-
ThreadPool &TheThreadPool;
118+
ThreadPoolInterface &TheThreadPool;
119119
std::mutex mtx;
120120
std::condition_variable cv;
121121
/// The number of threads that could spawn more threads
@@ -128,7 +128,8 @@ class BalancedPartitioning {
128128
/// acceptable for other threads to add more tasks while blocking on this
129129
/// call.
130130
void wait();
131-
BPThreadPool(ThreadPool &TheThreadPool) : TheThreadPool(TheThreadPool) {}
131+
BPThreadPool(ThreadPoolInterface &TheThreadPool)
132+
: TheThreadPool(TheThreadPool) {}
132133
};
133134

134135
/// Run a recursive bisection of a given list of FunctionNodes

llvm/include/llvm/Support/ThreadPool.h

Lines changed: 127 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,8 @@ namespace llvm {
3232

3333
class ThreadPoolTaskGroup;
3434

35-
/// A ThreadPool for asynchronous parallel execution on a defined number of
36-
/// threads.
37-
///
38-
/// The pool keeps a vector of threads alive, waiting on a condition variable
39-
/// for some work to become available.
35+
/// This defines the abstract base interface for a ThreadPool allowing
36+
/// asynchronous parallel execution on a defined number of threads.
4037
///
4138
/// It is possible to reuse one thread pool for different groups of tasks
4239
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
@@ -49,16 +46,31 @@ class ThreadPoolTaskGroup;
4946
/// available threads are used up by tasks waiting for a task that has no thread
5047
/// left to run on (this includes waiting on the returned future). It should be
5148
/// generally safe to wait() for a group as long as groups do not form a cycle.
52-
class ThreadPool {
49+
class ThreadPoolInterface {
50+
/// The actual method to enqueue a task to be defined by the concrete
51+
/// implementation.
52+
virtual void asyncEnqueue(std::function<void()> Task,
53+
ThreadPoolTaskGroup *Group) = 0;
54+
5355
public:
54-
/// Construct a pool using the hardware strategy \p S for mapping hardware
55-
/// execution resources (threads, cores, CPUs)
56-
/// Defaults to using the maximum execution resources in the system, but
57-
/// accounting for the affinity mask.
58-
ThreadPool(ThreadPoolStrategy S = hardware_concurrency());
56+
/// Destroying the pool will drain the pending tasks and wait. The current
57+
/// thread may participate in the execution of the pending tasks.
58+
virtual ~ThreadPoolInterface();
5959

60-
/// Blocking destructor: the pool will wait for all the threads to complete.
61-
~ThreadPool();
60+
/// Blocking wait for all the threads to complete and the queue to be empty.
61+
/// It is an error to try to add new tasks while blocking on this call.
62+
/// Calling wait() from a task would deadlock waiting for itself.
63+
virtual void wait() = 0;
64+
65+
/// Blocking wait for only all the threads in the given group to complete.
66+
/// It is possible to wait even inside a task, but waiting (directly or
67+
/// indirectly) on itself will deadlock. If called from a task running on a
68+
/// worker thread, the call may process pending tasks while waiting in order
69+
/// not to waste the thread.
70+
virtual void wait(ThreadPoolTaskGroup &Group) = 0;
71+
72+
/// Returns the maximum number of worker this pool can eventually grow to.
73+
virtual unsigned getMaxConcurrency() const = 0;
6274

6375
/// Asynchronous submission of a task to the pool. The returned future can be
6476
/// used to wait for the task to finish and is *non-blocking* on destruction.
@@ -92,30 +104,20 @@ class ThreadPool {
92104
&Group);
93105
}
94106

95-
/// Blocking wait for all the threads to complete and the queue to be empty.
96-
/// It is an error to try to add new tasks while blocking on this call.
97-
/// Calling wait() from a task would deadlock waiting for itself.
98-
void wait();
99-
100-
/// Blocking wait for only all the threads in the given group to complete.
101-
/// It is possible to wait even inside a task, but waiting (directly or
102-
/// indirectly) on itself will deadlock. If called from a task running on a
103-
/// worker thread, the call may process pending tasks while waiting in order
104-
/// not to waste the thread.
105-
void wait(ThreadPoolTaskGroup &Group);
106-
107-
// Returns the maximum number of worker threads in the pool, not the current
108-
// number of threads!
109-
unsigned getMaxConcurrency() const { return MaxThreadCount; }
110-
111-
// TODO: misleading legacy name warning!
112-
LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
113-
unsigned getThreadCount() const { return MaxThreadCount; }
107+
private:
108+
/// Asynchronous submission of a task to the pool. The returned future can be
109+
/// used to wait for the task to finish and is *non-blocking* on destruction.
110+
template <typename ResTy>
111+
std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
112+
ThreadPoolTaskGroup *Group) {
113+
/// Wrap the Task in a std::function<void()> that sets the result of the
114+
/// corresponding future.
115+
auto R = createTaskAndFuture(Task);
114116

115-
/// Returns true if the current thread is a worker thread of this thread pool.
116-
bool isWorkerThread() const;
117+
asyncEnqueue(std::move(R.first), Group);
118+
return R.second.share();
119+
}
117120

118-
private:
119121
/// Helpers to create a promise and a callable wrapper of \p Task that sets
120122
/// the result of the promise. Returns the callable and a future to access the
121123
/// result.
@@ -140,54 +142,74 @@ class ThreadPool {
140142
},
141143
std::move(F)};
142144
}
145+
};
146+
147+
/// A ThreadPool implementation using std::threads.
148+
///
149+
/// The pool keeps a vector of threads alive, waiting on a condition variable
150+
/// for some work to become available.
151+
class StdThreadPool : public ThreadPoolInterface {
152+
public:
153+
/// Construct a pool using the hardware strategy \p S for mapping hardware
154+
/// execution resources (threads, cores, CPUs)
155+
/// Defaults to using the maximum execution resources in the system, but
156+
/// accounting for the affinity mask.
157+
StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());
158+
159+
/// Blocking destructor: the pool will wait for all the threads to complete.
160+
~StdThreadPool() override;
143161

162+
/// Blocking wait for all the threads to complete and the queue to be empty.
163+
/// It is an error to try to add new tasks while blocking on this call.
164+
/// Calling wait() from a task would deadlock waiting for itself.
165+
void wait() override;
166+
167+
/// Blocking wait for only all the threads in the given group to complete.
168+
/// It is possible to wait even inside a task, but waiting (directly or
169+
/// indirectly) on itself will deadlock. If called from a task running on a
170+
/// worker thread, the call may process pending tasks while waiting in order
171+
/// not to waste the thread.
172+
void wait(ThreadPoolTaskGroup &Group) override;
173+
174+
/// Returns the maximum number of worker threads in the pool, not the current
175+
/// number of threads!
176+
unsigned getMaxConcurrency() const override { return MaxThreadCount; }
177+
178+
// TODO: Remove, misleading legacy name warning!
179+
LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
180+
unsigned getThreadCount() const { return MaxThreadCount; }
181+
182+
/// Returns true if the current thread is a worker thread of this thread pool.
183+
bool isWorkerThread() const;
184+
185+
private:
144186
/// Returns true if all tasks in the given group have finished (nullptr means
145187
/// all tasks regardless of their group). QueueLock must be locked.
146188
bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
147189

148190
/// Asynchronous submission of a task to the pool. The returned future can be
149191
/// used to wait for the task to finish and is *non-blocking* on destruction.
150-
template <typename ResTy>
151-
std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
152-
ThreadPoolTaskGroup *Group) {
153-
154-
#if LLVM_ENABLE_THREADS
155-
/// Wrap the Task in a std::function<void()> that sets the result of the
156-
/// corresponding future.
157-
auto R = createTaskAndFuture(Task);
158-
192+
void asyncEnqueue(std::function<void()> Task,
193+
ThreadPoolTaskGroup *Group) override {
159194
int requestedThreads;
160195
{
161196
// Lock the queue and push the new task
162197
std::unique_lock<std::mutex> LockGuard(QueueLock);
163198

164199
// Don't allow enqueueing after disabling the pool
165200
assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
166-
Tasks.emplace_back(std::make_pair(std::move(R.first), Group));
201+
Tasks.emplace_back(std::make_pair(std::move(Task), Group));
167202
requestedThreads = ActiveThreads + Tasks.size();
168203
}
169204
QueueCondition.notify_one();
170205
grow(requestedThreads);
171-
return R.second.share();
172-
173-
#else // LLVM_ENABLE_THREADS Disabled
174-
175-
// Get a Future with launch::deferred execution using std::async
176-
auto Future = std::async(std::launch::deferred, std::move(Task)).share();
177-
// Wrap the future so that both ThreadPool::wait() can operate and the
178-
// returned future can be sync'ed on.
179-
Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group));
180-
return Future;
181-
#endif
182206
}
183207

184-
#if LLVM_ENABLE_THREADS
185-
// Grow to ensure that we have at least `requested` Threads, but do not go
186-
// over MaxThreadCount.
208+
/// Grow to ensure that we have at least `requested` Threads, but do not go
209+
/// over MaxThreadCount.
187210
void grow(int requested);
188211

189212
void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
190-
#endif
191213

192214
/// Threads in flight
193215
std::vector<llvm::thread> Threads;
@@ -209,25 +231,66 @@ class ThreadPool {
209231
/// Number of threads active for tasks in the given group (only non-zero).
210232
DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
211233

212-
#if LLVM_ENABLE_THREADS // avoids warning for unused variable
213234
/// Signal for the destruction of the pool, asking thread to exit.
214235
bool EnableFlag = true;
215-
#endif
216236

217237
const ThreadPoolStrategy Strategy;
218238

219239
/// Maximum number of threads to potentially grow this pool to.
220240
const unsigned MaxThreadCount;
221241
};
222242

243+
/// A non-threaded implementation.
244+
class SingleThreadExecutor : public ThreadPoolInterface {
245+
public:
246+
/// Construct a non-threaded pool, ignoring using the hardware strategy.
247+
SingleThreadExecutor(ThreadPoolStrategy ignored = {});
248+
249+
/// Blocking destructor: the pool will first execute the pending tasks.
250+
~SingleThreadExecutor() override;
251+
252+
/// Blocking wait for all the tasks to execute first
253+
void wait() override;
254+
255+
/// Blocking wait for only all the tasks in the given group to complete.
256+
void wait(ThreadPoolTaskGroup &Group) override;
257+
258+
/// Returns always 1: there is no concurrency.
259+
unsigned getMaxConcurrency() const override { return 1; }
260+
261+
// TODO: Remove, misleading legacy name warning!
262+
LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
263+
unsigned getThreadCount() const { return 1; }
264+
265+
/// Returns true if the current thread is a worker thread of this thread pool.
266+
bool isWorkerThread() const;
267+
268+
private:
269+
/// Asynchronous submission of a task to the pool. The returned future can be
270+
/// used to wait for the task to finish and is *non-blocking* on destruction.
271+
void asyncEnqueue(std::function<void()> Task,
272+
ThreadPoolTaskGroup *Group) override {
273+
Tasks.emplace_back(std::make_pair(std::move(Task), Group));
274+
}
275+
276+
/// Tasks waiting for execution in the pool.
277+
std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
278+
};
279+
280+
#if LLVM_ENABLE_THREADS
281+
using ThreadPool = StdThreadPool;
282+
#else
283+
using ThreadPool = SingleThreadExecutor;
284+
#endif
285+
223286
/// A group of tasks to be run on a thread pool. Thread pool tasks in different
224287
/// groups can run on the same threadpool but can be waited for separately.
225288
/// It is even possible for tasks of one group to submit and wait for tasks
226289
/// of another group, as long as this does not form a loop.
227290
class ThreadPoolTaskGroup {
228291
public:
229292
/// The ThreadPool argument is the thread pool to forward calls to.
230-
ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {}
293+
ThreadPoolTaskGroup(ThreadPoolInterface &Pool) : Pool(Pool) {}
231294

232295
/// Blocking destructor: will wait for all the tasks in the group to complete
233296
/// by calling ThreadPool::wait().
@@ -244,7 +307,7 @@ class ThreadPoolTaskGroup {
244307
void wait() { Pool.wait(*this); }
245308

246309
private:
247-
ThreadPool &Pool;
310+
ThreadPoolInterface &Pool;
248311
};
249312

250313
} // namespace llvm

llvm/lib/Debuginfod/Debuginfod.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,8 @@ DebuginfodLogEntry DebuginfodLog::pop() {
348348
}
349349

350350
DebuginfodCollection::DebuginfodCollection(ArrayRef<StringRef> PathsRef,
351-
DebuginfodLog &Log, ThreadPool &Pool,
351+
DebuginfodLog &Log,
352+
ThreadPoolInterface &Pool,
352353
double MinInterval)
353354
: Log(Log), Pool(Pool), MinInterval(MinInterval) {
354355
for (StringRef Path : PathsRef)

0 commit comments

Comments
 (0)