Skip to content

Commit fdc9b1b

Browse files
authored
removes limit rate operator (#829)
1 parent cec7a78 commit fdc9b1b

File tree

4 files changed

+170
-8
lines changed

4 files changed

+170
-8
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,6 @@ void hookOnFirstRequest(long n) {
491491
receivers.put(streamId, receiver);
492492

493493
inboundFlux
494-
.limitRate(Queues.SMALL_BUFFER_SIZE)
495494
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
496495
.subscribe(upstreamSubscriber);
497496

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import reactor.core.Exceptions;
4848
import reactor.core.publisher.*;
4949
import reactor.util.annotation.Nullable;
50-
import reactor.util.concurrent.Queues;
5150

5251
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
5352
class RSocketResponder implements RSocket {
@@ -526,10 +525,7 @@ protected void hookFinally(SignalType type) {
526525
};
527526

528527
sendingSubscriptions.put(streamId, subscriber);
529-
response
530-
.limitRate(Queues.SMALL_BUFFER_SIZE)
531-
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
532-
.subscribe(subscriber);
528+
response.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(subscriber);
533529
}
534530

535531
private void handleChannel(int streamId, Payload payload, long initialRequestN) {
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package io.rsocket.examples.transport.tcp.plugins;
2+
3+
import io.rsocket.Payload;
4+
import io.rsocket.RSocket;
5+
import io.rsocket.SocketAcceptor;
6+
import io.rsocket.core.RSocketConnector;
7+
import io.rsocket.core.RSocketServer;
8+
import io.rsocket.examples.transport.tcp.stream.StreamingClient;
9+
import io.rsocket.plugins.RSocketInterceptor;
10+
import io.rsocket.transport.netty.client.TcpClientTransport;
11+
import io.rsocket.transport.netty.server.TcpServerTransport;
12+
import io.rsocket.util.DefaultPayload;
13+
import io.rsocket.util.RSocketProxy;
14+
import java.time.Duration;
15+
import java.util.concurrent.ArrayBlockingQueue;
16+
import java.util.concurrent.BlockingQueue;
17+
import org.reactivestreams.Publisher;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
import reactor.core.publisher.Flux;
21+
import reactor.util.concurrent.Queues;
22+
23+
public class LimitRateInterceptorExample {
24+
25+
private static final Logger logger = LoggerFactory.getLogger(StreamingClient.class);
26+
27+
public static void main(String[] args) {
28+
BlockingQueue<String> requests = new ArrayBlockingQueue<>(100);
29+
RSocketServer.create(
30+
SocketAcceptor.with(
31+
new RSocket() {
32+
@Override
33+
public Flux<Payload> requestStream(Payload payload) {
34+
return Flux.interval(Duration.ofMillis(100))
35+
.doOnRequest(e -> requests.add("Responder requestN(" + e + ")"))
36+
.map(aLong -> DefaultPayload.create("Interval: " + aLong));
37+
}
38+
39+
@Override
40+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
41+
return Flux.from(payloads)
42+
.doOnRequest(e -> requests.add("Responder requestN(" + e + ")"));
43+
}
44+
}))
45+
.interceptors(
46+
ir ->
47+
ir.forRequester(LimitRateInterceptor.forRequester())
48+
.forResponder(LimitRateInterceptor.forResponder()))
49+
.bind(TcpServerTransport.create("localhost", 7000))
50+
.subscribe();
51+
52+
RSocket socket =
53+
RSocketConnector.create()
54+
.interceptors(
55+
ir ->
56+
ir.forRequester(LimitRateInterceptor.forRequester())
57+
.forResponder(LimitRateInterceptor.forResponder()))
58+
.connect(TcpClientTransport.create("localhost", 7000))
59+
.block();
60+
61+
socket
62+
.requestStream(DefaultPayload.create("Hello"))
63+
.doOnRequest(e -> requests.add("Requester requestN(" + e + ")"))
64+
.map(Payload::getDataUtf8)
65+
.doOnNext(logger::debug)
66+
.take(10)
67+
.then()
68+
.block();
69+
70+
requests.forEach(request -> logger.debug("Requested : {}", request));
71+
requests.clear();
72+
73+
logger.debug("-----------------------------------------------------------------");
74+
logger.debug("Does requestChannel");
75+
socket
76+
.requestChannel(
77+
Flux.<Payload, Long>generate(
78+
() -> 1L,
79+
(s, sink) -> {
80+
sink.next(DefaultPayload.create("Next " + s));
81+
return ++s;
82+
})
83+
.doOnRequest(e -> requests.add("Requester Upstream requestN(" + e + ")")))
84+
.doOnRequest(e -> requests.add("Requester Downstream requestN(" + e + ")"))
85+
.map(Payload::getDataUtf8)
86+
.doOnNext(logger::debug)
87+
.take(10)
88+
.then()
89+
.doFinally(signalType -> socket.dispose())
90+
.then()
91+
.block();
92+
93+
requests.forEach(request -> logger.debug("Requested : {}", request));
94+
}
95+
96+
static class LimitRateInterceptor implements RSocketInterceptor {
97+
98+
final boolean requesterSide;
99+
final int highTide;
100+
final int lowTide;
101+
102+
LimitRateInterceptor(boolean requesterSide, int highTide, int lowTide) {
103+
this.requesterSide = requesterSide;
104+
this.highTide = highTide;
105+
this.lowTide = lowTide;
106+
}
107+
108+
@Override
109+
public RSocket apply(RSocket socket) {
110+
return new LimitRateRSocket(socket, requesterSide, highTide, lowTide);
111+
}
112+
113+
public static LimitRateInterceptor forRequester() {
114+
return forRequester(Queues.SMALL_BUFFER_SIZE);
115+
}
116+
117+
public static LimitRateInterceptor forRequester(int limit) {
118+
return forRequester(limit, limit);
119+
}
120+
121+
public static LimitRateInterceptor forRequester(int highTide, int lowTide) {
122+
return new LimitRateInterceptor(true, highTide, lowTide);
123+
}
124+
125+
public static LimitRateInterceptor forResponder() {
126+
return forRequester(Queues.SMALL_BUFFER_SIZE);
127+
}
128+
129+
public static LimitRateInterceptor forResponder(int limit) {
130+
return forRequester(limit, limit);
131+
}
132+
133+
public static LimitRateInterceptor forResponder(int highTide, int lowTide) {
134+
return new LimitRateInterceptor(false, highTide, lowTide);
135+
}
136+
}
137+
138+
static class LimitRateRSocket extends RSocketProxy {
139+
140+
final boolean requesterSide;
141+
final int highTide;
142+
final int lowTide;
143+
144+
public LimitRateRSocket(RSocket source, boolean requesterSide, int highTide, int lowTide) {
145+
super(source);
146+
this.requesterSide = requesterSide;
147+
this.highTide = highTide;
148+
this.lowTide = lowTide;
149+
}
150+
151+
@Override
152+
public Flux<Payload> requestStream(Payload payload) {
153+
Flux<Payload> flux = super.requestStream(payload);
154+
if (requesterSide) {
155+
return flux;
156+
}
157+
return flux.limitRate(highTide, lowTide);
158+
}
159+
160+
@Override
161+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
162+
if (requesterSide) {
163+
return super.requestChannel(Flux.from(payloads).limitRate(highTide, lowTide));
164+
}
165+
return super.requestChannel(payloads).limitRate(highTide, lowTide);
166+
}
167+
}
168+
}

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,7 @@ default void requestChannel3() {
237237
.expectComplete()
238238
.verify(getTimeout());
239239

240-
Assertions.assertThat(requested.get())
241-
.isEqualTo(256L); // 256 because of eager behavior of limitRate
240+
Assertions.assertThat(requested.get()).isEqualTo(3L);
242241
}
243242

244243
@DisplayName("makes 1 requestChannel request with 512 payloads")

0 commit comments

Comments
 (0)