Skip to content

Client to Sever Streaming Java Edition #8222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 7, 2019
Merged

Conversation

mikaelm12
Copy link
Contributor

Fixes #7151
Adding support for client to server streaming in the Java client

@mikaelm12 mikaelm12 marked this pull request as ready for review March 6, 2019 01:10
@Eilon Eilon added the area-signalr Includes: SignalR clients and servers label Mar 6, 2019
Copy link
Member

@halter73 halter73 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good.

@@ -495,8 +497,40 @@ public void send(String method, Object... args) {
throw new RuntimeException("The 'send' method cannot be called if the connection is not active.");
}

InvocationMessage invocationMessage = new InvocationMessage(null, method, args);
List<String> streamIds = new ArrayList<>();
args = checkUploadStream(args, streamIds);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
args = checkUploadStream(args, streamIds);
args = checkUploadStream(args, streamIds);


resetKeepAlive();
return sendCompletable;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems most callers don't use this return value. Is this needed for Observable.subscribe? If so, why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't needed actually. this was for a thought I had initially, but I went a different direction and forgot to revert this.

@@ -571,6 +609,7 @@ public void send(String method, Object... args) {

sendHubMessage(streamInvocationMessage);
Observable<T> observable = subject.doOnSubscribe((subscriber) -> subscriptionCount.incrementAndGet());
launchStreams(streamIds);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the new logic that calls checkUploadStream/launchStreams be consolidated into the sendHubMessage method? Perhaps one that specialized in InvocationMessages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated with a sendInvocationMessage method for send invoke and stream that handles the logic for upload streams as well.

@@ -3,16 +3,26 @@

package com.microsoft.signalr;

import java.util.Collection;

final class StreamInvocationMessage extends HubMessage {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this extend HubMessage instead of InvocationMessage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly it had to do with how the messages get serialized. I remember making the same comment a while back when we first introduced this message type. I'll double check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated StreamInvocationMessage to extend from InvocationMessage.

Copy link
Member

@BrennanConroy BrennanConroy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you manually tried a functional test with this?

}

void launchStreams(List<String> streamIds) {
if(streamMap.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if (

Object[] checkUploadStream(Object[] args, List<String> streamIds) {
List<Object> params = new ArrayList<>(Arrays.asList(args));
for (Object arg: args) {
if(arg instanceof Observable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if (

@@ -49,6 +50,7 @@
private long tickRate = 1000;
private CompletableSubject handshakeResponseSubject;
private long handshakeResponseTimeout = 15*1000;
private Map<String, Observable> streamMap = new HashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be thread safe

@mikaelm12
Copy link
Contributor Author

I assume you manually tried a functional test with this?

Yup

@mikaelm12
Copy link
Contributor Author

E:\A\_work\2\s\.nuget\packages\microsoft.dotnet.helix.sdk\2.0.0-beta.19105.2\tools\Microsoft.DotNet.Helix.Sdk.MultiQueue.targets(71,5): error : Job '3c993538-d2d7-48ec-8555-ed6bf13b1609' has 1 failed work items. [E:\A\_work\2\s\eng\helix\helix.proj]

Build FAILED.

cc @aspnet/brt Helix failure that seems to be unrelated to my change.

@BrennanConroy
Copy link
Member

[xUnit.net 00:00:01.25]     Microsoft.AspNetCore.Identity.InMemory.Test.InMemoryUserStoreTest.ChangeEmailFailsWithEmail [FAIL]

[xUnit.net 00:00:01.25]       System.Security.Cryptography.CryptographicException : An error occurred while trying to encrypt the provided data. Refer to the inner exception for more information.

[xUnit.net 00:00:01.25]       ---- System.IO.IOException : The process cannot access the file 'C:\Users\runner\AppData\Local\ASP.NET\DataProtection-Keys\key-cfdf131e-7f8d-4f13-b608-95d28c026468.xml' because it is being used by another process.

@analogrelay
Copy link
Contributor

@mikaelm12 mikaelm12 merged commit b1f828e into master Mar 7, 2019
@BrennanConroy BrennanConroy deleted the mikaelm12/ClientStreaming branch March 7, 2019 17:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-signalr Includes: SignalR clients and servers
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants