Skip to content

Commit 93297b3

Browse files
committed
reworks multiplexer to avoid group operator usage
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 5b751da commit 93297b3

File tree

16 files changed

+981
-39
lines changed

16 files changed

+981
-39
lines changed
Lines changed: 399 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,399 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.core;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.ByteBufAllocator;
21+
import io.rsocket.Closeable;
22+
import io.rsocket.DuplexConnection;
23+
import io.rsocket.frame.FrameHeaderCodec;
24+
import io.rsocket.frame.FrameUtil;
25+
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
26+
import io.rsocket.plugins.InitializingInterceptorRegistry;
27+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
28+
import org.reactivestreams.Publisher;
29+
import org.reactivestreams.Subscription;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
import reactor.core.CoreSubscriber;
33+
import reactor.core.publisher.Flux;
34+
import reactor.core.publisher.Mono;
35+
import reactor.core.publisher.Operators;
36+
37+
/**
38+
* {@link DuplexConnection#receive()} is a single stream on which the following type of frames
39+
* arrive:
40+
*
41+
* <ul>
42+
* <li>Frames for streams initiated by the initiator of the connection (client).
43+
* <li>Frames for streams initiated by the acceptor of the connection (server).
44+
* </ul>
45+
*
46+
* <p>The only way to differentiate these two frames is determining whether the stream Id is odd or
47+
* even. Even IDs are for the streams initiated by server and odds are for streams initiated by the
48+
* client.
49+
*/
50+
class ClientServerInputMultiplexer implements CoreSubscriber<ByteBuf>, Closeable {
51+
52+
private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
53+
private static final InitializingInterceptorRegistry emptyInterceptorRegistry =
54+
new InitializingInterceptorRegistry();
55+
56+
private final InternalDuplexConnection setupReceiver;
57+
private final InternalDuplexConnection serverReceiver;
58+
private final InternalDuplexConnection clientReceiver;
59+
private final DuplexConnection setupConnection;
60+
private final DuplexConnection serverConnection;
61+
private final DuplexConnection clientConnection;
62+
private final DuplexConnection source;
63+
private final boolean isClient;
64+
65+
private Subscription s;
66+
private boolean setupReceived;
67+
68+
private Throwable t;
69+
70+
private volatile int state;
71+
private static final AtomicIntegerFieldUpdater<ClientServerInputMultiplexer> STATE =
72+
AtomicIntegerFieldUpdater.newUpdater(ClientServerInputMultiplexer.class, "state");
73+
74+
public ClientServerInputMultiplexer(DuplexConnection source) {
75+
this(source, emptyInterceptorRegistry, false);
76+
}
77+
78+
public ClientServerInputMultiplexer(
79+
DuplexConnection source, InitializingInterceptorRegistry registry, boolean isClient) {
80+
this.source = source;
81+
this.isClient = isClient;
82+
source = registry.initConnection(Type.SOURCE, source);
83+
84+
if (!isClient) {
85+
setupReceiver = new InternalDuplexConnection(this, source);
86+
setupConnection = registry.initConnection(Type.SETUP, setupReceiver);
87+
} else {
88+
setupReceiver = null;
89+
setupConnection = null;
90+
}
91+
serverReceiver = new InternalDuplexConnection(this, source);
92+
clientReceiver = new InternalDuplexConnection(this, source);
93+
serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
94+
clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
95+
}
96+
97+
public DuplexConnection asClientServerConnection() {
98+
return source;
99+
}
100+
101+
public DuplexConnection asServerConnection() {
102+
return serverConnection;
103+
}
104+
105+
public DuplexConnection asClientConnection() {
106+
return clientConnection;
107+
}
108+
109+
public DuplexConnection asSetupConnection() {
110+
return setupConnection;
111+
}
112+
113+
@Override
114+
public void dispose() {
115+
source.dispose();
116+
}
117+
118+
@Override
119+
public boolean isDisposed() {
120+
return source.isDisposed();
121+
}
122+
123+
@Override
124+
public Mono<Void> onClose() {
125+
return source.onClose();
126+
}
127+
128+
@Override
129+
public void onSubscribe(Subscription s) {
130+
if (Operators.validate(this.s, s)) {
131+
this.s = s;
132+
if (isClient) {
133+
s.request(Long.MAX_VALUE);
134+
} else {
135+
// request first SetupFrame
136+
s.request(1);
137+
}
138+
}
139+
}
140+
141+
@Override
142+
public void onNext(ByteBuf frame) {
143+
int streamId = FrameHeaderCodec.streamId(frame);
144+
final Type type;
145+
if (streamId == 0) {
146+
switch (FrameHeaderCodec.frameType(frame)) {
147+
case SETUP:
148+
case RESUME:
149+
case RESUME_OK:
150+
type = Type.SETUP;
151+
setupReceived = true;
152+
break;
153+
case LEASE:
154+
case KEEPALIVE:
155+
case ERROR:
156+
type = isClient ? Type.CLIENT : Type.SERVER;
157+
break;
158+
default:
159+
type = isClient ? Type.SERVER : Type.CLIENT;
160+
}
161+
} else if ((streamId & 0b1) == 0) {
162+
type = Type.SERVER;
163+
} else {
164+
type = Type.CLIENT;
165+
}
166+
if (!isClient && type != Type.SETUP && !setupReceived) {
167+
final IllegalStateException error =
168+
new IllegalStateException("SETUP or LEASE frame must be received before any others.");
169+
this.s.cancel();
170+
onError(error);
171+
}
172+
173+
switch (type) {
174+
case SETUP:
175+
final InternalDuplexConnection setupReceiver = this.setupReceiver;
176+
setupReceiver.onNext(frame);
177+
setupReceiver.onComplete();
178+
break;
179+
case CLIENT:
180+
clientReceiver.onNext(frame);
181+
break;
182+
case SERVER:
183+
serverReceiver.onNext(frame);
184+
break;
185+
}
186+
}
187+
188+
@Override
189+
public void onComplete() {
190+
final int previousState = STATE.getAndSet(this, Integer.MIN_VALUE);
191+
if (previousState == Integer.MIN_VALUE || previousState == 0) {
192+
return;
193+
}
194+
195+
if (!isClient) {
196+
if (!setupReceived) {
197+
setupReceiver.onComplete();
198+
}
199+
200+
if (previousState == 1) {
201+
return;
202+
}
203+
}
204+
205+
if (clientReceiver.isSubscribed()) {
206+
clientReceiver.onComplete();
207+
}
208+
if (serverReceiver.isSubscribed()) {
209+
serverReceiver.onComplete();
210+
}
211+
}
212+
213+
@Override
214+
public void onError(Throwable t) {
215+
this.t = t;
216+
217+
final int previousState = STATE.getAndSet(this, Integer.MIN_VALUE);
218+
if (previousState == Integer.MIN_VALUE || previousState == 0) {
219+
return;
220+
}
221+
222+
if (!isClient) {
223+
if (!setupReceived) {
224+
setupReceiver.onError(t);
225+
}
226+
227+
if (previousState == 1) {
228+
return;
229+
}
230+
}
231+
232+
if (clientReceiver.isSubscribed()) {
233+
clientReceiver.onError(t);
234+
}
235+
if (serverReceiver.isSubscribed()) {
236+
serverReceiver.onError(t);
237+
}
238+
}
239+
240+
boolean notifyRequested() {
241+
final int currentState = incrementAndGetCheckingState();
242+
if (currentState == Integer.MIN_VALUE) {
243+
return false;
244+
}
245+
246+
if (isClient) {
247+
if (currentState == 2) {
248+
source.receive().subscribe(this);
249+
}
250+
} else {
251+
if (currentState == 1) {
252+
source.receive().subscribe(this);
253+
} else if (currentState == 3) {
254+
// means setup was consumed and we got request from client and server multiplexers
255+
s.request(Long.MAX_VALUE);
256+
}
257+
}
258+
259+
return true;
260+
}
261+
262+
int incrementAndGetCheckingState() {
263+
int prev, next;
264+
for (; ; ) {
265+
prev = this.state;
266+
267+
if (prev == Integer.MIN_VALUE) {
268+
return prev;
269+
}
270+
271+
next = prev + 1;
272+
if (STATE.compareAndSet(this, prev, next)) {
273+
return next;
274+
}
275+
}
276+
}
277+
278+
private static class InternalDuplexConnection extends Flux<ByteBuf>
279+
implements Subscription, DuplexConnection {
280+
private final ClientServerInputMultiplexer clientServerInputMultiplexer;
281+
private final DuplexConnection source;
282+
private final boolean debugEnabled;
283+
284+
private volatile int state;
285+
static final AtomicIntegerFieldUpdater<InternalDuplexConnection> STATE =
286+
AtomicIntegerFieldUpdater.newUpdater(InternalDuplexConnection.class, "state");
287+
288+
CoreSubscriber<? super ByteBuf> actual;
289+
290+
public InternalDuplexConnection(
291+
ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) {
292+
this.clientServerInputMultiplexer = clientServerInputMultiplexer;
293+
this.source = source;
294+
this.debugEnabled = LOGGER.isDebugEnabled();
295+
}
296+
297+
@Override
298+
public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
299+
if (this.state == 0 && STATE.compareAndSet(this, 0, 1)) {
300+
this.actual = actual;
301+
actual.onSubscribe(this);
302+
} else {
303+
Operators.error(
304+
actual,
305+
new IllegalStateException("InternalDuplexConnection allows only single subscription"));
306+
}
307+
}
308+
309+
@Override
310+
public void request(long n) {
311+
if (this.state == 1 && STATE.compareAndSet(this, 1, 2)) {
312+
final ClientServerInputMultiplexer multiplexer = clientServerInputMultiplexer;
313+
if (!multiplexer.notifyRequested()) {
314+
final Throwable t = multiplexer.t;
315+
if (t != null) {
316+
this.actual.onError(t);
317+
} else {
318+
this.actual.onComplete();
319+
}
320+
}
321+
}
322+
}
323+
324+
@Override
325+
public void cancel() {
326+
// no ops
327+
}
328+
329+
void onNext(ByteBuf frame) {
330+
this.actual.onNext(frame);
331+
}
332+
333+
void onComplete() {
334+
this.actual.onComplete();
335+
}
336+
337+
void onError(Throwable t) {
338+
this.actual.onError(t);
339+
}
340+
341+
@Override
342+
public Mono<Void> send(Publisher<ByteBuf> frame) {
343+
if (debugEnabled) {
344+
return Flux.from(frame)
345+
.doOnNext(f -> LOGGER.debug("sending -> " + FrameUtil.toString(f)))
346+
.as(source::send);
347+
}
348+
349+
return source.send(frame);
350+
}
351+
352+
@Override
353+
public Mono<Void> sendOne(ByteBuf frame) {
354+
if (debugEnabled) {
355+
LOGGER.debug("sending -> " + FrameUtil.toString(frame));
356+
}
357+
358+
return source.sendOne(frame);
359+
}
360+
361+
@Override
362+
public Flux<ByteBuf> receive() {
363+
if (debugEnabled) {
364+
return this.doOnNext(frame -> LOGGER.debug("receiving -> " + FrameUtil.toString(frame)));
365+
} else {
366+
return this;
367+
}
368+
}
369+
370+
@Override
371+
public ByteBufAllocator alloc() {
372+
return source.alloc();
373+
}
374+
375+
@Override
376+
public void dispose() {
377+
source.dispose();
378+
}
379+
380+
@Override
381+
public boolean isDisposed() {
382+
return source.isDisposed();
383+
}
384+
385+
public boolean isSubscribed() {
386+
return this.state != 0;
387+
}
388+
389+
@Override
390+
public Mono<Void> onClose() {
391+
return source.onClose();
392+
}
393+
394+
@Override
395+
public double availability() {
396+
return source.availability();
397+
}
398+
}
399+
}

0 commit comments

Comments
 (0)