Skip to content

Commit 105f6c7

Browse files
authored
Server and Client event publishing (#217)
Client and Server event publishing __Problem__ No events are published for `ReactiveSocketClient` and `ReactiveSocketServer` __ Modification__ Added event publishing for `ReactiveSocketClient` and `ReactiveSocketServer` __Result__ Events for clients and server.
1 parent 82e45b6 commit 105f6c7

32 files changed

+1391
-120
lines changed

reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancingClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* An implementation of {@code ReactiveSocketClient} that operates on a cluster of target servers instead of a single
3232
* server.
3333
*/
34-
public class LoadBalancingClient implements ReactiveSocketClient {
34+
public class LoadBalancingClient extends AbstractReactiveSocketClient {
3535

3636
private final LoadBalancerInitializer initializer;
3737

reactivesocket-client/src/main/java/io/reactivesocket/client/filter/FailureAwareClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.reactivesocket.client.filter;
1717

1818
import io.reactivesocket.ReactiveSocket;
19+
import io.reactivesocket.client.AbstractReactiveSocketClient;
1920
import io.reactivesocket.client.ReactiveSocketClient;
2021
import io.reactivesocket.reactivestreams.extensions.Px;
2122
import io.reactivesocket.stat.Ewma;
@@ -34,7 +35,7 @@
3435
* lot of them when sending messages, we will still decrease the availability of the child
3536
* reducing the probability of connecting to it.
3637
*/
37-
public class FailureAwareClient implements ReactiveSocketClient {
38+
public class FailureAwareClient extends AbstractReactiveSocketClient {
3839

3940
private static final double EPSILON = 1e-4;
4041

@@ -44,6 +45,7 @@ public class FailureAwareClient implements ReactiveSocketClient {
4445
private final Ewma errorPercentage;
4546

4647
public FailureAwareClient(ReactiveSocketClient delegate, long halfLife, TimeUnit unit) {
48+
super(delegate);
4749
this.delegate = delegate;
4850
this.tau = Clock.unit().convert((long)(halfLife / Math.log(2)), unit);
4951
this.stamp = Clock.now();

reactivesocket-client/src/main/java/io/reactivesocket/client/filter/ReactiveSocketClients.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.reactivesocket.client.filter;
1818

1919
import io.reactivesocket.ReactiveSocket;
20+
import io.reactivesocket.client.AbstractReactiveSocketClient;
2021
import io.reactivesocket.client.ReactiveSocketClient;
2122
import io.reactivesocket.reactivestreams.extensions.Px;
2223
import io.reactivesocket.reactivestreams.extensions.Scheduler;
@@ -47,7 +48,7 @@ private ReactiveSocketClients() {
4748
*/
4849
public static ReactiveSocketClient connectTimeout(ReactiveSocketClient orig, long timeout, TimeUnit unit,
4950
Scheduler scheduler) {
50-
return new ReactiveSocketClient() {
51+
return new AbstractReactiveSocketClient(orig) {
5152
@Override
5253
public Publisher<? extends ReactiveSocket> connect() {
5354
return Px.from(orig.connect()).timeout(timeout, unit, scheduler);
@@ -82,7 +83,7 @@ public static ReactiveSocketClient detectFailures(ReactiveSocketClient orig) {
8283
* @return A new client wrapping the original.
8384
*/
8485
public static ReactiveSocketClient wrap(ReactiveSocketClient orig, Function<ReactiveSocket, ReactiveSocket> mapper) {
85-
return new ReactiveSocketClient() {
86+
return new AbstractReactiveSocketClient(orig) {
8687
@Override
8788
public Publisher<? extends ReactiveSocket> connect() {
8889
return Px.from(orig.connect()).map(mapper::apply);

reactivesocket-client/src/test/java/io/reactivesocket/client/FailureReactiveSocketTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ private void testReactiveSocket(BiConsumer<CountDownLatch, ReactiveSocket> f) th
114114
throw new RuntimeException();
115115
}
116116
});
117-
ReactiveSocketClient factory = new ReactiveSocketClient() {
117+
ReactiveSocketClient factory = new AbstractReactiveSocketClient() {
118118
@Override
119119
public Publisher<ReactiveSocket> connect() {
120120
return subscriber -> {

reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void onComplete() {
134134
}
135135

136136
private static ReactiveSocketClient succeedingFactory(ReactiveSocket socket) {
137-
return new ReactiveSocketClient() {
137+
return new AbstractReactiveSocketClient() {
138138
@Override
139139
public Publisher<ReactiveSocket> connect() {
140140
return s -> s.onNext(socket);
@@ -149,7 +149,7 @@ public double availability() {
149149
}
150150

151151
private static ReactiveSocketClient failingClient(SocketAddress sa) {
152-
return new ReactiveSocketClient() {
152+
return new AbstractReactiveSocketClient() {
153153
@Override
154154
public Publisher<ReactiveSocket> connect() {
155155
Assert.fail();

reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,14 @@
1717
package io.reactivesocket;
1818

1919
import io.reactivesocket.client.KeepAliveProvider;
20+
import io.reactivesocket.events.EventListener;
21+
import io.reactivesocket.events.EventListener.RequestType;
22+
import io.reactivesocket.events.EventPublishingSocket;
23+
import io.reactivesocket.events.EventPublishingSocketImpl;
2024
import io.reactivesocket.exceptions.CancelException;
2125
import io.reactivesocket.exceptions.Exceptions;
26+
import io.reactivesocket.internal.DisabledEventPublisher;
27+
import io.reactivesocket.internal.EventPublisher;
2228
import io.reactivesocket.internal.KnownErrorFilter;
2329
import io.reactivesocket.internal.RemoteReceiver;
2430
import io.reactivesocket.internal.RemoteSender;
@@ -39,6 +45,7 @@
3945
import java.nio.charset.StandardCharsets;
4046
import java.util.function.Consumer;
4147

48+
import static io.reactivesocket.events.EventListener.RequestType.*;
4249
import static io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers.*;
4350

4451
/**
@@ -51,6 +58,7 @@ public class ClientReactiveSocket implements ReactiveSocket {
5158
private final Consumer<Throwable> errorConsumer;
5259
private final StreamIdSupplier streamIdSupplier;
5360
private final KeepAliveProvider keepAliveProvider;
61+
private final EventPublishingSocket eventPublishingSocket;
5462

5563
private final Int2ObjectHashMap<Subscription> senders;
5664
private final Int2ObjectHashMap<Subscriber<Frame>> receivers;
@@ -60,18 +68,26 @@ public class ClientReactiveSocket implements ReactiveSocket {
6068
private volatile Consumer<Lease> leaseConsumer; // Provided on start()
6169

6270
public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> errorConsumer,
63-
StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider) {
71+
StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider,
72+
EventPublisher<? extends EventListener> publisher) {
6473
this.connection = connection;
6574
this.errorConsumer = new KnownErrorFilter(errorConsumer);
6675
this.streamIdSupplier = streamIdSupplier;
6776
this.keepAliveProvider = keepAliveProvider;
77+
eventPublishingSocket = publisher.isEventPublishingEnabled()? new EventPublishingSocketImpl(publisher, true)
78+
: EventPublishingSocket.DISABLED;
6879
senders = new Int2ObjectHashMap<>(256, 0.9f);
6980
receivers = new Int2ObjectHashMap<>(256, 0.9f);
7081
connection.onClose().subscribe(Subscribers.cleanup(() -> {
7182
cleanup();
7283
}));
7384
}
7485

86+
public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> errorConsumer,
87+
StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider) {
88+
this(connection, errorConsumer, streamIdSupplier, keepAliveProvider, new DisabledEventPublisher<>());
89+
}
90+
7591
@Override
7692
public Publisher<Void> fireAndForget(Payload payload) {
7793
return Px.defer(() -> {
@@ -143,15 +159,17 @@ private Publisher<Payload> handleRequestResponse(final Payload payload) {
143159
Subscriber<Frame> fs = raw;
144160
receivers.put(streamId, fs);
145161
}
146-
Px.concatEmpty(connection.sendOne(requestFrame), Px.never())
147-
.cast(Payload.class)
148-
.doOnCancel(() -> {
162+
Publisher<Void> send = eventPublishingSocket.decorateSend(streamId, connection.sendOne(requestFrame), 0,
163+
RequestResponse);
164+
eventPublishingSocket.decorateReceive(streamId, Px.concatEmpty(send, Px.never())
165+
.cast(Payload.class)
166+
.doOnCancel(() -> {
149167
if (connection.availability() > 0.0) {
150168
connection.sendOne(Frame.Cancel.from(streamId))
151169
.subscribe(DefaultSubscriber.defaultInstance());
152170
}
153-
})
154-
.subscribe(subscriber);
171+
removeReceiver(streamId);
172+
}), RequestResponse).subscribe(subscriber);
155173
});
156174
}
157175

@@ -170,14 +188,15 @@ private Publisher<Payload> handleStreamResponse(Px<Payload> request, FrameType r
170188
}, requestN -> {
171189
transportReceiveSubscription.request(requestN);
172190
});
173-
connection.send(sender).subscribe(sendSub);
191+
eventPublishingSocket.decorateSend(streamId, connection.send(sender), 0,
192+
fromFrameType(requestType)).subscribe(sendSub);
174193
s.onSubscribe(sub);
175194
};
176195

177196
RemoteReceiver receiver = new RemoteReceiver(src, connection, streamId, removeReceiverLambda(streamId),
178197
true);
179198
registerSenderReceiver(streamId, sender, receiver);
180-
return receiver;
199+
return eventPublishingSocket.decorateReceive(streamId, receiver, fromFrameType(requestType));
181200
});
182201
}
183202

reactivesocket-core/src/main/java/io/reactivesocket/ServerReactiveSocket.java

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,31 @@
1919
import io.reactivesocket.Frame.Lease;
2020
import io.reactivesocket.Frame.Request;
2121
import io.reactivesocket.Frame.Response;
22+
import io.reactivesocket.events.EventListener;
23+
import io.reactivesocket.events.EventListener.RequestType;
24+
import io.reactivesocket.events.EventPublishingSocket;
25+
import io.reactivesocket.events.EventPublishingSocketImpl;
2226
import io.reactivesocket.exceptions.ApplicationException;
2327
import io.reactivesocket.exceptions.SetupException;
2428
import io.reactivesocket.frame.FrameHeaderFlyweight;
29+
import io.reactivesocket.internal.DisabledEventPublisher;
30+
import io.reactivesocket.internal.EventPublisher;
2531
import io.reactivesocket.internal.KnownErrorFilter;
2632
import io.reactivesocket.internal.RemoteReceiver;
2733
import io.reactivesocket.internal.RemoteSender;
2834
import io.reactivesocket.lease.LeaseEnforcingSocket;
2935
import io.reactivesocket.reactivestreams.extensions.Px;
3036
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
37+
import io.reactivesocket.util.Clock;
3138
import org.agrona.collections.Int2ObjectHashMap;
3239
import org.reactivestreams.Publisher;
3340
import org.reactivestreams.Subscription;
3441

3542
import java.util.Collection;
3643
import java.util.function.Consumer;
3744

45+
import static io.reactivesocket.events.EventListener.RequestType.*;
46+
3847
/**
3948
* Server side ReactiveSocket. Receives {@link Frame}s from a
4049
* {@link ClientReactiveSocket}
@@ -44,20 +53,28 @@ public class ServerReactiveSocket implements ReactiveSocket {
4453
private final DuplexConnection connection;
4554
private final Publisher<Frame> serverInput;
4655
private final Consumer<Throwable> errorConsumer;
56+
private final EventPublisher<? extends EventListener> eventPublisher;
4757

4858
private final Int2ObjectHashMap<Subscription> subscriptions;
4959
private final Int2ObjectHashMap<RemoteReceiver> channelProcessors;
5060

5161
private final ReactiveSocket requestHandler;
5262
private Subscription receiversSubscription;
63+
private final EventPublishingSocket eventPublishingSocket;
64+
5365
public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler,
54-
boolean clientHonorsLease, Consumer<Throwable> errorConsumer) {
66+
boolean clientHonorsLease, Consumer<Throwable> errorConsumer,
67+
EventPublisher<? extends EventListener> eventPublisher) {
5568
this.requestHandler = requestHandler;
5669
this.connection = connection;
5770
serverInput = connection.receive();
5871
this.errorConsumer = new KnownErrorFilter(errorConsumer);
72+
this.eventPublisher = eventPublisher;
5973
subscriptions = new Int2ObjectHashMap<>();
6074
channelProcessors = new Int2ObjectHashMap<>();
75+
eventPublishingSocket = eventPublisher.isEventPublishingEnabled()?
76+
new EventPublishingSocketImpl(eventPublisher, false) : EventPublishingSocket.DISABLED;
77+
6178
Px.from(connection.onClose()).subscribe(Subscribers.cleanup(() -> {
6279
cleanup();
6380
}));
@@ -74,6 +91,10 @@ public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestH
7491
});
7592
}
7693
}
94+
public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler,
95+
boolean clientHonorsLease, Consumer<Throwable> errorConsumer) {
96+
this(connection, requestHandler, clientHonorsLease, errorConsumer, new DisabledEventPublisher<>());
97+
}
7798

7899
public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler,
79100
Consumer<Throwable> errorConsumer) {
@@ -165,19 +186,19 @@ private Publisher<Void> handleFrame(Frame frame) {
165186
case SETUP:
166187
return Px.error(new IllegalStateException("Setup frame received post setup."));
167188
case REQUEST_RESPONSE:
168-
return handleReceive(streamId, requestResponse(frame));
189+
return handleRequestResponse(streamId, requestResponse(frame));
169190
case CANCEL:
170191
return handleCancelFrame(streamId);
171192
case KEEPALIVE:
172193
return handleKeepAliveFrame(frame);
173194
case REQUEST_N:
174195
return handleRequestN(streamId, frame);
175196
case REQUEST_STREAM:
176-
return doReceive(streamId, requestStream(frame));
197+
return doReceive(streamId, requestStream(frame), RequestStream);
177198
case FIRE_AND_FORGET:
178199
return handleFireAndForget(streamId, fireAndForget(frame));
179200
case REQUEST_SUBSCRIPTION:
180-
return doReceive(streamId, requestSubscription(frame));
201+
return doReceive(streamId, requestSubscription(frame), RequestStream);
181202
case REQUEST_CHANNEL:
182203
return handleChannel(streamId, frame);
183204
case RESPONSE:
@@ -251,13 +272,14 @@ private synchronized void cleanup() {
251272
requestHandler.close().subscribe(Subscribers.empty());
252273
}
253274

254-
private Publisher<Void> handleReceive(int streamId, Publisher<Payload> response) {
275+
private Publisher<Void> handleRequestResponse(int streamId, Publisher<Payload> response) {
255276
final Runnable cleanup = () -> {
256277
synchronized (this) {
257278
subscriptions.remove(streamId);
258279
}
259280

260281
};
282+
long now = publishSingleFrameReceiveEvents(streamId, RequestResponse);
261283

262284
Px<Frame> frames =
263285
Px
@@ -282,26 +304,29 @@ private Publisher<Void> handleReceive(int streamId, Publisher<Payload> response)
282304
return Frame.Error.from(streamId, throwable);
283305
});
284306

285-
return Px.from(connection.send(frames));
307+
return Px.from(eventPublishingSocket.decorateSend(streamId, connection.send(frames), now, RequestResponse));
286308

287309
}
288310

289-
private Publisher<Void> doReceive(int streamId, Publisher<Payload> response) {
311+
private Publisher<Void> doReceive(int streamId, Publisher<Payload> response, RequestType requestType) {
312+
long now = publishSingleFrameReceiveEvents(streamId, requestType);
290313
Px<Frame> resp = Px.from(response)
291314
.map(payload -> Response.from(streamId, FrameType.RESPONSE, payload));
292315
RemoteSender sender = new RemoteSender(resp, () -> subscriptions.remove(streamId), streamId, 2);
293316
subscriptions.put(streamId, sender);
294-
return connection.send(sender);
317+
return eventPublishingSocket.decorateSend(streamId, connection.send(sender), now, requestType);
295318
}
296319

297320
private Publisher<Void> handleChannel(int streamId, Frame firstFrame) {
321+
long now = publishSingleFrameReceiveEvents(streamId, RequestChannel);
298322
int initialRequestN = Request.initialRequestN(firstFrame);
299323
Frame firstAsNext = Request.from(streamId, FrameType.NEXT, firstFrame, initialRequestN);
300324
RemoteReceiver receiver = new RemoteReceiver(connection, streamId, () -> removeChannelProcessor(streamId),
301325
firstAsNext, receiversSubscription, true);
302326
channelProcessors.put(streamId, receiver);
303327

304-
Px<Frame> response = Px.from(requestChannel(receiver))
328+
Px<Frame> response = Px.from(requestChannel(eventPublishingSocket.decorateReceive(streamId, receiver,
329+
RequestChannel)))
305330
.map(payload -> Response.from(streamId, FrameType.RESPONSE, payload));
306331

307332
RemoteSender sender = new RemoteSender(response, () -> removeSubscriptions(streamId), streamId,
@@ -310,7 +335,7 @@ private Publisher<Void> handleChannel(int streamId, Frame firstFrame) {
310335
subscriptions.put(streamId, sender);
311336
}
312337

313-
return connection.send(sender);
338+
return eventPublishingSocket.decorateSend(streamId, connection.send(sender), now, RequestChannel);
314339
}
315340

316341
private Publisher<Void> handleFireAndForget(int streamId, Publisher<Void> result) {
@@ -368,4 +393,14 @@ private synchronized void addSubscription(int streamId, Subscription subscriptio
368393
private synchronized void removeSubscription(int streamId) {
369394
subscriptions.remove(streamId);
370395
}
396+
397+
private long publishSingleFrameReceiveEvents(int streamId, RequestType requestType) {
398+
long now = Clock.now();
399+
if (eventPublisher.isEventPublishingEnabled()) {
400+
EventListener eventListener = eventPublisher.getEventListener();
401+
eventListener.requestReceiveStart(streamId, requestType);
402+
eventListener.requestReceiveComplete(streamId, requestType, Clock.elapsedSince(now), Clock.unit());
403+
}
404+
return now;
405+
}
371406
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.client;
15+
16+
import io.reactivesocket.events.AbstractEventSource;
17+
import io.reactivesocket.events.ClientEventListener;
18+
import io.reactivesocket.events.EventSource;
19+
20+
public abstract class AbstractReactiveSocketClient extends AbstractEventSource<ClientEventListener>
21+
implements ReactiveSocketClient{
22+
23+
protected AbstractReactiveSocketClient() {
24+
}
25+
26+
protected AbstractReactiveSocketClient(EventSource<ClientEventListener> delegate) {
27+
super(delegate);
28+
}
29+
}

0 commit comments

Comments
 (0)