Skip to content

Commit b1f828e

Browse files
authored
Client to Sever Streaming Java Edition (#8222)
1 parent 5170c31 commit b1f828e

File tree

4 files changed

+346
-29
lines changed

4 files changed

+346
-29
lines changed

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.microsoft.signalr;
55

6+
import java.lang.reflect.Array;
67
import java.util.*;
78
import java.util.concurrent.*;
89
import java.util.concurrent.atomic.AtomicInteger;
@@ -49,6 +50,7 @@ public class HubConnection {
4950
private long tickRate = 1000;
5051
private CompletableSubject handshakeResponseSubject;
5152
private long handshakeResponseTimeout = 15*1000;
53+
private Map<String, Observable> streamMap = new ConcurrentHashMap<>();
5254
private TransportEnum transportEnum = TransportEnum.ALL;
5355
private final Logger logger = LoggerFactory.getLogger(HubConnection.class);
5456

@@ -495,8 +497,54 @@ public void send(String method, Object... args) {
495497
throw new RuntimeException("The 'send' method cannot be called if the connection is not active.");
496498
}
497499

498-
InvocationMessage invocationMessage = new InvocationMessage(null, method, args);
500+
sendInvocationMessage(method, args);
501+
}
502+
503+
private void sendInvocationMessage(String method, Object[] args) {
504+
sendInvocationMessage(method, args, null, false);
505+
}
506+
507+
private void sendInvocationMessage(String method, Object[] args, String id, Boolean isStreamInvocation) {
508+
List<String> streamIds = new ArrayList<>();
509+
args = checkUploadStream(args, streamIds);
510+
InvocationMessage invocationMessage;
511+
if (isStreamInvocation) {
512+
invocationMessage = new StreamInvocationMessage(id, method, args, streamIds);
513+
} else {
514+
invocationMessage = new InvocationMessage(id, method, args, streamIds);
515+
}
516+
499517
sendHubMessage(invocationMessage);
518+
launchStreams(streamIds);
519+
}
520+
521+
void launchStreams(List<String> streamIds) {
522+
if (streamMap.isEmpty()) {
523+
return;
524+
}
525+
526+
for (String streamId: streamIds) {
527+
Observable observable = this.streamMap.get(streamId);
528+
observable.subscribe(
529+
(item) -> sendHubMessage(new StreamItem(streamId, item)),
530+
(error) -> sendHubMessage(new CompletionMessage(streamId, null, error.toString())),
531+
() -> sendHubMessage(new CompletionMessage(streamId, null, null)));
532+
}
533+
}
534+
535+
Object[] checkUploadStream(Object[] args, List<String> streamIds) {
536+
List<Object> params = new ArrayList<>(Arrays.asList(args));
537+
for (Object arg: args) {
538+
if(arg instanceof Observable) {
539+
params.remove(arg);
540+
Observable stream = (Observable)arg;
541+
String streamId = connectionState.getNextInvocationId();
542+
streamIds.add(streamId);
543+
this.streamMap.put(streamId, stream);
544+
}
545+
}
546+
547+
return params.toArray();
500548
}
501549

502550
/**
@@ -515,7 +563,6 @@ public <T> Single<T> invoke(Class<T> returnType, String method, Object... args)
515563
}
516564

517565
String id = connectionState.getNextInvocationId();
518-
InvocationMessage invocationMessage = new InvocationMessage(id, method, args);
519566

520567
SingleSubject<T> subject = SingleSubject.create();
521568
InvocationRequest irq = new InvocationRequest(returnType, id);
@@ -535,8 +582,7 @@ public <T> Single<T> invoke(Class<T> returnType, String method, Object... args)
535582

536583
// Make sure the actual send is after setting up the callbacks otherwise there is a race
537584
// where the map doesn't have the callbacks yet when the response is returned
538-
sendHubMessage(invocationMessage);
539-
585+
sendInvocationMessage(method, args, id, false);
540586
return subject;
541587
}
542588

@@ -553,7 +599,6 @@ public <T> Single<T> invoke(Class<T> returnType, String method, Object... args)
553599
public <T> Observable<T> stream(Class<T> returnType, String method, Object ... args) {
554600
String invocationId = connectionState.getNextInvocationId();
555601
AtomicInteger subscriptionCount = new AtomicInteger();
556-
StreamInvocationMessage streamInvocationMessage = new StreamInvocationMessage(invocationId, method, args);
557602
InvocationRequest irq = new InvocationRequest(returnType, invocationId);
558603
connectionState.addInvocation(irq);
559604
ReplaySubject<T> subject = ReplaySubject.create();
@@ -569,9 +614,8 @@ public <T> Observable<T> stream(Class<T> returnType, String method, Object ... a
569614
}, error -> subject.onError(error),
570615
() -> subject.onComplete());
571616

572-
sendHubMessage(streamInvocationMessage);
573617
Observable<T> observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet());
574-
618+
sendInvocationMessage(method, args, invocationId, true);
575619
return observable.doOnDispose(() -> {
576620
if (subscriptionCount.decrementAndGet() == 0) {
577621
CancelInvocationMessage cancelInvocationMessage = new CancelInvocationMessage(invocationId);
@@ -591,8 +635,8 @@ private void sendHubMessage(HubMessage message) {
591635
} else {
592636
logger.debug("Sending {} message.", message.getMessageType().name());
593637
}
594-
transport.send(serializedMessage).subscribeWith(CompletableSubject.create());
595638

639+
transport.send(serializedMessage).subscribeWith(CompletableSubject.create());
596640
resetKeepAlive();
597641
}
598642

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/InvocationMessage.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,26 @@
33

44
package com.microsoft.signalr;
55

6+
import java.util.Collection;
7+
68
class InvocationMessage extends HubMessage {
7-
private final int type = HubMessageType.INVOCATION.value;
9+
int type = HubMessageType.INVOCATION.value;
810
private final String invocationId;
911
private final String target;
1012
private final Object[] arguments;
13+
private Collection<String> streamIds;
1114

1215
public InvocationMessage(String invocationId, String target, Object[] args) {
16+
this(invocationId, target, args, null);
17+
}
18+
19+
public InvocationMessage(String invocationId, String target, Object[] args, Collection<String> streamIds) {
1320
this.invocationId = invocationId;
1421
this.target = target;
1522
this.arguments = args;
23+
if(streamIds != null && !streamIds.isEmpty()) {
24+
this.streamIds = streamIds;
25+
}
1626
}
1727

1828
public String getInvocationId() {

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/StreamInvocationMessage.java

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,18 @@
33

44
package com.microsoft.signalr;
55

6-
final class StreamInvocationMessage extends HubMessage {
7-
private final int type = HubMessageType.STREAM_INVOCATION.value;
8-
private final String invocationId;
9-
private final String target;
10-
private final Object[] arguments;
6+
import java.util.Collection;
117

12-
public StreamInvocationMessage(String invocationId, String target, Object[] args) {
13-
this.invocationId = invocationId;
14-
this.target = target;
15-
this.arguments = args;
16-
}
8+
final class StreamInvocationMessage extends InvocationMessage {
179

18-
public String getInvocationId() {
19-
return invocationId;
20-
}
21-
22-
public String getTarget() {
23-
return target;
10+
public StreamInvocationMessage(String invocationId, String target, Object[] args) {
11+
super(invocationId, target, args);
12+
super.type = HubMessageType.STREAM_INVOCATION.value;
2413
}
2514

26-
public Object[] getArguments() {
27-
return arguments;
15+
public StreamInvocationMessage(String invocationId, String target, Object[] args, Collection<String> streamIds) {
16+
super(invocationId, target, args, streamIds);
17+
super.type = HubMessageType.STREAM_INVOCATION.value;
2818
}
2919

3020
@Override

0 commit comments

Comments
 (0)