-
Notifications
You must be signed in to change notification settings - Fork 10.4k
Client to Sever Streaming Java Edition #8222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good.
@@ -495,8 +497,40 @@ public void send(String method, Object... args) { | |||
throw new RuntimeException("The 'send' method cannot be called if the connection is not active."); | |||
} | |||
|
|||
InvocationMessage invocationMessage = new InvocationMessage(null, method, args); | |||
List<String> streamIds = new ArrayList<>(); | |||
args = checkUploadStream(args, streamIds); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
args = checkUploadStream(args, streamIds); | |
args = checkUploadStream(args, streamIds); |
|
||
resetKeepAlive(); | ||
return sendCompletable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems most callers don't use this return value. Is this needed for Observable.subscribe
? If so, why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't needed actually. this was for a thought I had initially, but I went a different direction and forgot to revert this.
@@ -571,6 +609,7 @@ public void send(String method, Object... args) { | |||
|
|||
sendHubMessage(streamInvocationMessage); | |||
Observable<T> observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet()); | |||
launchStreams(streamIds); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the new logic that calls checkUploadStream/launchStreams be consolidated into the sendHubMessage method? Perhaps one that specialized in InvocationMessages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated with a sendInvocationMessage
method for send
invoke
and stream
that handles the logic for upload streams as well.
@@ -3,16 +3,26 @@ | |||
|
|||
package com.microsoft.signalr; | |||
|
|||
import java.util.Collection; | |||
|
|||
final class StreamInvocationMessage extends HubMessage { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this extend HubMessage instead of InvocationMessage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I remember correctly it had to do with how the messages get serialized. I remember making the same comment a while back when we first introduced this message type. I'll double check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated StreamInvocationMessage
to extend from InvocationMessage
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume you manually tried a functional test with this?
} | ||
|
||
void launchStreams(List<String> streamIds) { | ||
if(streamMap.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: if (
Object[] checkUploadStream(Object[] args, List<String> streamIds) { | ||
List<Object> params = new ArrayList<>(Arrays.asList(args)); | ||
for (Object arg: args) { | ||
if(arg instanceof Observable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: if (
@@ -49,6 +50,7 @@ | |||
private long tickRate = 1000; | |||
private CompletableSubject handshakeResponseSubject; | |||
private long handshakeResponseTimeout = 15*1000; | |||
private Map<String, Observable> streamMap = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be thread safe
src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java
Show resolved
Hide resolved
Yup |
cc @aspnet/brt Helix failure that seems to be unrelated to my change. |
|
Fixes #7151
Adding support for client to server streaming in the Java client