Skip to content

Commit c0a922b

Browse files
committed
Add parallelTransformReduce and parallelForEachError
parallelTransformReduce is modelled on the C++17 pstl API of std::transform_reduce, except our wrappers do not use execution policy parameters. parallelForEachError allows loops that contain potentially failing operations to propagate errors out of the loop. This was one of the major challenges I encountered while parallelizing PDB type merging in LLD. Parallelizing a loop with parallelForEachError is not behavior preserving: the loop will no longer stop on the first error; it will continue working and report all errors it encounters in a list. I plan to use this to propagate errors out of LLD's coff::TpiSource::remapTpiWithGHashes, which currently stores an error in the TpiSource object. Differential Revision: https://reviews.llvm.org/D90639
1 parent ca01a6b commit c0a922b

File tree

2 files changed

+141
-6
lines changed

2 files changed

+141
-6
lines changed

llvm/include/llvm/Support/Parallel.h

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include "llvm/ADT/STLExtras.h"
1313
#include "llvm/Config/llvm-config.h"
14+
#include "llvm/Support/Error.h"
1415
#include "llvm/Support/MathExtras.h"
1516
#include "llvm/Support/Threading.h"
1617

@@ -120,13 +121,17 @@ void parallel_sort(RandomAccessIterator Start, RandomAccessIterator End,
120121
llvm::Log2_64(std::distance(Start, End)) + 1);
121122
}
122123

124+
// Upper bound on the number of tasks the parallel algorithms in this file
// spawn on a single TaskGroup. TaskGroup has a relatively high overhead, so
// we want to keep the number of spawn() calls small.
// (Note that 1024 is an arbitrary number. This code probably needs
// improving to take the number of available cores into account.)
enum { MaxTasksPerGroup = 1024 };
129+
123130
template <class IterTy, class FuncTy>
124131
void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) {
125-
// TaskGroup has a relatively high overhead, so we want to reduce
126-
// the number of spawn() calls. We'll create up to 1024 tasks here.
127-
// (Note that 1024 is an arbitrary number. This code probably needs
128-
// improving to take the number of available cores into account.)
129-
ptrdiff_t TaskSize = std::distance(Begin, End) / 1024;
132+
// Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
133+
// overhead on large inputs.
134+
ptrdiff_t TaskSize = std::distance(Begin, End) / MaxTasksPerGroup;
130135
if (TaskSize == 0)
131136
TaskSize = 1;
132137

@@ -140,7 +145,9 @@ void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) {
140145

141146
template <class IndexTy, class FuncTy>
142147
void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) {
143-
ptrdiff_t TaskSize = (End - Begin) / 1024;
148+
// Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
149+
// overhead on large inputs.
150+
ptrdiff_t TaskSize = (End - Begin) / MaxTasksPerGroup;
144151
if (TaskSize == 0)
145152
TaskSize = 1;
146153

@@ -156,6 +163,50 @@ void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) {
156163
Fn(J);
157164
}
158165

