Skip to content

Commit 2680d32

Browse files
committed
provides well documented lease example
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent d8da87a commit 2680d32

File tree

1 file changed

+211
-0
lines changed

1 file changed

+211
-0
lines changed
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.examples.transport.tcp.lease;
18+
19+
import io.rsocket.Payload;
20+
import io.rsocket.RSocket;
21+
import io.rsocket.core.RSocketConnector;
22+
import io.rsocket.core.RSocketServer;
23+
import io.rsocket.lease.Lease;
24+
import io.rsocket.lease.LeaseStats;
25+
import io.rsocket.lease.Leases;
26+
import io.rsocket.lease.MissingLeaseException;
27+
import io.rsocket.transport.netty.client.TcpClientTransport;
28+
import io.rsocket.transport.netty.server.CloseableChannel;
29+
import io.rsocket.transport.netty.server.TcpServerTransport;
30+
import io.rsocket.util.ByteBufPayload;
31+
import java.time.Duration;
32+
import java.util.Objects;
33+
import java.util.Optional;
34+
import java.util.concurrent.ArrayBlockingQueue;
35+
import java.util.concurrent.BlockingQueue;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.function.Consumer;
38+
import java.util.function.Function;
39+
import reactor.core.publisher.Flux;
40+
import reactor.core.publisher.Mono;
41+
import reactor.core.publisher.ReplayProcessor;
42+
import reactor.util.retry.Retry;
43+
44+
public class RateLimitingWithLeaseExample {
45+
private static final String SERVER_TAG = "server";
46+
private static final String CLIENT_TAG = "client";
47+
48+
public static void main(String[] args) {
49+
// Queue for incoming messages represented as Flux
50+
// Imagine that every fireAndForget that is pushed is processed by a worker
51+
52+
int queueCapacity = 50;
53+
BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(queueCapacity);
54+
55+
// emulating a worker that process data from the queue
56+
Thread workerThread =
57+
new Thread(
58+
() -> {
59+
try {
60+
while (!Thread.currentThread().isInterrupted()) {
61+
String message = messagesQueue.poll(Long.MAX_VALUE, TimeUnit.DAYS);
62+
System.out.println("Process message {" + message + "}");
63+
Thread.sleep(500); // emulating processing
64+
}
65+
} catch (InterruptedException e) {
66+
throw new RuntimeException(e);
67+
}
68+
});
69+
70+
workerThread.start();
71+
72+
CloseableChannel server =
73+
RSocketServer.create(
74+
(setup, sendingSocket) ->
75+
Mono.just(
76+
new RSocket() {
77+
@Override
78+
public Mono<Void> fireAndForget(Payload payload) {
79+
// add element. if overflows errors and terminates execution
80+
// specifically to show that lease can limit rate of fnf requests in
81+
// that example
82+
try {
83+
messagesQueue.add(payload.getDataUtf8());
84+
} catch (IllegalStateException e) {
85+
e.printStackTrace();
86+
sendingSocket.dispose();
87+
workerThread.interrupt();
88+
throw e;
89+
}
90+
payload.release();
91+
return Mono.empty();
92+
}
93+
}))
94+
.lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
95+
.bindNow(TcpServerTransport.create("localhost", 7000));
96+
97+
LeaseReceiver receiver = new LeaseReceiver(CLIENT_TAG);
98+
RSocket clientRSocket =
99+
RSocketConnector.create()
100+
.lease(() -> Leases.create().receiver(receiver))
101+
.connect(TcpClientTransport.create(server.address()))
102+
.block();
103+
104+
Objects.requireNonNull(clientRSocket);
105+
106+
// generate stream of fnfs
107+
Flux.generate(
108+
() -> 0L,
109+
(state, sink) -> {
110+
sink.next(state);
111+
return state + 1;
112+
})
113+
// here we wait for the first lease for the responder side and start execution
114+
// on if there is allowance
115+
.delaySubscription(receiver.notifyWhenNewLease().then())
116+
.concatMap(
117+
tick -> {
118+
System.out.println("Sending " + tick);
119+
return Mono.defer(() -> clientRSocket.fireAndForget(ByteBufPayload.create("" + tick)))
120+
.retryWhen(
121+
Retry.indefinitely()
122+
// ensures that error is the result of missed lease
123+
.filter(t -> t instanceof MissingLeaseException)
124+
.doBeforeRetryAsync(
125+
rs -> {
126+
// here we create a mechanism to delay the retry until
127+
// the new lease allowance comes in.
128+
System.out.println("Ran out of leases " + rs);
129+
return receiver.notifyWhenNewLease().then();
130+
}));
131+
})
132+
.blockLast();
133+
134+
clientRSocket.onClose().block();
135+
server.dispose();
136+
}
137+
138+
/**
139+
* This is a class responsible for making decision on whether Responder is ready to receive new
140+
* FireAndForget or not base in the number of messages enqueued. <br>
141+
* In the nutshell this is responder-side rate-limiter logic which is created for every new
142+
* connection.<br>
143+
* In real-world projects this class has to issue leases based on real metrics
144+
*/
145+
private static class LeaseCalculator implements Function<Optional<LeaseStats>, Flux<Lease>> {
146+
final String tag;
147+
final BlockingQueue<?> queue;
148+
149+
public LeaseCalculator(String tag, BlockingQueue<?> queue) {
150+
this.tag = tag;
151+
this.queue = queue;
152+
}
153+
154+
@Override
155+
public Flux<Lease> apply(Optional<LeaseStats> leaseStats) {
156+
System.out.println(
157+
String.format("%s stats are %s", tag, leaseStats.isPresent() ? "present" : "absent"));
158+
Duration ttlDuration = Duration.ofSeconds(5);
159+
// The interval function is used only for the demo purpose and should not be
160+
// considered as the way to issue leases.
161+
return Flux.interval(Duration.ZERO, ttlDuration.dividedBy(2))
162+
.handle(
163+
(__, sink) -> {
164+
// put queue.remainingCapacity() + 1 here if you want to observe that app is
165+
// terminated because of the queue overflowing
166+
int requests = queue.remainingCapacity();
167+
168+
// reissue new lease only if queue has remaining capacity to
169+
// accept more requests
170+
if (requests > 0) {
171+
long ttl = ttlDuration.toMillis();
172+
sink.next(Lease.create((int) ttl, requests));
173+
}
174+
});
175+
}
176+
}
177+
178+
/**
179+
* Requester-side Lease listener.<br>
180+
* In the nutshell this class implements mechanism to listen (and do appropriate actions as
181+
* needed) to incoming leases issued by the Responder
182+
*/
183+
private static class LeaseReceiver implements Consumer<Flux<Lease>> {
184+
final String tag;
185+
final ReplayProcessor<Lease> lastLeaseReplay = ReplayProcessor.cacheLast();
186+
187+
public LeaseReceiver(String tag) {
188+
this.tag = tag;
189+
}
190+
191+
@Override
192+
public void accept(Flux<Lease> receivedLeases) {
193+
receivedLeases.subscribe(
194+
l -> {
195+
System.out.println(
196+
String.format(
197+
"%s received leases - ttl: %d, requests: %d",
198+
tag, l.getTimeToLiveMillis(), l.getAllowedRequests()));
199+
lastLeaseReplay.onNext(l);
200+
});
201+
}
202+
203+
/**
204+
* This method allows to listen to new incoming leases and delay some action (e.g . retry) until
205+
* new valid lease has come in
206+
*/
207+
public Mono<Lease> notifyWhenNewLease() {
208+
return lastLeaseReplay.filter(l -> l.isValid()).next();
209+
}
210+
}
211+
}

0 commit comments

Comments
 (0)