18
18
19
19
import io .netty .util .ReferenceCounted ;
20
20
import io .rsocket .internal .jctools .queues .MpscUnboundedArrayQueue ;
21
+ import io .rsocket .internal .jctools .queues .SpscUnboundedArrayQueue ;
21
22
import java .util .Objects ;
22
23
import java .util .Queue ;
23
24
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
@@ -56,6 +57,7 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
56
57
AtomicLongFieldUpdater .newUpdater (UnboundedProcessor .class , "requested" );
57
58
58
59
final Queue <T > queue ;
60
+ final Queue <T > priorityQueue ;
59
61
volatile boolean done ;
60
62
Throwable error ;
61
63
volatile CoreSubscriber <? super T > actual ;
@@ -67,6 +69,7 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
67
69
68
70
public UnboundedProcessor () {
69
71
this .queue = new MpscUnboundedArrayQueue <>(Queues .SMALL_BUFFER_SIZE );
72
+ this .priorityQueue = new SpscUnboundedArrayQueue <>(Queues .SMALL_BUFFER_SIZE );
70
73
}
71
74
72
75
@ Override
@@ -84,6 +87,7 @@ void drainRegular(Subscriber<? super T> a) {
84
87
int missed = 1 ;
85
88
86
89
final Queue <T > q = queue ;
90
+ final Queue <T > pq = priorityQueue ;
87
91
88
92
for (; ; ) {
89
93
@@ -93,10 +97,18 @@ void drainRegular(Subscriber<? super T> a) {
93
97
while (r != e ) {
94
98
boolean d = done ;
95
99
96
- T t = q .poll ();
97
- boolean empty = t == null ;
100
+ T t ;
101
+ boolean empty ;
102
+
103
+ if (!pq .isEmpty ()) {
104
+ t = pq .poll ();
105
+ empty = false ;
106
+ } else {
107
+ t = q .poll ();
108
+ empty = t == null ;
109
+ }
98
110
99
- if (checkTerminated (d , empty , a , q )) {
111
+ if (checkTerminated (d , empty , a , q , pq )) {
100
112
return ;
101
113
}
102
114
@@ -110,7 +122,7 @@ void drainRegular(Subscriber<? super T> a) {
110
122
}
111
123
112
124
if (r == e ) {
113
- if (checkTerminated (done , q .isEmpty (), a , q )) {
125
+ if (checkTerminated (done , q .isEmpty () && pq . isEmpty () , a , q , pq )) {
114
126
return ;
115
127
}
116
128
}
@@ -130,11 +142,13 @@ void drainFused(Subscriber<? super T> a) {
130
142
int missed = 1 ;
131
143
132
144
final Queue <T > q = queue ;
145
+ final Queue <T > pq = priorityQueue ;
133
146
134
147
for (; ; ) {
135
148
136
149
if (cancelled ) {
137
150
q .clear ();
151
+ pq .clear ();
138
152
actual = null ;
139
153
return ;
140
154
}
@@ -188,14 +202,21 @@ public void drain() {
188
202
}
189
203
}
190
204
191
- boolean checkTerminated (boolean d , boolean empty , Subscriber <? super T > a , Queue <T > q ) {
205
+ boolean checkTerminated (
206
+ boolean d , boolean empty , Subscriber <? super T > a , Queue <T > q , Queue <T > pq ) {
192
207
if (cancelled ) {
193
208
while (!q .isEmpty ()) {
194
209
T t = q .poll ();
195
210
if (t != null ) {
196
211
release (t );
197
212
}
198
213
}
214
+ while (!pq .isEmpty ()) {
215
+ T t = pq .poll ();
216
+ if (t != null ) {
217
+ release (t );
218
+ }
219
+ }
199
220
actual = null ;
200
221
return true ;
201
222
}
@@ -237,6 +258,23 @@ public Context currentContext() {
237
258
return actual != null ? actual .currentContext () : Context .empty ();
238
259
}
239
260
261
+ public void onNextPrioritized (T t ) {
262
+ if (done || cancelled ) {
263
+ Operators .onNextDropped (t , currentContext ());
264
+ release (t );
265
+ return ;
266
+ }
267
+
268
+ if (!priorityQueue .offer (t )) {
269
+ Throwable ex =
270
+ Operators .onOperatorError (null , Exceptions .failWithOverflow (), t , currentContext ());
271
+ onError (Operators .onOperatorError (null , ex , t , currentContext ()));
272
+ release (t );
273
+ return ;
274
+ }
275
+ drain ();
276
+ }
277
+
240
278
@ Override
241
279
public void onNext (T t ) {
242
280
if (done || cancelled ) {
@@ -321,23 +359,29 @@ public void cancel() {
321
359
322
360
@ Override
323
361
public T peek () {
362
+ if (!priorityQueue .isEmpty ()) {
363
+ return priorityQueue .peek ();
364
+ }
324
365
return queue .peek ();
325
366
}
326
367
327
368
@ Override
328
369
@ Nullable
329
370
public T poll () {
371
+ if (!priorityQueue .isEmpty ()) {
372
+ return priorityQueue .poll ();
373
+ }
330
374
return queue .poll ();
331
375
}
332
376
333
377
@ Override
334
378
public int size () {
335
- return queue .size ();
379
+ return priorityQueue . size () + queue .size ();
336
380
}
337
381
338
382
@ Override
339
383
public boolean isEmpty () {
340
- return queue .isEmpty ();
384
+ return priorityQueue . isEmpty () && queue .isEmpty ();
341
385
}
342
386
343
387
@ Override
@@ -348,6 +392,12 @@ public void clear() {
348
392
release (t );
349
393
}
350
394
}
395
+ while (!priorityQueue .isEmpty ()) {
396
+ T t = priorityQueue .poll ();
397
+ if (t != null ) {
398
+ release (t );
399
+ }
400
+ }
351
401
}
352
402
353
403
@ Override
0 commit comments