@@ -136,13 +136,14 @@ struct AsyncToken : public RefCounted {
136
136
// asynchronously executed task. If the caller immediately will drop its
137
137
// reference we must ensure that the token will be alive until the
138
138
// asynchronous operation is completed.
139
- AsyncToken (AsyncRuntime *runtime) : RefCounted(runtime, /* count=*/ 2 ) {}
139
+ AsyncToken (AsyncRuntime *runtime)
140
+ : RefCounted(runtime, /* count=*/ 2 ), ready(false ) {}
140
141
141
- // Internal state below guarded by a mutex.
142
+ std::atomic<bool > ready;
143
+
144
+ // Pending awaiters are guarded by a mutex.
142
145
std::mutex mu;
143
146
std::condition_variable cv;
144
-
145
- bool ready = false ;
146
147
std::vector<std::function<void ()>> awaiters;
147
148
};
148
149
@@ -152,17 +153,17 @@ struct AsyncToken : public RefCounted {
152
153
struct AsyncValue : public RefCounted {
153
154
// AsyncValue similar to an AsyncToken created with a reference count of 2.
154
155
AsyncValue (AsyncRuntime *runtime, int32_t size)
155
- : RefCounted(runtime, /* count=*/ 2 ), storage(size) {}
156
-
157
- // Internal state below guarded by a mutex.
158
- std::mutex mu;
159
- std::condition_variable cv;
156
+ : RefCounted(runtime, /* count=*/ 2 ), ready(false ), storage(size) {}
160
157
161
- bool ready = false ;
162
- std::vector<std::function<void ()>> awaiters;
158
+ std::atomic<bool > ready;
163
159
164
160
// Use vector of bytes to store async value payload.
165
161
std::vector<int8_t > storage;
162
+
163
+ // Pending awaiters are guarded by a mutex.
164
+ std::mutex mu;
165
+ std::condition_variable cv;
166
+ std::vector<std::function<void ()>> awaiters;
166
167
};
167
168
168
169
// Async group provides a mechanism to group together multiple async tokens or
@@ -175,10 +176,9 @@ struct AsyncGroup : public RefCounted {
175
176
std::atomic<int > pendingTokens;
176
177
std::atomic<int > rank;
177
178
178
- // Internal state below guarded by a mutex.
179
+ // Pending awaiters are guarded by a mutex.
179
180
std::mutex mu;
180
181
std::condition_variable cv;
181
-
182
182
std::vector<std::function<void ()>> awaiters;
183
183
};
184
184
@@ -291,13 +291,13 @@ extern "C" void mlirAsyncRuntimeEmplaceValue(AsyncValue *value) {
291
291
extern " C" void mlirAsyncRuntimeAwaitToken (AsyncToken *token) {
292
292
std::unique_lock<std::mutex> lock (token->mu );
293
293
if (!token->ready )
294
- token->cv .wait (lock, [token] { return token->ready ; });
294
+ token->cv .wait (lock, [token] { return token->ready . load () ; });
295
295
}
296
296
297
297
extern " C" void mlirAsyncRuntimeAwaitValue (AsyncValue *value) {
298
298
std::unique_lock<std::mutex> lock (value->mu );
299
299
if (!value->ready )
300
- value->cv .wait (lock, [value] { return value->ready ; });
300
+ value->cv .wait (lock, [value] { return value->ready . load () ; });
301
301
}
302
302
303
303
extern " C" void mlirAsyncRuntimeAwaitAllInGroup (AsyncGroup *group) {
@@ -319,34 +319,37 @@ extern "C" void mlirAsyncRuntimeExecute(CoroHandle handle, CoroResume resume) {
319
319
extern " C" void mlirAsyncRuntimeAwaitTokenAndExecute (AsyncToken *token,
320
320
CoroHandle handle,
321
321
CoroResume resume) {
322
- std::unique_lock<std::mutex> lock (token->mu );
323
322
auto execute = [handle, resume]() { (*resume)(handle); };
324
- if (token->ready )
323
+ if (token->ready ) {
325
324
execute ();
326
- else
325
+ } else {
326
+ std::unique_lock<std::mutex> lock (token->mu );
327
327
token->awaiters .push_back ([execute]() { execute (); });
328
+ }
328
329
}
329
330
330
331
extern " C" void mlirAsyncRuntimeAwaitValueAndExecute (AsyncValue *value,
331
332
CoroHandle handle,
332
333
CoroResume resume) {
333
- std::unique_lock<std::mutex> lock (value->mu );
334
334
auto execute = [handle, resume]() { (*resume)(handle); };
335
- if (value->ready )
335
+ if (value->ready ) {
336
336
execute ();
337
- else
337
+ } else {
338
+ std::unique_lock<std::mutex> lock (value->mu );
338
339
value->awaiters .push_back ([execute]() { execute (); });
340
+ }
339
341
}
340
342
341
343
extern " C" void mlirAsyncRuntimeAwaitAllInGroupAndExecute (AsyncGroup *group,
342
344
CoroHandle handle,
343
345
CoroResume resume) {
344
- std::unique_lock<std::mutex> lock (group->mu );
345
346
auto execute = [handle, resume]() { (*resume)(handle); };
346
- if (group->pendingTokens == 0 )
347
+ if (group->pendingTokens == 0 ) {
347
348
execute ();
348
- else
349
+ } else {
350
+ std::unique_lock<std::mutex> lock (group->mu );
349
351
group->awaiters .push_back ([execute]() { execute (); });
352
+ }
350
353
}
351
354
352
355
// ===----------------------------------------------------------------------===//
0 commit comments