166+
template <class IterTy, class ResultTy, class ReduceFuncTy,
167+
class TransformFuncTy>
168+
ResultTy parallel_transform_reduce(IterTy Begin, IterTy End, ResultTy Init,
169+
ReduceFuncTy Reduce,
170+
TransformFuncTy Transform) {
171+
// Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
172+
// overhead on large inputs.
173+
size_t NumInputs = std::distance(Begin, End);
174+
if (NumInputs == 0)
175+
return std::move(Init);
176+
size_t NumTasks = std::min(static_cast<size_t>(MaxTasksPerGroup), NumInputs);
177+
std::vector<ResultTy> Results(NumTasks, Init);
178+
{
179+
// Each task processes either TaskSize or TaskSize+1 inputs. Any inputs
180+
// remaining after dividing them equally amongst tasks are distributed as
181+
// one extra input over the first tasks.
182+
TaskGroup TG;
183+
size_t TaskSize = NumInputs / NumTasks;
184+
size_t RemainingInputs = NumInputs % NumTasks;
185+
IterTy TBegin = Begin;
186+
for (size_t TaskId = 0; TaskId < NumTasks; ++TaskId) {
187+
IterTy TEnd = TBegin + TaskSize + (TaskId < RemainingInputs ? 1 : 0);
188+
TG.spawn([=, &Transform, &Reduce, &Results] {
189+
// Reduce the result of transformation eagerly within each task.
190+
ResultTy R = Init;
191+
for (IterTy It = TBegin; It != TEnd; ++It)
192+
R = Reduce(R, Transform(*It));
193+
Results[TaskId] = R;
194+
});
195+
TBegin = TEnd;
196+
}
197+
assert(TBegin == End);
198+
}
199+
200+
// Do a final reduction. There are at most 1024 tasks, so this only adds
201+
// constant single-threaded overhead for large inputs. Hopefully most
202+
// reductions are cheaper than the transformation.
203+
ResultTy FinalResult = std::move(Results.front());
204+
for (ResultTy &PartialResult :
205+
makeMutableArrayRef(Results.data() + 1, Results.size() - 1))
206+
FinalResult = Reduce(FinalResult, std::move(PartialResult));
207+
return std::move(FinalResult);
208+
}
209+
159210
#endif
160211

161212
} // namespace detail
@@ -198,6 +249,22 @@ void parallelForEachN(size_t Begin, size_t End, FuncTy Fn) {
198249
Fn(I);
199250
}
200251

252+
/// Applies \p Transform to each element of [Begin, End) and combines the
/// results with \p Reduce, starting from \p Init. Modelled on
/// std::transform_reduce, without the execution policy parameter.
///
/// When LLVM_ENABLE_THREADS is set and parallel::strategy.ThreadsRequested is
/// not 1, the work is delegated to
/// parallel::detail::parallel_transform_reduce. Otherwise the range is folded
/// sequentially, left to right, on the calling thread.
///
/// NOTE: the parallel implementation seeds every task with its own copy of
/// \p Init, while the sequential path folds \p Init in exactly once. Pass an
/// identity value for \p Reduce to get the same answer on both paths.
///
/// \returns the reduced value; \p Init itself for an empty range.
template <class IterTy, class ResultTy, class ReduceFuncTy,
          class TransformFuncTy>
ResultTy parallelTransformReduce(IterTy Begin, IterTy End, ResultTy Init,
                                 ReduceFuncTy Reduce,
                                 TransformFuncTy Transform) {
#if LLVM_ENABLE_THREADS
  if (parallel::strategy.ThreadsRequested != 1) {
    // Init is not used again on this path, so hand it over instead of
    // copying it.
    return parallel::detail::parallel_transform_reduce(
        Begin, End, std::move(Init), Reduce, Transform);
  }
#endif
  for (IterTy I = Begin; I != End; ++I)
    Init = Reduce(std::move(Init), Transform(*I));
  // A by-value parameter is moved implicitly on return; an explicit
  // std::move here would be redundant.
  return Init;
}
267+
201268
// Range wrappers.
202269
template <class RangeTy,
203270
class Comparator = std::less<decltype(*std::begin(RangeTy()))>>
@@ -210,6 +277,31 @@ void parallelForEach(RangeTy &&R, FuncTy Fn) {
210277
parallelForEach(std::begin(R), std::end(R), Fn);
211278
}
212279

280+
/// Range overload of parallelTransformReduce: reduces the elements between
/// std::begin(R) and std::end(R) by forwarding to the iterator overload.
///
/// \param R          any range usable with std::begin / std::end.
/// \param Init       initial value of the reduction.
/// \param Reduce     combines an accumulated result with another value.
/// \param Transform  applied to each element before it is reduced.
template <class RangeTy, class ResultTy, class ReduceFuncTy,
          class TransformFuncTy>
