Skip to content

Feature/perf tunning 3 #726

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 57 additions & 52 deletions rsocket-core/src/main/java/io/rsocket/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,28 @@
import io.netty.util.collection.IntObjectMap;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.frame.*;
import io.rsocket.frame.CancelFrameFlyweight;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.MetadataPushFrameFlyweight;
import io.rsocket.frame.PayloadFrameFlyweight;
import io.rsocket.frame.RequestChannelFrameFlyweight;
import io.rsocket.frame.RequestFireAndForgetFrameFlyweight;
import io.rsocket.frame.RequestNFrameFlyweight;
import io.rsocket.frame.RequestResponseFrameFlyweight;
import io.rsocket.frame.RequestStreamFrameFlyweight;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.RateLimitableRequestPublisher;
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.internal.UnicastMonoEmpty;
import io.rsocket.internal.UnicastMonoProcessor;
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.util.OnceConsumer;
import io.rsocket.util.MonoLifecycleHandler;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
Expand All @@ -46,8 +57,11 @@
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.*;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.concurrent.Queues;

/**
Expand Down Expand Up @@ -170,23 +184,19 @@ private Mono<Void> handleFireAndForget(Payload payload) {

final int streamId = streamIdSupplier.nextStreamId(receivers);

return emptyUnicastMono()
.doOnSubscribe(
new OnceConsumer<Subscription>() {
@Override
public void acceptOnce(@Nonnull Subscription subscription) {
ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encode(
allocator,
streamId,
false,
payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
payload.sliceData().retain());
payload.release();

sendProcessor.onNext(requestFrame);
}
});
return UnicastMonoEmpty.newInstance(
() -> {
ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encode(
allocator,
streamId,
false,
payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
payload.sliceData().retain());
payload.release();

sendProcessor.onNext(requestFrame);
});
}

private Mono<Payload> handleRequestResponse(final Payload payload) {
Expand All @@ -199,14 +209,11 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
int streamId = streamIdSupplier.nextStreamId(receivers);
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;

UnicastMonoProcessor<Payload> receiver = UnicastMonoProcessor.create();
receivers.put(streamId, receiver);

return receiver
.doOnSubscribe(
new OnceConsumer<Subscription>() {
UnicastMonoProcessor<Payload> receiver =
UnicastMonoProcessor.create(
new MonoLifecycleHandler<Payload>() {
@Override
public void acceptOnce(@Nonnull Subscription subscription) {
public void doOnSubscribe() {
final ByteBuf requestFrame =
RequestResponseFrameFlyweight.encode(
allocator,
Expand All @@ -218,15 +225,23 @@ public void acceptOnce(@Nonnull Subscription subscription) {

sendProcessor.onNext(requestFrame);
}
})
.doOnError(t -> sendProcessor.onNext(ErrorFrameFlyweight.encode(allocator, streamId, t)))
.doFinally(
s -> {
if (s == SignalType.CANCEL) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));

@Override
public void doOnTerminal(
@Nonnull SignalType signalType,
@Nullable Payload element,
@Nullable Throwable e) {
if (signalType == SignalType.ON_ERROR) {
sendProcessor.onNext(ErrorFrameFlyweight.encode(allocator, streamId, e));
} else if (signalType == SignalType.CANCEL) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
removeStreamReceiver(streamId);
}
removeStreamReceiver(streamId);
});
receivers.put(streamId, receiver);

return receiver;
}

private Flux<Payload> handleRequestStream(final Payload payload) {
Expand Down Expand Up @@ -390,24 +405,14 @@ private Mono<Void> handleMetadataPush(Payload payload) {
return Mono.error(err);
}

return emptyUnicastMono()
.doOnSubscribe(
new OnceConsumer<Subscription>() {
@Override
public void acceptOnce(@Nonnull Subscription subscription) {
ByteBuf metadataPushFrame =
MetadataPushFrameFlyweight.encode(allocator, payload.sliceMetadata().retain());
payload.release();

sendProcessor.onNext(metadataPushFrame);
}
});
}
return UnicastMonoEmpty.newInstance(
() -> {
ByteBuf metadataPushFrame =
MetadataPushFrameFlyweight.encode(allocator, payload.sliceMetadata().retain());
payload.release();

private static UnicastMonoProcessor<Void> emptyUnicastMono() {
UnicastMonoProcessor<Void> result = UnicastMonoProcessor.create();
result.onComplete();
return result;
sendProcessor.onNext(metadataPushFrame);
});
}

private Throwable checkAvailable() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.rsocket.internal;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.annotation.Nullable;

/**
* Represents an empty publisher which only calls onSubscribe and onComplete.
*
* <p>This Publisher is effectively stateless and only a single instance exists. Use the {@link
* #instance()} method to obtain a properly type-parametrized view of it.
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
public final class UnicastMonoEmpty extends Mono<Object> implements Scannable {

final Runnable onSubscribe;

volatile int once;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<UnicastMonoEmpty> ONCE =
AtomicIntegerFieldUpdater.newUpdater(UnicastMonoEmpty.class, "once");

UnicastMonoEmpty(Runnable onSubscribe) {
this.onSubscribe = onSubscribe;
}

@Override
public void subscribe(CoreSubscriber<? super Object> actual) {
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
onSubscribe.run();
Operators.complete(actual);
} else {
Operators.error(
actual, new IllegalStateException("UnicastMonoEmpty allows only a single Subscriber"));
}
}

/**
* Returns a properly parametrized instance of this empty Publisher.
*
* @param <T> the output type
* @return a properly parametrized instance of this empty Publisher
*/
@SuppressWarnings("unchecked")
public static <T> Mono<T> newInstance(Runnable onSubscribe) {
return (Mono<T>) new UnicastMonoEmpty(onSubscribe);
}

@Override
@Nullable
public Object block(Duration m) {
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
onSubscribe.run();
return null;
} else {
throw new IllegalStateException("UnicastMonoEmpty allows only a single Subscriber");
}
}

@Override
@Nullable
public Object block() {
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
onSubscribe.run();
return null;
} else {
throw new IllegalStateException("UnicastMonoEmpty allows only a single Subscriber");
}
}

@Override
public Object scanUnsafe(Attr key) {
return null; // no particular key to be represented, still useful in hooks
}

@Override
public String stepName() {
return "source(UnicastMonoEmpty)";
}
}
Loading