|
43 | 43 | public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
|
44 | 44 | implements Fuseable.QueueSubscription<T>, Fuseable {
|
45 | 45 |
|
46 |
| - |
47 | 46 | final Queue<T> queue;
|
48 | 47 | final Queue<T> priorityQueue;
|
49 | 48 |
|
50 | 49 | volatile boolean done;
|
51 | 50 | Throwable error;
|
52 | 51 |
|
53 |
| - boolean hasDownstream; //important to not loose the downstream too early and miss discard hook, while having relevant hasDownstreams() |
| 52 | + boolean |
| 53 | + hasDownstream; // important to not loose the downstream too early and miss discard hook, while |
| 54 | + // having relevant hasDownstreams() |
54 | 55 | volatile CoreSubscriber<? super T> actual;
|
55 | 56 |
|
56 | 57 | volatile boolean cancelled;
|
57 | 58 |
|
58 | 59 | volatile int once;
|
| 60 | + |
59 | 61 | @SuppressWarnings("rawtypes")
|
60 | 62 | static final AtomicIntegerFieldUpdater<UnboundedProcessor> ONCE =
|
61 | 63 | AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "once");
|
62 | 64 |
|
63 | 65 | volatile int wip;
|
| 66 | + |
64 | 67 | @SuppressWarnings("rawtypes")
|
65 | 68 | static final AtomicIntegerFieldUpdater<UnboundedProcessor> WIP =
|
66 | 69 | AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "wip");
|
67 | 70 |
|
68 | 71 | volatile int discardGuard;
|
| 72 | + |
69 | 73 | @SuppressWarnings("rawtypes")
|
70 | 74 | static final AtomicIntegerFieldUpdater<UnboundedProcessor> DISCARD_GUARD =
|
71 |
| - AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "discardGuard"); |
| 75 | + AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "discardGuard"); |
72 | 76 |
|
73 | 77 | volatile long requested;
|
| 78 | + |
74 | 79 | @SuppressWarnings("rawtypes")
|
75 | 80 | static final AtomicLongFieldUpdater<UnboundedProcessor> REQUESTED =
|
76 | 81 | AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested");
|
@@ -379,7 +384,7 @@ public void clear() {
|
379 | 384 |
|
380 | 385 | int missed = 1;
|
381 | 386 |
|
382 |
| - for (;;) { |
| 387 | + for (; ; ) { |
383 | 388 | while (!queue.isEmpty()) {
|
384 | 389 | T t = queue.poll();
|
385 | 390 | if (t != null) {
|
|
0 commit comments