Skip to content

Client to Sever Streaming Java Edition #8222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.microsoft.signalr;

import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -49,6 +50,7 @@ public class HubConnection {
private long tickRate = 1000;
private CompletableSubject handshakeResponseSubject;
private long handshakeResponseTimeout = 15*1000;
private Map<String, Observable> streamMap = new ConcurrentHashMap<>();
private TransportEnum transportEnum = TransportEnum.ALL;
private final Logger logger = LoggerFactory.getLogger(HubConnection.class);

Expand Down Expand Up @@ -495,8 +497,54 @@ public void send(String method, Object... args) {
throw new RuntimeException("The 'send' method cannot be called if the connection is not active.");
}

InvocationMessage invocationMessage = new InvocationMessage(null, method, args);
sendInvocationMessage(method, args);
}

private void sendInvocationMessage(String method, Object[] args) {
sendInvocationMessage(method, args, null, false);
}

private void sendInvocationMessage(String method, Object[] args, String id, Boolean isStreamInvocation) {
List<String> streamIds = new ArrayList<>();
args = checkUploadStream(args, streamIds);
InvocationMessage invocationMessage;
if (isStreamInvocation) {
invocationMessage = new StreamInvocationMessage(id, method, args, streamIds);
} else {
invocationMessage = new InvocationMessage(id, method, args, streamIds);
}

sendHubMessage(invocationMessage);
launchStreams(streamIds);
}

void launchStreams(List<String> streamIds) {
if (streamMap.isEmpty()) {
return;
}

for (String streamId: streamIds) {
Observable observable = this.streamMap.get(streamId);
observable.subscribe(
(item) -> sendHubMessage(new StreamItem(streamId, item)),
(error) -> sendHubMessage(new CompletionMessage(streamId, null, error.toString())),
() -> sendHubMessage(new CompletionMessage(streamId, null, null)));
}
}

Object[] checkUploadStream(Object[] args, List<String> streamIds) {
List<Object> params = new ArrayList<>(Arrays.asList(args));
for (Object arg: args) {
if(arg instanceof Observable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if (

params.remove(arg);
Observable stream = (Observable)arg;
String streamId = connectionState.getNextInvocationId();
streamIds.add(streamId);
this.streamMap.put(streamId, stream);
}
}

return params.toArray();
}

/**
Expand All @@ -515,7 +563,6 @@ public <T> Single<T> invoke(Class<T> returnType, String method, Object... args)
}

String id = connectionState.getNextInvocationId();
InvocationMessage invocationMessage = new InvocationMessage(id, method, args);

SingleSubject<T> subject = SingleSubject.create();
InvocationRequest irq = new InvocationRequest(returnType, id);
Expand All @@ -535,8 +582,7 @@ public <T> Single<T> invoke(Class<T> returnType, String method, Object... args)

// Make sure the actual send is after setting up the callbacks otherwise there is a race
// where the map doesn't have the callbacks yet when the response is returned
sendHubMessage(invocationMessage);

sendInvocationMessage(method, args, id, false);
return subject;
}

Expand All @@ -553,7 +599,6 @@ public <T> Single<T> invoke(Class<T> returnType, String method, Object... args)
public <T> Observable<T> stream(Class<T> returnType, String method, Object ... args) {
String invocationId = connectionState.getNextInvocationId();
AtomicInteger subscriptionCount = new AtomicInteger();
StreamInvocationMessage streamInvocationMessage = new StreamInvocationMessage(invocationId, method, args);
InvocationRequest irq = new InvocationRequest(returnType, invocationId);
connectionState.addInvocation(irq);
ReplaySubject<T> subject = ReplaySubject.create();
Expand All @@ -569,9 +614,8 @@ public <T> Observable<T> stream(Class<T> returnType, String method, Object ... a
}, error -> subject.onError(error),
() -> subject.onComplete());

sendHubMessage(streamInvocationMessage);
Observable<T> observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet());

sendInvocationMessage(method, args, invocationId, true);
return observable.doOnDispose(() -> {
if (subscriptionCount.decrementAndGet() == 0) {
CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId);
Expand All @@ -591,8 +635,8 @@ private void sendHubMessage(HubMessage message) {
} else {
logger.debug("Sending {} message.", message.getMessageType().name());
}
transport.send(serializedMessage).subscribeWith(CompletableSubject.create());

transport.send(serializedMessage).subscribeWith(CompletableSubject.create());
resetKeepAlive();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,26 @@

package com.microsoft.signalr;

import java.util.Collection;

class InvocationMessage extends HubMessage {
private final int type = HubMessageType.INVOCATION.value;
int type = HubMessageType.INVOCATION.value;
private final String invocationId;
private final String target;
private final Object[] arguments;
private Collection<String> streamIds;

public InvocationMessage(String invocationId, String target, Object[] args) {
this(invocationId, target, args, null);
}

public InvocationMessage(String invocationId, String target, Object[] args, Collection<String> streamIds) {
this.invocationId = invocationId;
this.target = target;
this.arguments = args;
if(streamIds != null && !streamIds.isEmpty()) {
this.streamIds = streamIds;
}
}

public String getInvocationId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,18 @@

package com.microsoft.signalr;

final class StreamInvocationMessage extends HubMessage {
private final int type = HubMessageType.STREAM_INVOCATION.value;
private final String invocationId;
private final String target;
private final Object[] arguments;
import java.util.Collection;

public StreamInvocationMessage(String invocationId, String target, Object[] args) {
this.invocationId = invocationId;
this.target = target;
this.arguments = args;
}
final class StreamInvocationMessage extends InvocationMessage {

public String getInvocationId() {
return invocationId;
}

public String getTarget() {
return target;
public StreamInvocationMessage(String invocationId, String target, Object[] args) {
super(invocationId, target, args);
super.type = HubMessageType.STREAM_INVOCATION.value;
}

public Object[] getArguments() {
return arguments;
public StreamInvocationMessage(String invocationId, String target, Object[] args, Collection<String> streamIds) {
super(invocationId, target, args, streamIds);
super.type = HubMessageType.STREAM_INVOCATION.value;
}

@Override
Expand Down
Loading