Skip to content

Commit 4b71165

Browse files
committed
provides well documented lease example
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent d8da87a commit 4b71165

File tree

1 file changed

+193
-0
lines changed

1 file changed

+193
-0
lines changed
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.examples.transport.tcp.lease;
18+
19+
import io.rsocket.RSocket;
20+
import io.rsocket.SocketAcceptor;
21+
import io.rsocket.core.RSocketConnector;
22+
import io.rsocket.core.RSocketServer;
23+
import io.rsocket.lease.Lease;
24+
import io.rsocket.lease.LeaseStats;
25+
import io.rsocket.lease.Leases;
26+
import io.rsocket.lease.MissingLeaseException;
27+
import io.rsocket.transport.netty.client.TcpClientTransport;
28+
import io.rsocket.transport.netty.server.CloseableChannel;
29+
import io.rsocket.transport.netty.server.TcpServerTransport;
30+
import io.rsocket.util.ByteBufPayload;
31+
import java.time.Duration;
32+
import java.util.Objects;
33+
import java.util.Optional;
34+
import java.util.Queue;
35+
import java.util.concurrent.ArrayBlockingQueue;
36+
import java.util.concurrent.BlockingQueue;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.function.Consumer;
39+
import java.util.function.Function;
40+
import reactor.core.publisher.Flux;
41+
import reactor.core.publisher.Mono;
42+
import reactor.core.publisher.ReplayProcessor;
43+
import reactor.util.retry.Retry;
44+
45+
public class RateLimitingWithLeaseExample {
46+
private static final String SERVER_TAG = "server";
47+
private static final String CLIENT_TAG = "client";
48+
49+
public static void main(String[] args) {
50+
// Queue for incoming messages represented as Flux
51+
// Imagine that every fireAndForget that is pushed is processed by a worker
52+
53+
BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(50);
54+
55+
// emulating a worker that process data from the queue
56+
new Thread(
57+
() -> {
58+
try {
59+
String message = messagesQueue.poll(Long.MAX_VALUE, TimeUnit.DAYS);
60+
System.out.println("Process message {" + message + "}");
61+
Thread.sleep(500); // emulating processing
62+
} catch (InterruptedException e) {
63+
throw new RuntimeException(e);
64+
}
65+
})
66+
.start();
67+
68+
CloseableChannel server =
69+
RSocketServer.create(
70+
SocketAcceptor.forFireAndForget(
71+
payload -> {
72+
// add element. if overflows errors and terminates execution
73+
// specifically to show that lease can limit rate of fnf requests in
74+
// that example
75+
messagesQueue.add(payload.getDataUtf8());
76+
payload.release();
77+
return Mono.empty();
78+
}))
79+
.lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
80+
.bindNow(TcpServerTransport.create("localhost", 7000));
81+
82+
LeaseReceiver receiver = new LeaseReceiver(CLIENT_TAG);
83+
RSocket clientRSocket =
84+
RSocketConnector.create()
85+
.lease(() -> Leases.create().receiver(receiver))
86+
.connect(TcpClientTransport.create(server.address()))
87+
.block();
88+
89+
Objects.requireNonNull(clientRSocket);
90+
91+
// generate stream of fnfs
92+
Flux.generate(
93+
() -> 0L,
94+
(state, sink) -> {
95+
sink.next(state);
96+
return state + 1;
97+
})
98+
// here we wait for the first lease for the responder side and start execution
99+
// on if there is allowance
100+
.delaySubscription(receiver.notifyWhenNewLease().then())
101+
.concatMap(
102+
tick -> {
103+
System.out.println("Sending " + tick);
104+
return Mono.defer(() -> clientRSocket.fireAndForget(ByteBufPayload.create("" + tick)))
105+
.retryWhen(
106+
Retry.indefinitely()
107+
// ensures that error is the result of missed lease
108+
.filter(t -> t instanceof MissingLeaseException)
109+
.doBeforeRetryAsync(
110+
rs -> {
111+
// here we create a mechanism to delay the retry until
112+
// the new lease allowance comes in.
113+
System.out.println("Ran out of leases " + rs);
114+
return receiver.notifyWhenNewLease().then();
115+
}));
116+
})
117+
.blockLast();
118+
119+
clientRSocket.onClose().block();
120+
server.dispose();
121+
}
122+
123+
/**
124+
* This is a class responsible for making decision on whether Responder is ready to receive new
125+
* FireAndForget or not base in the number of messages enqueued. <br>
126+
* In the nutshell this is responder-side rate-limiter logic which is created for every new
127+
* connection.<br>
128+
* In real-world projects this class has to issue leases based on real metrics
129+
*/
130+
private static class LeaseCalculator implements Function<Optional<LeaseStats>, Flux<Lease>> {
131+
final String tag;
132+
final Queue<?> queue;
133+
134+
public LeaseCalculator(String tag, Queue<?> queue) {
135+
this.tag = tag;
136+
this.queue = queue;
137+
}
138+
139+
@Override
140+
public Flux<Lease> apply(Optional<LeaseStats> leaseStats) {
141+
System.out.println(
142+
String.format("%s stats are %s", tag, leaseStats.isPresent() ? "present" : "absent"));
143+
Duration ttlDuration = Duration.ofSeconds(5);
144+
// The interval function is used only for the demo purpose and should not be
145+
// considered as the way to issue leases.
146+
return Flux.interval(Duration.ZERO, ttlDuration.dividedBy(2))
147+
.handle(
148+
(__, sink) -> {
149+
int requests = 50 - queue.size();
150+
// reissue new lease only if queue has remaining capacity to
151+
// accept more requests
152+
if (requests > 0) {
153+
long ttl = ttlDuration.toMillis();
154+
sink.next(Lease.create((int) ttl, requests));
155+
}
156+
});
157+
}
158+
}
159+
160+
/**
161+
* Requester-side Lease listener.<br>
162+
* In the nutshell this class implements mechanism to listen (and do appropriate actions as
163+
* needed) to incoming leases issued by the Responder
164+
*/
165+
private static class LeaseReceiver implements Consumer<Flux<Lease>> {
166+
final String tag;
167+
final ReplayProcessor<Lease> lastLeaseReplay = ReplayProcessor.cacheLast();
168+
169+
public LeaseReceiver(String tag) {
170+
this.tag = tag;
171+
}
172+
173+
@Override
174+
public void accept(Flux<Lease> receivedLeases) {
175+
receivedLeases.subscribe(
176+
l -> {
177+
lastLeaseReplay.onNext(l);
178+
System.out.println(
179+
String.format(
180+
"%s received leases - ttl: %d, requests: %d",
181+
tag, l.getTimeToLiveMillis(), l.getAllowedRequests()));
182+
});
183+
}
184+
185+
/**
186+
* This method allows to listen to new incoming leases and delay some action (e.g . retry) until
187+
* new valid lease has come in
188+
*/
189+
public Mono<Lease> notifyWhenNewLease() {
190+
return lastLeaseReplay.filter(l -> l.isValid()).next();
191+
}
192+
}
193+
}

0 commit comments

Comments
 (0)