ResultTy parallelTransformReduce(RangeTy &&R, ResultTy Init,
                                 ReduceFuncTy Reduce,
                                 TransformFuncTy Transform) {
  // Init was taken by value and is not used again here, so move it on rather
  // than copying it a second time.
  return parallelTransformReduce(std::begin(R), std::end(R), std::move(Init),
                                 Reduce, Transform);
}
288+
289+
// Parallel for-each, but with error handling.
290+
template <class RangeTy, class FuncTy>
291+
Error parallelForEachError(RangeTy &&R, FuncTy Fn) {
292+
// The transform_reduce algorithm requires that the initial value be copyable.
293+
// Error objects are uncopyable. We only need to copy initial success values,
294+
// so work around this mismatch via the C API. The C API represents success
295+
// values with a null pointer. The joinErrors discards null values and joins
296+
// multiple errors into an ErrorList.
297+
return unwrap(parallelTransformReduce(
298+
std::begin(R), std::end(R), wrap(Error::success()),
299+
[](LLVMErrorRef Lhs, LLVMErrorRef Rhs) {
300+
return wrap(joinErrors(unwrap(Lhs), unwrap(Rhs)));
301+
},
302+
[&Fn](auto &&V) { return wrap(Fn(V)); }));
303+
}
304+
213305
} // namespace llvm
214306

215307
#endif // LLVM_SUPPORT_PARALLEL_H

llvm/unittests/Support/ParallelTest.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,47 @@ TEST(Parallel, parallel_for) {
4949
ASSERT_EQ(range[2049], 1u);
5050
}
5151

52+
// Exercises parallelTransformReduce on an empty range, a short array of
// strings, and arrays whose sizes do not divide evenly into tasks.
TEST(Parallel, TransformReduce) {
  const auto passThrough = [](uint32_t value) { return value; };
  const std::plus<uint32_t> add32;

  // Reducing an empty list must hand back the initial value.
  uint32_t total =
      parallelTransformReduce(ArrayRef<uint32_t>(), 0U, add32, passThrough);
  EXPECT_EQ(total, 0U);

  // Add up the lengths of a handful of strings: 1 + 2 + ... + 6 == 21.
  const char *words[] = {"a", "ab", "abc", "abcd", "abcde", "abcdef"};
  const auto lengthOf = [](const char *word) { return strlen(word); };
  size_t totalLength = parallelTransformReduce(
      words, static_cast<size_t>(0), std::plus<size_t>(), lengthOf);
  EXPECT_EQ(totalLength, static_cast<size_t>(21));

  // 2050 inputs do not divide evenly into tasks; every element must still be
  // counted exactly once.
  uint32_t buffer[2050];
  std::fill(std::begin(buffer), std::end(buffer), 1);
  total = parallelTransformReduce(buffer, 0U, add32, passThrough);
  EXPECT_EQ(total, 2050U);

  std::fill(std::begin(buffer), std::end(buffer), 2);
  total = parallelTransformReduce(buffer, 0U, add32, passThrough);
  EXPECT_EQ(total, 4100U);

  // Avoid one large task.
  uint32_t bigBuffer[3060];
  std::fill(std::begin(bigBuffer), std::end(bigBuffer), 1);
  total = parallelTransformReduce(bigBuffer, 0U, add32, passThrough);
  EXPECT_EQ(total, 3060U);
}
82+
83+
// parallelForEachError must visit every element and report each failure: the
// three even inputs each produce one "asdf" error, joined into an ErrorList.
TEST(Parallel, ForEachError) {
  int values[] = {1, 2, 3, 4, 5, 6};
  auto failOnEven = [](int value) -> Error {
    const bool isOdd = (value & 1) != 0;
    if (isOdd)
      return Error::success();
    return createStringError(std::errc::invalid_argument, "asdf");
  };

  Error joined = parallelForEachError(values, failOnEven);
  EXPECT_TRUE(joined.isA<ErrorList>());

  std::string message = toString(std::move(joined));
  EXPECT_EQ(message, std::string("asdf\nasdf\nasdf"));
}
94+
5295
#endif

0 commit comments

Comments
 (0)