@@ -106,9 +106,28 @@ class AtomicWaitQueue {
     return referenceCount == 1;
   }
 
+  /// This queue is being re-used with new construction arguments.
+  /// Update it appropriately.
+  void updateForNewArguments() {
+    // We intentionally take no arguments so that only calls to
+    // createQueue with no arguments will succeed at calling this no-op
+    // implementation. Queue types with construction arguments
+    // will need to implement this method to take the appropriate
+    // arguments. Hopefully this discourages people from forgetting
+    // that queues can be re-used if created in a loop.
+  }
+
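A queue type that does take construction arguments would override this hook with a matching signature so that a re-used queue can absorb the new arguments. The following is a minimal sketch; PriorityWaitQueue and its priority field are hypothetical names for illustration, not part of this change:

// Hypothetical queue type whose construction takes a priority argument.
class PriorityWaitQueue {
public:
  explicit PriorityWaitQueue(int priority) : priority(priority) {}

  // Called via Worker::createQueue when an already-created queue is
  // handed out again on a later loop iteration, possibly with
  // different arguments than the ones it was constructed with.
  void updateForNewArguments(int newPriority) {
    // For example, keep the strongest request seen so far.
    if (newPriority > priority)
      priority = newPriority;
  }

private:
  int priority;
};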
   /// An RAII helper class for signalling that the current thread is a
   /// worker thread which has acquired the lock.
   ///
+  /// `AtomicWaitQueue` does not require the global lock to be held
+  /// while creating or publishing the queue. Clients taking advantage
+  /// of this should inform the Worker class that a created queue has
+  /// been published by calling `flagQueueIsPublished`. Clients who
+  /// wish to publish the queue while holding the global lock, perhaps
+  /// to get a rule that all stores are done under the lock, may instead
+  /// use `tryPublishQueue`.
+  ///
   /// The expected use pattern is something like:
   ///
   /// ```
@@ -117,26 +136,47 @@ class AtomicWaitQueue {
   ///   while (true) {
   ///     if (oldStatus.isDone()) return;
   ///
-  ///     // Try to publish the wait queue. If this succeeds, we've become
-  ///     // the worker thread.
-  ///     bool published = worker.tryPublishQueue([&] {
-  ///       auto newStatus = oldStatus.withLock(worker.createQueue());
-  ///       return myAtomic.compare_exchange_weak(oldStatus, newStatus,
-  ///                /*success*/ std::memory_order_release,
-  ///                /*failure*/ std::memory_order_acquire);
-  ///     });
-  ///     if (!published) continue;
+  ///     if (oldStatus.hasWaitQueue()) {
+  ///       bool waited = worker.tryReloadAndWait([&] {
+  ///         oldStatus = myAtomic.load(std::memory_order_acquire);
+  ///         return (oldStatus.hasWaitQueue() ? oldStatus.getWaitQueue()
+  ///                                          : nullptr);
+  ///       });
   ///
-  ///     // Do the actual work here.
+  ///       // If we waited, `oldStatus` will be out of date; reload it.
+  ///       //
+  ///       // (For the pattern in this example, where the worker thread
+  ///       // always transitions the status to done, this is actually
+  ///       // unnecessary: by virtue of having successfully waited, we've
+  ///       // synchronized with the worker thread and know that the status
+  ///       // is done, so we could just return. But in general, this
+  ///       // reload is necessary.)
+  ///       if (waited)
+  ///         oldStatus = myAtomic.load(std::memory_order_acquire);
   ///
-  ///     // "Unpublish" the queue from the the atomic.
-  ///     while (true) {
-  ///       auto newStatus = oldStatus.withDone(true);
-  ///       if (myAtomic.compare_exchange_weak(oldStatus, newStatus,
+  ///       // Go back and reconsider the updated status.
+  ///       continue;
+  ///     }
+  ///
+  ///     // Create a queue and try to publish it. If this succeeds,
+  ///     // we've become the worker thread. We don't have to worry
+  ///     // about the queue leaking if we don't use it; that's managed
+  ///     // by the Worker class.
+  ///     {
+  ///       auto queue = worker.createQueue();
+  ///       auto newStatus = oldStatus.withWaitQueue(queue);
+  ///       if (!myAtomic.compare_exchange_weak(oldStatus, newStatus,
   ///                /*success*/ std::memory_order_release,
   ///                /*failure*/ std::memory_order_acquire))
-  ///         break;
+  ///         continue;
+  ///       worker.flagQueueIsPublished(queue);
   ///     }
+  ///
+  ///     // Do the actual work here.
+  ///
+  ///     // Report that the work is done and "unpublish" the queue from
+  ///     // the atomic.
+  ///     myAtomic.store(oldStatus.withDone(true), std::memory_order_release);
   ///     worker.finishAndUnpublishQueue([]{});
   ///     return;
   ///   }
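The Worker doc comment above also allows publishing the queue while holding the global lock via `tryPublishQueue`. Below is a sketch of just that publication step, reusing the hypothetical names from the example (myAtomic, oldStatus, worker) and assuming the same withWaitQueue status accessor; the callback reports whether it actually published the queue, as in the example this change replaces:

bool published = worker.tryPublishQueue([&] {
  // With tryPublishQueue, this compare-exchange runs while the Worker
  // holds the global lock, so all stores to the atomic happen under it.
  auto newStatus = oldStatus.withWaitQueue(worker.createQueue());
  return myAtomic.compare_exchange_weak(oldStatus, newStatus,
           /*success*/ std::memory_order_release,
           /*failure*/ std::memory_order_acquire);
});
if (!published) continue;  // reconsider the freshly loaded oldStatus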
@@ -182,6 +222,19 @@ class AtomicWaitQueue {
     return Published;
   }
 
+  /// Given that this thread is not the worker thread and there seems
+  /// to be a wait queue in place, try to wait on it.
+  ///
+  /// Acquire the global lock and call the given function. If it
+  /// returns a wait queue, wait on that queue and return true;
+  /// otherwise, return false.
+  template <class Fn>
+  bool tryReloadAndWait(Fn &&fn) {
+    assert(!isWorkerThread());
+    typename Impl::Waiter waiter(GlobalLock);
+    return waiter.tryReloadAndWait(std::forward<Fn>(fn));
+  }
+
   /// Given that this thread is the worker thread, return the queue
   /// that's been created and published for it.
   Impl *getPublishedQueue() const {
@@ -195,14 +248,18 @@ class AtomicWaitQueue {
   ///
   /// The Worker object takes ownership of the queue until it's
   /// published, so you can safely call this even if publishing
-  /// might fail. Note that the same queue will be returned on
-  /// successive invocations, so take care if the arguments might
-  /// change during the loop.
+  /// might fail.
+  ///
+  /// Note that the same queue will be returned on successive
+  /// invocations. Queues that accept arguments for construction
+  /// should implement `updateForNewArguments`.
   template <class... Args>
   Impl *createQueue(Args &&...args) {
     assert(!Published);
     if (!CurrentQueue)
       CurrentQueue = asImpl().createNewQueue(std::forward<Args>(args)...);
+    else
+      CurrentQueue->updateForNewArguments(std::forward<Args>(args)...);
     return CurrentQueue;
   }
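Because the Worker caches the queue it created, a second call to `createQueue` from a caller's retry loop returns the same object; without the `updateForNewArguments` hook, any arguments passed on that second call would be silently dropped. A short sketch of the intended interaction, with a hypothetical priority argument:

// First iteration: no cached queue yet, so createNewQueue(priority) runs.
auto *queue = worker.createQueue(priority);

// Later iteration (e.g. after a failed compare-exchange): the cached
// queue is returned, but it first gets a chance to absorb the possibly
// different arguments through updateForNewArguments(priority).
queue = worker.createQueue(priority);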