Skip to content

Refactor streaming from client to server #4559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)

public class NoErrorHubConnectionContext : HubConnectionContext
{
public TaskCompletionSource<object> ReceivedCompleted = new TaskCompletionSource<object>();

public NoErrorHubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory) : base(connectionContext, keepAliveInterval, loggerFactory)
{
}
Expand All @@ -88,6 +90,8 @@ public override ValueTask WriteAsync(HubMessage message, CancellationToken cance
{
if (message is CompletionMessage completionMessage)
{
ReceivedCompleted.TrySetResult(null);

if (!string.IsNullOrEmpty(completionMessage.Error))
{
throw new Exception("Error invoking hub method: " + completionMessage.Error);
Expand Down Expand Up @@ -163,72 +167,116 @@ public ChannelReader<int> StreamChannelReaderCount(int count)

return channel.Reader;
}

public async Task UploadStream(ChannelReader<string> channelReader)
{
while (await channelReader.WaitToReadAsync())
{
while (channelReader.TryRead(out var item))
{
}
}
}
}

[Benchmark]
public Task Invocation()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "Invocation", Array.Empty<object>()));
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "Invocation", Array.Empty<object>()));
}

[Benchmark]
public Task InvocationAsync()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationAsync", Array.Empty<object>()));
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationAsync", Array.Empty<object>()));
}

[Benchmark]
public Task InvocationReturnValue()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationReturnValue", Array.Empty<object>()));
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationReturnValue", Array.Empty<object>()));
}

[Benchmark]
public Task InvocationReturnAsync()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationReturnAsync", Array.Empty<object>()));
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationReturnAsync", Array.Empty<object>()));
}

[Benchmark]
public Task InvocationValueTaskAsync()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationValueTaskAsync", Array.Empty<object>()));
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationValueTaskAsync", Array.Empty<object>()));
}

[Benchmark]
public Task StreamChannelReader()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReader", Array.Empty<object>()));
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReader", Array.Empty<object>()));
}

[Benchmark]
public Task StreamChannelReaderAsync()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderAsync", Array.Empty<object>()));
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderAsync", Array.Empty<object>()));
}

[Benchmark]
public Task StreamChannelReaderValueTaskAsync()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderValueTaskAsync", Array.Empty<object>()));
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderValueTaskAsync", Array.Empty<object>()));
}

[Benchmark]
public Task StreamChannelReaderCount_Zero()
public async Task StreamChannelReaderCount_Zero()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 0 }));
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 0 }));

await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
}

[Benchmark]
public Task StreamChannelReaderCount_One()
public async Task StreamChannelReaderCount_One()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 1 }));
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 1 }));

await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
}

[Benchmark]
public async Task StreamChannelReaderCount_Thousand()
{
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 1000 }));

await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
}

[Benchmark]
public async Task UploadStream_One()
{
await _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.UploadStream), Array.Empty<object>(), streamIds: new string[] { "1" }));
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamItemMessage("1", "test"));
await _dispatcher.DispatchMessageAsync(_connectionContext, CompletionMessage.Empty("1"));

await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
}

[Benchmark]
public Task StreamChannelReaderCount_Thousand()
public async Task UploadStream_Thousand()
{
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 1000 }));
await _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.UploadStream), Array.Empty<object>(), streamIds: new string[] { "1" }));
for (var i = 0; i < 1000; ++i)
{
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamItemMessage("1", "test"));
}
await _dispatcher.DispatchMessageAsync(_connectionContext, CompletionMessage.Empty("1"));

await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { Buffer } from "buffer";
import * as msgpack5 from "msgpack5";

import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamCompleteMessage, StreamDataMessage, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";
import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";

