Skip to content

Split the llvm::ThreadPool into an abstract base class and an implementation #82094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions bolt/include/bolt/Core/ParallelUtilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "bolt/Core/MCPlusBuilder.h"
#include "llvm/Support/CommandLine.h"
#include "llvm/Support/ThreadPool.h"

using namespace llvm;

Expand All @@ -28,8 +29,6 @@ extern cl::opt<unsigned> TaskCount;
} // namespace opts

namespace llvm {
class ThreadPool;

namespace bolt {
class BinaryContext;
class BinaryFunction;
Expand Down
6 changes: 3 additions & 3 deletions lldb/include/lldb/Core/Debugger.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

namespace llvm {
class raw_ostream;
class ThreadPool;
class ThreadPoolInterface;
} // namespace llvm

namespace lldb_private {
Expand Down Expand Up @@ -499,8 +499,8 @@ class Debugger : public std::enable_shared_from_this<Debugger>,
return m_broadcaster_manager_sp;
}

/// Shared thread poll. Use only with ThreadPoolTaskGroup.
static llvm::ThreadPool &GetThreadPool();
/// Shared thread pool. Use only with ThreadPoolTaskGroup.
static llvm::ThreadPoolInterface &GetThreadPool();

/// Report warning events.
///
Expand Down
6 changes: 3 additions & 3 deletions llvm/include/llvm/Debuginfod/Debuginfod.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Expected<std::string> getCachedOrDownloadArtifact(
StringRef UniqueKey, StringRef UrlPath, StringRef CacheDirectoryPath,
ArrayRef<StringRef> DebuginfodUrls, std::chrono::milliseconds Timeout);

class ThreadPool;
class ThreadPoolInterface;

struct DebuginfodLogEntry {
std::string Message;
Expand Down Expand Up @@ -135,7 +135,7 @@ class DebuginfodCollection {
// error.
Expected<bool> updateIfStale();
DebuginfodLog &Log;
ThreadPool &Pool;
ThreadPoolInterface &Pool;
Timer UpdateTimer;
sys::Mutex UpdateMutex;

Expand All @@ -145,7 +145,7 @@ class DebuginfodCollection {

public:
DebuginfodCollection(ArrayRef<StringRef> Paths, DebuginfodLog &Log,
ThreadPool &Pool, double MinInterval);
ThreadPoolInterface &Pool, double MinInterval);
Error update();
Error updateForever(std::chrono::milliseconds Interval);
Expected<std::string> findDebugBinaryPath(object::BuildIDRef);
Expand Down
7 changes: 4 additions & 3 deletions llvm/include/llvm/Support/BalancedPartitioning.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

namespace llvm {

class ThreadPool;
class ThreadPoolInterface;
/// A function with a set of utility nodes where it is beneficial to order two
/// functions close together if they have similar utility nodes
class BPFunctionNode {
Expand Down Expand Up @@ -115,7 +115,7 @@ class BalancedPartitioning {
/// threads, so we need to track how many active threads that could spawn more
/// threads.
struct BPThreadPool {
ThreadPool &TheThreadPool;
ThreadPoolInterface &TheThreadPool;
std::mutex mtx;
std::condition_variable cv;
/// The number of threads that could spawn more threads
Expand All @@ -128,7 +128,8 @@ class BalancedPartitioning {
/// acceptable for other threads to add more tasks while blocking on this
/// call.
void wait();
BPThreadPool(ThreadPool &TheThreadPool) : TheThreadPool(TheThreadPool) {}
BPThreadPool(ThreadPoolInterface &TheThreadPool)
: TheThreadPool(TheThreadPool) {}
};

/// Run a recursive bisection of a given list of FunctionNodes
Expand Down
184 changes: 111 additions & 73 deletions llvm/include/llvm/Support/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ namespace llvm {

class ThreadPoolTaskGroup;

/// A ThreadPool for asynchronous parallel execution on a defined number of
/// threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
/// This defines the abstract base interface for a ThreadPool allowing
/// asynchronous parallel execution on a defined number of threads.
///
/// It is possible to reuse one thread pool for different groups of tasks
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
Expand All @@ -49,16 +46,31 @@ class ThreadPoolTaskGroup;
/// available threads are used up by tasks waiting for a task that has no thread
/// left to run on (this includes waiting on the returned future). It should be
/// generally safe to wait() for a group as long as groups do not form a cycle.
class ThreadPool {
class ThreadPoolInterface {
/// The actual method to enqueue a task to be defined by the concrete
/// implementation.
virtual void asyncEnqueue(std::function<void()> Task,
ThreadPoolTaskGroup *Group) = 0;

public:
/// Construct a pool using the hardware strategy \p S for mapping hardware
/// execution resources (threads, cores, CPUs)
/// Defaults to using the maximum execution resources in the system, but
/// accounting for the affinity mask.
ThreadPool(ThreadPoolStrategy S = hardware_concurrency());
/// Destroying the pool will drain the pending tasks and wait. The current
/// thread may participate in the execution of the pending tasks.
virtual ~ThreadPoolInterface();

/// Blocking destructor: the pool will wait for all the threads to complete.
~ThreadPool();
/// Blocking wait for all the threads to complete and the queue to be empty.
/// It is an error to try to add new tasks while blocking on this call.
/// Calling wait() from a task would deadlock waiting for itself.
virtual void wait() = 0;

/// Blocking wait for all the tasks in the given group (and only those) to complete.
/// It is possible to wait even inside a task, but waiting (directly or
/// indirectly) on itself will deadlock. If called from a task running on a
/// worker thread, the call may process pending tasks while waiting in order
/// not to waste the thread.
virtual void wait(ThreadPoolTaskGroup &Group) = 0;

/// Returns the maximum number of workers this pool can eventually grow to.
virtual unsigned getMaxConcurrency() const = 0;

/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
Expand Down Expand Up @@ -92,102 +104,85 @@ class ThreadPool {
&Group);
}

private:
/// Submit \p Task to the pool, optionally as part of \p Group, and hand back a
/// future for its result. The returned future can be used to wait for the task
/// to finish and is *non-blocking* on destruction.
///
/// The task is wrapped in a deferred std::async future; the callable passed to
/// asyncEnqueue() holds a copy of the shared future and waits on it, which is
/// what actually runs the deferred task.
template <typename ResTy>
std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
                                    ThreadPoolTaskGroup *Group) {
  std::shared_future<ResTy> Result =
      std::async(std::launch::deferred, std::move(Task)).share();
  asyncEnqueue([Result]() { Result.wait(); }, Group);
  return Result;
}
};

#if LLVM_ENABLE_THREADS
/// A ThreadPool implementation using std::threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
class StdThreadPool : public ThreadPoolInterface {
public:
/// Construct a pool using the hardware strategy \p S for mapping hardware
/// execution resources (threads, cores, CPUs)
/// Defaults to using the maximum execution resources in the system, but
/// accounting for the affinity mask.
StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());

/// Blocking destructor: the pool will wait for all the threads to complete.
~StdThreadPool() override;

/// Blocking wait for all the threads to complete and the queue to be empty.
/// It is an error to try to add new tasks while blocking on this call.
/// Calling wait() from a task would deadlock waiting for itself.
void wait();
void wait() override;

/// Blocking wait for only all the threads in the given group to complete.
/// It is possible to wait even inside a task, but waiting (directly or
/// indirectly) on itself will deadlock. If called from a task running on a
/// worker thread, the call may process pending tasks while waiting in order
/// not to waste the thread.
void wait(ThreadPoolTaskGroup &Group);
void wait(ThreadPoolTaskGroup &Group) override;

// Returns the maximum number of worker threads in the pool, not the current
// number of threads!
unsigned getMaxConcurrency() const { return MaxThreadCount; }
/// Returns the maximum number of worker threads in the pool, not the current
/// number of threads!
unsigned getMaxConcurrency() const override { return MaxThreadCount; }

// TODO: misleading legacy name warning!
// TODO: Remove, misleading legacy name warning!
LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
unsigned getThreadCount() const { return MaxThreadCount; }

/// Returns true if the current thread is a worker thread of this thread pool.
bool isWorkerThread() const;

private:
/// Helpers to create a promise and a callable wrapper of \p Task that sets
/// the result of the promise. Returns the callable and a future to access the
/// result.
///
/// The promise is held through a std::shared_ptr because std::promise is
/// move-only while std::function requires a copyable callable. \p Task is
/// moved into the wrapper, so the wrapped callable is not cloned.
template <typename ResTy>
static std::pair<std::function<void()>, std::future<ResTy>>
createTaskAndFuture(std::function<ResTy()> Task) {
  auto Promise = std::make_shared<std::promise<ResTy>>();
  // Grab the future before the promise is moved into the closure.
  auto F = Promise->get_future();
  return {[Promise = std::move(Promise), Task = std::move(Task)]() {
            Promise->set_value(Task());
          },
          std::move(F)};
}
/// Overload for tasks without a result: runs \p Task, then marks the promise
/// as satisfied.
static std::pair<std::function<void()>, std::future<void>>
createTaskAndFuture(std::function<void()> Task) {
  auto Promise = std::make_shared<std::promise<void>>();
  // Grab the future before the promise is moved into the closure.
  auto F = Promise->get_future();
  return {[Promise = std::move(Promise), Task = std::move(Task)]() {
            Task();
            Promise->set_value();
          },
          std::move(F)};
}

/// Returns true if all tasks in the given group have finished (nullptr means
/// all tasks regardless of their group). QueueLock must be locked.
bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;

/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
template <typename ResTy>
std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
ThreadPoolTaskGroup *Group) {

#if LLVM_ENABLE_THREADS
/// Wrap the Task in a std::function<void()> that sets the result of the
/// corresponding future.
auto R = createTaskAndFuture(Task);

void asyncEnqueue(std::function<void()> Task,
ThreadPoolTaskGroup *Group) override {
int requestedThreads;
{
// Lock the queue and push the new task
std::unique_lock<std::mutex> LockGuard(QueueLock);

// Don't allow enqueueing after disabling the pool
assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
Tasks.emplace_back(std::make_pair(std::move(R.first), Group));
Tasks.emplace_back(std::make_pair(std::move(Task), Group));
requestedThreads = ActiveThreads + Tasks.size();
}
QueueCondition.notify_one();
grow(requestedThreads);
return R.second.share();

#else // LLVM_ENABLE_THREADS Disabled

// Get a Future with launch::deferred execution using std::async
auto Future = std::async(std::launch::deferred, std::move(Task)).share();
// Wrap the future so that both ThreadPool::wait() can operate and the
// returned future can be sync'ed on.
Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group));
return Future;
#endif
}

#if LLVM_ENABLE_THREADS
// Grow to ensure that we have at least `requested` Threads, but do not go
// over MaxThreadCount.
/// Grow to ensure that we have at least `requested` Threads, but do not go
/// over MaxThreadCount.
void grow(int requested);

void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
#endif

/// Threads in flight
std::vector<llvm::thread> Threads;
Expand All @@ -209,25 +204,68 @@ class ThreadPool {
/// Number of threads active for tasks in the given group (only non-zero).
DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;

#if LLVM_ENABLE_THREADS // avoids warning for unused variable
/// Signal for the destruction of the pool, asking thread to exit.
bool EnableFlag = true;
#endif

const ThreadPoolStrategy Strategy;

/// Maximum number of threads to potentially grow this pool to.
const unsigned MaxThreadCount;
};

#endif // LLVM_ENABLE_THREADS

/// A non-threaded implementation of ThreadPoolInterface.
///
/// Submitted tasks are only appended to an internal queue; per the member
/// documentation below they are executed by wait() or by the destructor.
class SingleThreadExecutor : public ThreadPoolInterface {
public:
  /// Construct a non-threaded pool; the hardware strategy argument is ignored.
  SingleThreadExecutor(ThreadPoolStrategy ignored = {});

  /// Blocking destructor: the pool will first execute the pending tasks.
  ~SingleThreadExecutor() override;

  /// Blocking wait for all the tasks to execute first.
  void wait() override;

  /// Blocking wait for all the tasks in the given group (and only those) to
  /// complete.
  void wait(ThreadPoolTaskGroup &Group) override;

  /// Always returns 1: there is no concurrency.
  unsigned getMaxConcurrency() const override { return 1; }

  // TODO: Remove, misleading legacy name warning!
  LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
  unsigned getThreadCount() const { return 1; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Append \p Task, tagged with \p Group, to the queue of pending tasks.
  /// The task is not run by this call and nothing is returned; callers get
  /// their future from the base-class submission helpers instead.
  void asyncEnqueue(std::function<void()> Task,
                    ThreadPoolTaskGroup *Group) override {
    // Construct the pair in place rather than building a temporary first.
    Tasks.emplace_back(std::move(Task), Group);
  }

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
};

#if LLVM_ENABLE_THREADS
using ThreadPool = StdThreadPool;
#else
using ThreadPool = SingleThreadExecutor;
#endif

/// A group of tasks to be run on a thread pool. Thread pool tasks in different
/// groups can run on the same threadpool but can be waited for separately.
/// It is even possible for tasks of one group to submit and wait for tasks
/// of another group, as long as this does not form a loop.
class ThreadPoolTaskGroup {
public:
/// The ThreadPool argument is the thread pool to forward calls to.
ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {}
ThreadPoolTaskGroup(ThreadPoolInterface &Pool) : Pool(Pool) {}

/// Blocking destructor: will wait for all the tasks in the group to complete
/// by calling ThreadPool::wait().
Expand All @@ -244,7 +282,7 @@ class ThreadPoolTaskGroup {
void wait() { Pool.wait(*this); }

private:
ThreadPool &Pool;
ThreadPoolInterface &Pool;
};

} // namespace llvm
Expand Down
3 changes: 2 additions & 1 deletion llvm/lib/Debuginfod/Debuginfod.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ DebuginfodLogEntry DebuginfodLog::pop() {
}

DebuginfodCollection::DebuginfodCollection(ArrayRef<StringRef> PathsRef,
DebuginfodLog &Log, ThreadPool &Pool,
DebuginfodLog &Log,
ThreadPoolInterface &Pool,
double MinInterval)
: Log(Log), Pool(Pool), MinInterval(MinInterval) {
for (StringRef Path : PathsRef)
Expand Down
Loading