@@ -35,6 +35,8 @@ int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
35
35
q -> slab .data = buffer ;
36
36
37
37
q -> queue = 0 ;
38
+ q -> tick = equeue_tick ();
39
+ q -> generation = 0 ;
38
40
q -> breaks = 0 ;
39
41
40
42
int err ;
@@ -158,8 +160,19 @@ static inline int equeue_tickdiff(unsigned a, unsigned b) {
158
160
return (int )(a - b );
159
161
}
160
162
161
- static void equeue_enqueue (equeue_t * q , struct equeue_event * e , unsigned ms ) {
163
+ static inline void equeue_incid (equeue_t * q , struct equeue_event * e ) {
164
+ e -> id += 1 ;
165
+ if (e -> id >> (8 * sizeof (int )-1 - q -> npw2 )) {
166
+ e -> id = 1 ;
167
+ }
168
+ }
169
+
170
+ static int equeue_enqueue (equeue_t * q , struct equeue_event * e , unsigned ms ) {
171
+ int id = (e -> id << q -> npw2 ) | ((unsigned char * )e - q -> buffer );
162
172
e -> target = equeue_tick () + ms ;
173
+ e -> generation = q -> generation ;
174
+
175
+ equeue_mutex_lock (& q -> queuelock );
163
176
164
177
struct equeue_event * * p = & q -> queue ;
165
178
while (* p && equeue_tickdiff ((* p )-> target , e -> target ) < 0 ) {
@@ -185,9 +198,31 @@ static void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {
185
198
186
199
* p = e ;
187
200
e -> ref = p ;
201
+
202
+ equeue_mutex_unlock (& q -> queuelock );
203
+
204
+ return id ;
188
205
}
189
206
190
- static void equeue_unqueue (equeue_t * q , struct equeue_event * e ) {
207
+ static struct equeue_event * equeue_unqueue (equeue_t * q , int id ) {
208
+ struct equeue_event * e = (struct equeue_event * )
209
+ & q -> buffer [id & ((1 << q -> npw2 )- 1 )];
210
+
211
+ equeue_mutex_lock (& q -> queuelock );
212
+ if (e -> id != id >> q -> npw2 ) {
213
+ equeue_mutex_unlock (& q -> queuelock );
214
+ return 0 ;
215
+ }
216
+
217
+ e -> cb = 0 ;
218
+ e -> period = -1 ;
219
+
220
+ int diff = equeue_tickdiff (e -> target , q -> tick );
221
+ if (diff < 0 || (diff == 0 && e -> generation != q -> generation )) {
222
+ equeue_mutex_unlock (& q -> queuelock );
223
+ return 0 ;
224
+ }
225
+
191
226
if (e -> sibling ) {
192
227
e -> sibling -> next = e -> next ;
193
228
if (e -> sibling -> next ) {
@@ -202,16 +237,41 @@ static void equeue_unqueue(equeue_t *q, struct equeue_event *e) {
202
237
e -> next -> ref = e -> ref ;
203
238
}
204
239
}
240
+
241
+ equeue_incid (q , e );
242
+ equeue_mutex_unlock (& q -> queuelock );
243
+
244
+ return e ;
205
245
}
206
246
207
- static struct equeue_event * equeue_dequeue (equeue_t * q ) {
208
- unsigned target = equeue_tick ();
209
- struct equeue_event * head = 0 ;
210
- struct equeue_event * * tail = & head ;
247
+ static struct equeue_event * equeue_dequeue (equeue_t * q , unsigned target ) {
248
+ equeue_mutex_lock (& q -> queuelock );
211
249
212
- while (q -> queue && equeue_tickdiff (q -> queue -> target , target ) <= 0 ) {
213
- struct equeue_event * es = q -> queue ;
214
- q -> queue = es -> next ;
250
+ q -> generation += 1 ;
251
+ if (equeue_tickdiff (q -> tick , target ) <= 0 ) {
252
+ q -> tick = target ;
253
+ }
254
+
255
+ struct equeue_event * head = q -> queue ;
256
+ struct equeue_event * * p = & head ;
257
+ while (* p && equeue_tickdiff ((* p )-> target , target ) <= 0 ) {
258
+ p = & (* p )-> next ;
259
+ }
260
+
261
+ q -> queue = * p ;
262
+ if (q -> queue ) {
263
+ q -> queue -> ref = & q -> queue ;
264
+ }
265
+
266
+ * p = 0 ;
267
+
268
+ equeue_mutex_unlock (& q -> queuelock );
269
+
270
+ struct equeue_event * * tail = & head ;
271
+ struct equeue_event * ess = head ;
272
+ while (ess ) {
273
+ struct equeue_event * es = ess ;
274
+ ess = es -> next ;
215
275
216
276
struct equeue_event * prev = 0 ;
217
277
for (struct equeue_event * e = es ; e ; e = e -> sibling ) {
@@ -223,59 +283,23 @@ static struct equeue_event *equeue_dequeue(equeue_t *q) {
223
283
tail = & es -> next ;
224
284
}
225
285
226
- if (q -> queue ) {
227
- q -> queue -> ref = & q -> queue ;
228
- }
229
-
230
286
return head ;
231
287
}
232
288
233
- static inline int equeue_incid (equeue_t * q , int id ) {
234
- if ((id + 1 ) >> (8 * sizeof (int )-1 - q -> npw2 )) {
235
- return 1 ;
236
- }
237
-
238
- return id + 1 ;
239
- }
240
-
241
289
int equeue_post (equeue_t * q , void (* cb )(void * ), void * p ) {
242
290
struct equeue_event * e = (struct equeue_event * )p - 1 ;
243
- int id = (e -> id << q -> npw2 ) | ((unsigned char * )e - q -> buffer );
244
291
e -> cb = cb ;
245
292
246
- if (e -> target < 0 ) {
247
- equeue_dealloc (q , e + 1 );
248
- return id ;
249
- }
250
-
251
- equeue_mutex_lock (& q -> queuelock );
252
- equeue_enqueue (q , e , e -> target );
253
- equeue_mutex_unlock (& q -> queuelock );
254
-
293
+ int id = equeue_enqueue (q , e , e -> target );
255
294
equeue_sema_signal (& q -> eventsema );
256
295
return id ;
257
296
}
258
297
259
298
void equeue_cancel (equeue_t * q , int id ) {
260
- struct equeue_event * e = (struct equeue_event * )
261
- & q -> buffer [id & ((1 << q -> npw2 )- 1 )];
262
-
263
- equeue_mutex_lock (& q -> queuelock );
264
- if (e -> id == - id >> q -> npw2 ) {
265
- e -> cb = 0 ;
266
- e -> period = -1 ;
267
- }
268
-
269
- if (e -> id != id >> q -> npw2 ) {
270
- equeue_mutex_unlock (& q -> queuelock );
271
- return ;
299
+ struct equeue_event * e = equeue_unqueue (q , id );
300
+ if (e ) {
301
+ equeue_dealloc (q , e + 1 );
272
302
}
273
-
274
- equeue_unqueue (q , e );
275
- e -> id = equeue_incid (q , e -> id );
276
- equeue_mutex_unlock (& q -> queuelock );
277
-
278
- equeue_dealloc (q , e + 1 );
279
303
}
280
304
281
305
void equeue_break (equeue_t * q ) {
@@ -286,18 +310,12 @@ void equeue_break(equeue_t *q) {
286
310
}
287
311
288
312
void equeue_dispatch (equeue_t * q , int ms ) {
289
- unsigned timeout = equeue_tick () + ms ;
313
+ unsigned tick = equeue_tick ();
314
+ unsigned timeout = tick + ms ;
290
315
291
316
while (1 ) {
292
317
// collect all the available events and next deadline
293
- equeue_mutex_lock (& q -> queuelock );
294
- struct equeue_event * es = equeue_dequeue (q );
295
-
296
- // mark events as in-flight
297
- for (struct equeue_event * e = es ; e ; e = e -> next ) {
298
- e -> id = - e -> id ;
299
- }
300
- equeue_mutex_unlock (& q -> queuelock );
318
+ struct equeue_event * es = equeue_dequeue (q , tick );
301
319
302
320
// dispatch events
303
321
while (es ) {
@@ -310,20 +328,17 @@ void equeue_dispatch(equeue_t *q, int ms) {
310
328
cb (e + 1 );
311
329
}
312
330
313
- // undirty the id and either dealloc or reenqueue periodic events
331
+ // reenqueue periodic events or deallocate
314
332
if (e -> period >= 0 ) {
315
- equeue_mutex_lock (& q -> queuelock );
316
- e -> id = - e -> id ;
317
333
equeue_enqueue (q , e , e -> period );
318
- equeue_mutex_unlock (& q -> queuelock );
319
334
} else {
320
- e -> id = equeue_incid (q , - e -> id );
335
+ equeue_incid (q , e );
321
336
equeue_dealloc (q , e + 1 );
322
337
}
323
338
}
324
339
325
340
int deadline = -1 ;
326
- unsigned tick = equeue_tick ();
341
+ tick = equeue_tick ();
327
342
328
343
// check if we should stop dispatching soon
329
344
if (ms >= 0 ) {
@@ -356,6 +371,9 @@ void equeue_dispatch(equeue_t *q, int ms) {
356
371
}
357
372
equeue_mutex_unlock (& q -> queuelock );
358
373
}
374
+
375
+ // update tick for next iteration
376
+ tick = equeue_tick ();
359
377
}
360
378
}
361
379
0 commit comments