Skip to content

Commit 504764b

Browse files
committed
more fixes
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 91d3990 commit 504764b

File tree

2 files changed

+7
-17
lines changed

2 files changed

+7
-17
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ private Mono<Void> handleMetadataPush(Payload payload) {
411411
MetadataPushFrameFlyweight.encode(allocator, payload.sliceMetadata().retain());
412412
payload.release();
413413

414-
sendProcessor.onNext(metadataPushFrame);
414+
sendProcessor.onNextPrioritized(metadataPushFrame);
415415
});
416416
}
417417

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ void drainRegular(Subscriber<? super T> a) {
107107
empty = t == null;
108108
}
109109

110-
if (checkTerminated(d, empty, a, q, pq)) {
110+
if (checkTerminated(d, empty, a)) {
111111
return;
112112
}
113113

@@ -121,7 +121,7 @@ void drainRegular(Subscriber<? super T> a) {
121121
}
122122

123123
if (r == e) {
124-
if (checkTerminated(done, q.isEmpty() && pq.isEmpty(), a, q, pq)) {
124+
if (checkTerminated(done, q.isEmpty() && pq.isEmpty(), a)) {
125125
return;
126126
}
127127
}
@@ -140,9 +140,6 @@ void drainRegular(Subscriber<? super T> a) {
140140
void drainFused(Subscriber<? super T> a) {
141141
int missed = 1;
142142

143-
final Queue<T> q = queue;
144-
final Queue<T> pq = priorityQueue;
145-
146143
for (; ; ) {
147144

148145
if (cancelled) {
@@ -201,7 +198,7 @@ public void drain() {
201198
}
202199

203200
boolean checkTerminated(
204-
boolean d, boolean empty, Subscriber<? super T> a, Queue<T> q, Queue<T> pq) {
201+
boolean d, boolean empty, Subscriber<? super T> a) {
205202
if (cancelled) {
206203
clear();
207204
actual = null;
@@ -344,19 +341,12 @@ public void cancel() {
344341
}
345342
}
346343

347-
@Override
348-
public T peek() {
349-
if (!priorityQueue.isEmpty()) {
350-
return priorityQueue.peek();
351-
}
352-
return queue.peek();
353-
}
354-
355344
@Override
356345
@Nullable
357346
public T poll() {
358-
if (!priorityQueue.isEmpty()) {
359-
return priorityQueue.poll();
347+
Queue<T> pq = this.priorityQueue;
348+
if (!pq.isEmpty()) {
349+
return pq.poll();
360350
}
361351
return queue.poll();
362352
}

0 commit comments

Comments
 (0)