Skip to content

Server and Client event publishing #217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* An implementation of {@code ReactiveSocketClient} that operates on a cluster of target servers instead of a single
* server.
*/
public class LoadBalancingClient implements ReactiveSocketClient {
public class LoadBalancingClient extends AbstractReactiveSocketClient {

private final LoadBalancerInitializer initializer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.reactivesocket.client.filter;

import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.AbstractReactiveSocketClient;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.stat.Ewma;
Expand All @@ -34,7 +35,7 @@
* lot of them when sending messages, we will still decrease the availability of the child
* reducing the probability of connecting to it.
*/
public class FailureAwareClient implements ReactiveSocketClient {
public class FailureAwareClient extends AbstractReactiveSocketClient {

private static final double EPSILON = 1e-4;

Expand All @@ -44,6 +45,7 @@ public class FailureAwareClient implements ReactiveSocketClient {
private final Ewma errorPercentage;

public FailureAwareClient(ReactiveSocketClient delegate, long halfLife, TimeUnit unit) {
super(delegate);
this.delegate = delegate;
this.tau = Clock.unit().convert((long)(halfLife / Math.log(2)), unit);
this.stamp = Clock.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.reactivesocket.client.filter;

import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.AbstractReactiveSocketClient;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.Scheduler;
Expand Down Expand Up @@ -47,7 +48,7 @@ private ReactiveSocketClients() {
*/
public static ReactiveSocketClient connectTimeout(ReactiveSocketClient orig, long timeout, TimeUnit unit,
Scheduler scheduler) {
return new ReactiveSocketClient() {
return new AbstractReactiveSocketClient(orig) {
@Override
public Publisher<? extends ReactiveSocket> connect() {
return Px.from(orig.connect()).timeout(timeout, unit, scheduler);
Expand Down Expand Up @@ -82,7 +83,7 @@ public static ReactiveSocketClient detectFailures(ReactiveSocketClient orig) {
* @return A new client wrapping the original.
*/
public static ReactiveSocketClient wrap(ReactiveSocketClient orig, Function<ReactiveSocket, ReactiveSocket> mapper) {
return new ReactiveSocketClient() {
return new AbstractReactiveSocketClient(orig) {
@Override
public Publisher<? extends ReactiveSocket> connect() {
return Px.from(orig.connect()).map(mapper::apply);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void testReactiveSocket(BiConsumer<CountDownLatch, ReactiveSocket> f) th
throw new RuntimeException();
}
});
ReactiveSocketClient factory = new ReactiveSocketClient() {
ReactiveSocketClient factory = new AbstractReactiveSocketClient() {
@Override
public Publisher<ReactiveSocket> connect() {
return subscriber -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void onComplete() {
}

private static ReactiveSocketClient succeedingFactory(ReactiveSocket socket) {
return new ReactiveSocketClient() {
return new AbstractReactiveSocketClient() {
@Override
public Publisher<ReactiveSocket> connect() {
return s -> s.onNext(socket);
Expand All @@ -149,7 +149,7 @@ public double availability() {
}

private static ReactiveSocketClient failingClient(SocketAddress sa) {
return new ReactiveSocketClient() {
return new AbstractReactiveSocketClient() {
@Override
public Publisher<ReactiveSocket> connect() {
Assert.fail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@
package io.reactivesocket;

import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.events.EventListener;
import io.reactivesocket.events.EventListener.RequestType;
import io.reactivesocket.events.EventPublishingSocket;
import io.reactivesocket.events.EventPublishingSocketImpl;
import io.reactivesocket.exceptions.CancelException;
import io.reactivesocket.exceptions.Exceptions;
import io.reactivesocket.internal.DisabledEventPublisher;
import io.reactivesocket.internal.EventPublisher;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.RemoteReceiver;
import io.reactivesocket.internal.RemoteSender;
Expand All @@ -39,6 +45,7 @@
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

import static io.reactivesocket.events.EventListener.RequestType.*;
import static io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers.*;

/**
Expand All @@ -51,6 +58,7 @@ public class ClientReactiveSocket implements ReactiveSocket {
private final Consumer<Throwable> errorConsumer;
private final StreamIdSupplier streamIdSupplier;
private final KeepAliveProvider keepAliveProvider;
private final EventPublishingSocket eventPublishingSocket;

private final Int2ObjectHashMap<Subscription> senders;
private final Int2ObjectHashMap<Subscriber<Frame>> receivers;
Expand All @@ -60,18 +68,26 @@ public class ClientReactiveSocket implements ReactiveSocket {
private volatile Consumer<Lease> leaseConsumer; // Provided on start()

public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> errorConsumer,
StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider) {
StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider,
EventPublisher<? extends EventListener> publisher) {
this.connection = connection;
this.errorConsumer = new KnownErrorFilter(errorConsumer);
this.streamIdSupplier = streamIdSupplier;
this.keepAliveProvider = keepAliveProvider;
eventPublishingSocket = publisher.isEventPublishingEnabled()? new EventPublishingSocketImpl(publisher, true)
: EventPublishingSocket.DISABLED;
senders = new Int2ObjectHashMap<>(256, 0.9f);
receivers = new Int2ObjectHashMap<>(256, 0.9f);
connection.onClose().subscribe(Subscribers.cleanup(() -> {
cleanup();
}));
}

public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> errorConsumer,
StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider) {
this(connection, errorConsumer, streamIdSupplier, keepAliveProvider, new DisabledEventPublisher<>());
}

@Override
public Publisher<Void> fireAndForget(Payload payload) {
return Px.defer(() -> {
Expand Down Expand Up @@ -143,15 +159,17 @@ private Publisher<Payload> handleRequestResponse(final Payload payload) {
Subscriber<Frame> fs = raw;
receivers.put(streamId, fs);
}
Px.concatEmpty(connection.sendOne(requestFrame), Px.never())
.cast(Payload.class)
.doOnCancel(() -> {
Publisher<Void> send = eventPublishingSocket.decorateSend(streamId, connection.sendOne(requestFrame), 0,
RequestResponse);
eventPublishingSocket.decorateReceive(streamId, Px.concatEmpty(send, Px.never())
.cast(Payload.class)
.doOnCancel(() -> {
if (connection.availability() > 0.0) {
connection.sendOne(Frame.Cancel.from(streamId))
.subscribe(DefaultSubscriber.defaultInstance());
}
})
.subscribe(subscriber);
removeReceiver(streamId);
}), RequestResponse).subscribe(subscriber);
});
}

Expand All @@ -170,14 +188,15 @@ private Publisher<Payload> handleStreamResponse(Px<Payload> request, FrameType r
}, requestN -> {
transportReceiveSubscription.request(requestN);
});
connection.send(sender).subscribe(sendSub);
eventPublishingSocket.decorateSend(streamId, connection.send(sender), 0,
fromFrameType(requestType)).subscribe(sendSub);
s.onSubscribe(sub);
};

RemoteReceiver receiver = new RemoteReceiver(src, connection, streamId, removeReceiverLambda(streamId),
true);
registerSenderReceiver(streamId, sender, receiver);
return receiver;
return eventPublishingSocket.decorateReceive(streamId, receiver, fromFrameType(requestType));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,31 @@
import io.reactivesocket.Frame.Lease;
import io.reactivesocket.Frame.Request;
import io.reactivesocket.Frame.Response;
import io.reactivesocket.events.EventListener;
import io.reactivesocket.events.EventListener.RequestType;
import io.reactivesocket.events.EventPublishingSocket;
import io.reactivesocket.events.EventPublishingSocketImpl;
import io.reactivesocket.exceptions.ApplicationException;
import io.reactivesocket.exceptions.SetupException;
import io.reactivesocket.frame.FrameHeaderFlyweight;
import io.reactivesocket.internal.DisabledEventPublisher;
import io.reactivesocket.internal.EventPublisher;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.RemoteReceiver;
import io.reactivesocket.internal.RemoteSender;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import io.reactivesocket.util.Clock;
import org.agrona.collections.Int2ObjectHashMap;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import java.util.Collection;
import java.util.function.Consumer;

import static io.reactivesocket.events.EventListener.RequestType.*;

/**
* Server side ReactiveSocket. Receives {@link Frame}s from a
* {@link ClientReactiveSocket}
Expand All @@ -44,20 +53,28 @@ public class ServerReactiveSocket implements ReactiveSocket {
private final DuplexConnection connection;
private final Publisher<Frame> serverInput;
private final Consumer<Throwable> errorConsumer;
private final EventPublisher<? extends EventListener> eventPublisher;

private final Int2ObjectHashMap<Subscription> subscriptions;
private final Int2ObjectHashMap<RemoteReceiver> channelProcessors;

private final ReactiveSocket requestHandler;
private Subscription receiversSubscription;
private final EventPublishingSocket eventPublishingSocket;

public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler,
boolean clientHonorsLease, Consumer<Throwable> errorConsumer) {
boolean clientHonorsLease, Consumer<Throwable> errorConsumer,
EventPublisher<? extends EventListener> eventPublisher) {
this.requestHandler = requestHandler;
this.connection = connection;
serverInput = connection.receive();
this.errorConsumer = new KnownErrorFilter(errorConsumer);
this.eventPublisher = eventPublisher;
subscriptions = new Int2ObjectHashMap<>();
channelProcessors = new Int2ObjectHashMap<>();
eventPublishingSocket = eventPublisher.isEventPublishingEnabled()?
new EventPublishingSocketImpl(eventPublisher, false) : EventPublishingSocket.DISABLED;

Px.from(connection.onClose()).subscribe(Subscribers.cleanup(() -> {
cleanup();
}));
Expand All @@ -74,6 +91,10 @@ public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestH
});
}
}
public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler,
boolean clientHonorsLease, Consumer<Throwable> errorConsumer) {
this(connection, requestHandler, clientHonorsLease, errorConsumer, new DisabledEventPublisher<>());
}

public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler,
Consumer<Throwable> errorConsumer) {
Expand Down Expand Up @@ -165,19 +186,19 @@ private Publisher<Void> handleFrame(Frame frame) {
case SETUP:
return Px.error(new IllegalStateException("Setup frame received post setup."));
case REQUEST_RESPONSE:
return handleReceive(streamId, requestResponse(frame));
return handleRequestResponse(streamId, requestResponse(frame));
case CANCEL:
return handleCancelFrame(streamId);
case KEEPALIVE:
return handleKeepAliveFrame(frame);
case REQUEST_N:
return handleRequestN(streamId, frame);
case REQUEST_STREAM:
return doReceive(streamId, requestStream(frame));
return doReceive(streamId, requestStream(frame), RequestStream);
case FIRE_AND_FORGET:
return handleFireAndForget(streamId, fireAndForget(frame));
case REQUEST_SUBSCRIPTION:
return doReceive(streamId, requestSubscription(frame));
return doReceive(streamId, requestSubscription(frame), RequestStream);
case REQUEST_CHANNEL:
return handleChannel(streamId, frame);
case RESPONSE:
Expand Down Expand Up @@ -251,13 +272,14 @@ private synchronized void cleanup() {
requestHandler.close().subscribe(Subscribers.empty());
}

private Publisher<Void> handleReceive(int streamId, Publisher<Payload> response) {
private Publisher<Void> handleRequestResponse(int streamId, Publisher<Payload> response) {
final Runnable cleanup = () -> {
synchronized (this) {
subscriptions.remove(streamId);
}

};
long now = publishSingleFrameReceiveEvents(streamId, RequestResponse);

Px<Frame> frames =
Px
Expand All @@ -282,26 +304,29 @@ private Publisher<Void> handleReceive(int streamId, Publisher<Payload> response)
return Frame.Error.from(streamId, throwable);
});

return Px.from(connection.send(frames));
return Px.from(eventPublishingSocket.decorateSend(streamId, connection.send(frames), now, RequestResponse));

}

private Publisher<Void> doReceive(int streamId, Publisher<Payload> response) {
private Publisher<Void> doReceive(int streamId, Publisher<Payload> response, RequestType requestType) {
long now = publishSingleFrameReceiveEvents(streamId, requestType);
Px<Frame> resp = Px.from(response)
.map(payload -> Response.from(streamId, FrameType.RESPONSE, payload));
RemoteSender sender = new RemoteSender(resp, () -> subscriptions.remove(streamId), streamId, 2);
subscriptions.put(streamId, sender);
return connection.send(sender);
return eventPublishingSocket.decorateSend(streamId, connection.send(sender), now, requestType);
}

private Publisher<Void> handleChannel(int streamId, Frame firstFrame) {
long now = publishSingleFrameReceiveEvents(streamId, RequestChannel);
int initialRequestN = Request.initialRequestN(firstFrame);
Frame firstAsNext = Request.from(streamId, FrameType.NEXT, firstFrame, initialRequestN);
RemoteReceiver receiver = new RemoteReceiver(connection, streamId, () -> removeChannelProcessor(streamId),
firstAsNext, receiversSubscription, true);
channelProcessors.put(streamId, receiver);

Px<Frame> response = Px.from(requestChannel(receiver))
Px<Frame> response = Px.from(requestChannel(eventPublishingSocket.decorateReceive(streamId, receiver,
RequestChannel)))
.map(payload -> Response.from(streamId, FrameType.RESPONSE, payload));

RemoteSender sender = new RemoteSender(response, () -> removeSubscriptions(streamId), streamId,
Expand All @@ -310,7 +335,7 @@ private Publisher<Void> handleChannel(int streamId, Frame firstFrame) {
subscriptions.put(streamId, sender);
}

return connection.send(sender);
return eventPublishingSocket.decorateSend(streamId, connection.send(sender), now, RequestChannel);
}

private Publisher<Void> handleFireAndForget(int streamId, Publisher<Void> result) {
Expand Down Expand Up @@ -368,4 +393,14 @@ private synchronized void addSubscription(int streamId, Subscription subscriptio
private synchronized void removeSubscription(int streamId) {
subscriptions.remove(streamId);
}

private long publishSingleFrameReceiveEvents(int streamId, RequestType requestType) {
long now = Clock.now();
if (eventPublisher.isEventPublishingEnabled()) {
EventListener eventListener = eventPublisher.getEventListener();
eventListener.requestReceiveStart(streamId, requestType);
eventListener.requestReceiveComplete(streamId, requestType, Clock.elapsedSince(now), Clock.unit());
}
return now;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2016 Netflix, Inc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2017 ;-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy new year :)

Looks like a copy-paste otherwise my template has year parameterized.

* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.client;

import io.reactivesocket.events.AbstractEventSource;
import io.reactivesocket.events.ClientEventListener;
import io.reactivesocket.events.EventSource;

public abstract class AbstractReactiveSocketClient extends AbstractEventSource<ClientEventListener>
implements ReactiveSocketClient{

protected AbstractReactiveSocketClient() {
}

protected AbstractReactiveSocketClient(EventSource<ClientEventListener> delegate) {
super(delegate);
}
}
Loading