import { BinaryMessageFormat } from "./BinaryMessageFormat";
import { isArrayBuffer } from "./Utils";
Expand All @@ -25,6 +25,10 @@ export class MessagePackHubProtocol implements IHubProtocol {
/** The TransferFormat of the protocol. */
public readonly transferFormat: TransferFormat = TransferFormat.Binary;

private readonly errorResult = 1;
private readonly voidResult = 2;
private readonly nonVoidResult = 3;

/** Creates an array of HubMessage objects from the specified serialized representation.
*
* @param {ArrayBuffer | Buffer} input An ArrayBuffer containing the serialized representation.
Expand Down Expand Up @@ -65,15 +69,12 @@ export class MessagePackHubProtocol implements IHubProtocol {
return this.writeInvocation(message as InvocationMessage);
case MessageType.StreamInvocation:
return this.writeStreamInvocation(message as StreamInvocationMessage);
case MessageType.StreamData:
return this.writeStreamData(message as StreamDataMessage);
case MessageType.StreamItem:
return this.writeStreamItem(message as StreamItemMessage);
case MessageType.Completion:
throw new Error(`Writing messages of type '${message.type}' is not supported.`);
return this.writeCompletion(message as CompletionMessage);
case MessageType.Ping:
return BinaryMessageFormat.write(SERIALIZED_PING_MESSAGE);
case MessageType.StreamComplete:
return this.writeStreamComplete(message as StreamCompleteMessage);
default:
throw new Error("Invalid message type.");
}
Expand Down Expand Up @@ -147,13 +148,15 @@ export class MessagePackHubProtocol implements IHubProtocol {
arguments: properties[4],
headers,
invocationId,
streamIds: [],
target: properties[3] as string,
type: MessageType.Invocation,
};
} else {
return {
arguments: properties[4],
headers,
streamIds: [],
target: properties[3],
type: MessageType.Invocation,
};
Expand Down Expand Up @@ -181,24 +184,20 @@ export class MessagePackHubProtocol implements IHubProtocol {
throw new Error("Invalid payload for Completion message.");
}

const errorResult = 1;
const voidResult = 2;
const nonVoidResult = 3;

const resultKind = properties[3];

if (resultKind !== voidResult && properties.length < 5) {
if (resultKind !== this.voidResult && properties.length < 5) {
throw new Error("Invalid payload for Completion message.");
}

let error: string | undefined;
let result: any;

switch (resultKind) {
case errorResult:
case this.errorResult:
error = properties[4];
break;
case nonVoidResult:
case this.nonVoidResult:
result = properties[4];
break;
}
Expand All @@ -217,31 +216,43 @@ export class MessagePackHubProtocol implements IHubProtocol {
private writeInvocation(invocationMessage: InvocationMessage): ArrayBuffer {
const msgpack = msgpack5();
const payload = msgpack.encode([MessageType.Invocation, invocationMessage.headers || {}, invocationMessage.invocationId || null,
invocationMessage.target, invocationMessage.arguments]);
invocationMessage.target, invocationMessage.arguments, invocationMessage.streamIds]);

return BinaryMessageFormat.write(payload.slice());
}

private writeStreamInvocation(streamInvocationMessage: StreamInvocationMessage): ArrayBuffer {
const msgpack = msgpack5();
const payload = msgpack.encode([MessageType.StreamInvocation, streamInvocationMessage.headers || {}, streamInvocationMessage.invocationId,
streamInvocationMessage.target, streamInvocationMessage.arguments]);
streamInvocationMessage.target, streamInvocationMessage.arguments, streamInvocationMessage.streamIds]);

return BinaryMessageFormat.write(payload.slice());
}

private writeStreamData(streamDataMessage: StreamDataMessage): ArrayBuffer {
private writeStreamItem(streamItemMessage: StreamItemMessage): ArrayBuffer {
const msgpack = msgpack5();
const payload = msgpack.encode([MessageType.StreamData, streamDataMessage.streamId,
streamDataMessage.item]);
const payload = msgpack.encode([MessageType.StreamItem, streamItemMessage.headers || {}, streamItemMessage.invocationId,
streamItemMessage.item]);

return BinaryMessageFormat.write(payload.slice());
}

private writeStreamComplete(streamCompleteMessage: StreamCompleteMessage): ArrayBuffer {
private writeCompletion(completionMessage: CompletionMessage): ArrayBuffer {
const msgpack = msgpack5();
const payload = msgpack.encode([MessageType.StreamComplete, streamCompleteMessage.streamId,
streamCompleteMessage.error || null]);
const resultKind = completionMessage.error ? this.errorResult : completionMessage.result ? this.nonVoidResult : this.voidResult;

let payload: any;
switch (resultKind) {
case this.errorResult:
payload = msgpack.encode([MessageType.Completion, completionMessage.headers || {}, completionMessage.invocationId, resultKind, completionMessage.error]);
break;
case this.voidResult:
payload = msgpack.encode([MessageType.Completion, completionMessage.headers || {}, completionMessage.invocationId, resultKind]);
break;
case this.nonVoidResult:
payload = msgpack.encode([MessageType.Completion, completionMessage.headers || {}, completionMessage.invocationId, resultKind, completionMessage.result]);
break;
}

return BinaryMessageFormat.write(payload.slice());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ describe("MessagePackHubProtocol", () => {
const invocation = {
arguments: [42, true, "test", ["x1", "y2"], null],
headers: {},
streamIds: [],
target: "myMethod",
type: MessageType.Invocation,
} as InvocationMessage;
Expand All @@ -22,6 +23,7 @@ describe("MessagePackHubProtocol", () => {
const invocation = {
arguments: [new Date(Date.UTC(2018, 1, 1, 12, 34, 56))],
headers: {},
streamIds: [],
target: "mymethod",
type: MessageType.Invocation,
} as InvocationMessage;
Expand All @@ -37,6 +39,7 @@ describe("MessagePackHubProtocol", () => {
headers: {
foo: "bar",
},
streamIds: [],
target: "myMethod",
type: MessageType.Invocation,
} as InvocationMessage;
Expand All @@ -51,6 +54,7 @@ describe("MessagePackHubProtocol", () => {
arguments: [42, true, "test", ["x1", "y2"], null],
headers: {},
invocationId: "123",
streamIds: [],
target: "myMethod",
type: MessageType.Invocation,
} as InvocationMessage;
Expand Down
Loading