Skip to content

Commit 3640182

Browse files
Refactor streaming from client to server (#4559)
1 parent 31c8423 commit 3640182

File tree

31 files changed

+769
-591
lines changed

31 files changed

+769
-591
lines changed

src/SignalR/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/DefaultHubDispatcherBenchmark.cs

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
8080

8181
public class NoErrorHubConnectionContext : HubConnectionContext
8282
{
83+
public TaskCompletionSource<object> ReceivedCompleted = new TaskCompletionSource<object>();
84+
8385
public NoErrorHubConnectionContext(ConnectionContext connectionContext, TimeSpan keepAliveInterval, ILoggerFactory loggerFactory) : base(connectionContext, keepAliveInterval, loggerFactory)
8486
{
8587
}
@@ -88,6 +90,8 @@ public override ValueTask WriteAsync(HubMessage message, CancellationToken cance
8890
{
8991
if (message is CompletionMessage completionMessage)
9092
{
93+
ReceivedCompleted.TrySetResult(null);
94+
9195
if (!string.IsNullOrEmpty(completionMessage.Error))
9296
{
9397
throw new Exception("Error invoking hub method: " + completionMessage.Error);
@@ -163,72 +167,116 @@ public ChannelReader<int> StreamChannelReaderCount(int count)
163167

164168
return channel.Reader;
165169
}
170+
171+
public async Task UploadStream(ChannelReader<string> channelReader)
172+
{
173+
while (await channelReader.WaitToReadAsync())
174+
{
175+
while (channelReader.TryRead(out var item))
176+
{
177+
}
178+
}
179+
}
166180
}
167181

168182
[Benchmark]
169183
public Task Invocation()
170184
{
171-
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "Invocation", Array.Empty<object>()));
185+
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "Invocation", Array.Empty<object>()));
172186
}
173187

174188
[Benchmark]
175189
public Task InvocationAsync()
176190
{
177-
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationAsync", Array.Empty<object>()));
191+
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationAsync", Array.Empty<object>()));
178192
}
179193

180194
[Benchmark]
181195
public Task InvocationReturnValue()
182196
{
183-
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationReturnValue", Array.Empty<object>()));
197+
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationReturnValue", Array.Empty<object>()));
184198
}
185199

186200
[Benchmark]
187201
public Task InvocationReturnAsync()
188202
{
189-
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationReturnAsync", Array.Empty<object>()));
203+
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationReturnAsync", Array.Empty<object>()));
190204
}
191205

192206
[Benchmark]
193207
public Task InvocationValueTaskAsync()
194208
{
195-
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationValueTaskAsync", Array.Empty<object>()));
209+
return _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", "InvocationValueTaskAsync", Array.Empty<object>()));
196210
}
197211

198212
[Benchmark]
199213
public Task StreamChannelReader()
200214
{
201-
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReader", Array.Empty<object>()));
215+
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReader", Array.Empty<object>()));
202216
}
203217

204218
[Benchmark]
205219
public Task StreamChannelReaderAsync()
206220
{
207-
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderAsync", Array.Empty<object>()));
221+
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderAsync", Array.Empty<object>()));
208222
}
209223

210224
[Benchmark]
211225
public Task StreamChannelReaderValueTaskAsync()
212226
{
213-
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderValueTaskAsync", Array.Empty<object>()));
227+
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderValueTaskAsync", Array.Empty<object>()));
214228
}
215229

216230
[Benchmark]
217-
public Task StreamChannelReaderCount_Zero()
231+
public async Task StreamChannelReaderCount_Zero()
218232
{
219-
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 0 }));
233+
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 0 }));
234+
235+
await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
236+
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
220237
}
221238

222239
[Benchmark]
223-
public Task StreamChannelReaderCount_One()
240+
public async Task StreamChannelReaderCount_One()
224241
{
225-
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 1 }));
242+
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 1 }));
243+
244+
await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
245+
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
246+
}
247+
248+
[Benchmark]
249+
public async Task StreamChannelReaderCount_Thousand()
250+
{
251+
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 1000 }));
252+
253+
await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
254+
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
255+
}
256+
257+
[Benchmark]
258+
public async Task UploadStream_One()
259+
{
260+
await _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.UploadStream), Array.Empty<object>(), streamIds: new string[] { "1" }));
261+
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamItemMessage("1", "test"));
262+
await _dispatcher.DispatchMessageAsync(_connectionContext, CompletionMessage.Empty("1"));
263+
264+
await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
265+
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
226266
}
227267

228268
[Benchmark]
229-
public Task StreamChannelReaderCount_Thousand()
269+
public async Task UploadStream_Thousand()
230270
{
231-
return _dispatcher.DispatchMessageAsync(_connectionContext, new StreamInvocationMessage("123", "StreamChannelReaderCount", new object[] { 1000 }));
271+
await _dispatcher.DispatchMessageAsync(_connectionContext, new InvocationMessage("123", nameof(TestHub.UploadStream), Array.Empty<object>(), streamIds: new string[] { "1" }));
272+
for (var i = 0; i < 1000; ++i)
273+
{
274+
await _dispatcher.DispatchMessageAsync(_connectionContext, new StreamItemMessage("1", "test"));
275+
}
276+
await _dispatcher.DispatchMessageAsync(_connectionContext, CompletionMessage.Empty("1"));
277+
278+
await (_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted.Task;
279+
(_connectionContext as NoErrorHubConnectionContext).ReceivedCompleted = new TaskCompletionSource<object>();
232280
}
233281
}
234282
}

src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import { Buffer } from "buffer";
55
import * as msgpack5 from "msgpack5";
66

7-
import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamCompleteMessage, StreamDataMessage, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";
7+
import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";
88

99
import { BinaryMessageFormat } from "./BinaryMessageFormat";
1010
import { isArrayBuffer } from "./Utils";
@@ -25,6 +25,10 @@ export class MessagePackHubProtocol implements IHubProtocol {
2525
/** The TransferFormat of the protocol. */
2626
public readonly transferFormat: TransferFormat = TransferFormat.Binary;
2727

28+
private readonly errorResult = 1;
29+
private readonly voidResult = 2;
30+
private readonly nonVoidResult = 3;
31+
2832
/** Creates an array of HubMessage objects from the specified serialized representation.
2933
*
3034
* @param {ArrayBuffer | Buffer} input An ArrayBuffer containing the serialized representation.
@@ -65,15 +69,12 @@ export class MessagePackHubProtocol implements IHubProtocol {
6569
return this.writeInvocation(message as InvocationMessage);
6670
case MessageType.StreamInvocation:
6771
return this.writeStreamInvocation(message as StreamInvocationMessage);
68-
case MessageType.StreamData:
69-
return this.writeStreamData(message as StreamDataMessage);
7072
case MessageType.StreamItem:
73+
return this.writeStreamItem(message as StreamItemMessage);
7174
case MessageType.Completion:
72-
throw new Error(`Writing messages of type '${message.type}' is not supported.`);
75+
return this.writeCompletion(message as CompletionMessage);
7376
case MessageType.Ping:
7477
return BinaryMessageFormat.write(SERIALIZED_PING_MESSAGE);
75-
case MessageType.StreamComplete:
76-
return this.writeStreamComplete(message as StreamCompleteMessage);
7778
default:
7879
throw new Error("Invalid message type.");
7980
}
@@ -147,13 +148,15 @@ export class MessagePackHubProtocol implements IHubProtocol {
147148
arguments: properties[4],
148149
headers,
149150
invocationId,
151+
streamIds: [],
150152
target: properties[3] as string,
151153
type: MessageType.Invocation,
152154
};
153155
} else {
154156
return {
155157
arguments: properties[4],
156158
headers,
159+
streamIds: [],
157160
target: properties[3],
158161
type: MessageType.Invocation,
159162
};
@@ -181,24 +184,20 @@ export class MessagePackHubProtocol implements IHubProtocol {
181184
throw new Error("Invalid payload for Completion message.");
182185
}
183186

184-
const errorResult = 1;
185-
const voidResult = 2;
186-
const nonVoidResult = 3;
187-
188187
const resultKind = properties[3];
189188

190-
if (resultKind !== voidResult && properties.length < 5) {
189+
if (resultKind !== this.voidResult && properties.length < 5) {
191190
throw new Error("Invalid payload for Completion message.");
192191
}
193192

194193
let error: string | undefined;
195194
let result: any;
196195

197196
switch (resultKind) {
198-
case errorResult:
197+
case this.errorResult:
199198
error = properties[4];
200199
break;
201-
case nonVoidResult:
200+
case this.nonVoidResult:
202201
result = properties[4];
203202
break;
204203
}
@@ -217,31 +216,43 @@ export class MessagePackHubProtocol implements IHubProtocol {
217216
private writeInvocation(invocationMessage: InvocationMessage): ArrayBuffer {
218217
const msgpack = msgpack5();
219218
const payload = msgpack.encode([MessageType.Invocation, invocationMessage.headers || {}, invocationMessage.invocationId || null,
220-
invocationMessage.target, invocationMessage.arguments]);
219+
invocationMessage.target, invocationMessage.arguments, invocationMessage.streamIds]);
221220

222221
return BinaryMessageFormat.write(payload.slice());
223222
}
224223

225224
private writeStreamInvocation(streamInvocationMessage: StreamInvocationMessage): ArrayBuffer {
226225
const msgpack = msgpack5();
227226
const payload = msgpack.encode([MessageType.StreamInvocation, streamInvocationMessage.headers || {}, streamInvocationMessage.invocationId,
228-
streamInvocationMessage.target, streamInvocationMessage.arguments]);
227+
streamInvocationMessage.target, streamInvocationMessage.arguments, streamInvocationMessage.streamIds]);
229228

230229
return BinaryMessageFormat.write(payload.slice());
231230
}
232231

233-
private writeStreamData(streamDataMessage: StreamDataMessage): ArrayBuffer {
232+
private writeStreamItem(streamItemMessage: StreamItemMessage): ArrayBuffer {
234233
const msgpack = msgpack5();
235-
const payload = msgpack.encode([MessageType.StreamData, streamDataMessage.streamId,
236-
streamDataMessage.item]);
234+
const payload = msgpack.encode([MessageType.StreamItem, streamItemMessage.headers || {}, streamItemMessage.invocationId,
235+
streamItemMessage.item]);
237236

238237
return BinaryMessageFormat.write(payload.slice());
239238
}
240239

241-
private writeStreamComplete(streamCompleteMessage: StreamCompleteMessage): ArrayBuffer {
240+
private writeCompletion(completionMessage: CompletionMessage): ArrayBuffer {
242241
const msgpack = msgpack5();
243-
const payload = msgpack.encode([MessageType.StreamComplete, streamCompleteMessage.streamId,
244-
streamCompleteMessage.error || null]);
242+
const resultKind = completionMessage.error ? this.errorResult : completionMessage.result ? this.nonVoidResult : this.voidResult;
243+
244+
let payload: any;
245+
switch (resultKind) {
246+
case this.errorResult:
247+
payload = msgpack.encode([MessageType.Completion, completionMessage.headers || {}, completionMessage.invocationId, resultKind, completionMessage.error]);
248+
break;
249+
case this.voidResult:
250+
payload = msgpack.encode([MessageType.Completion, completionMessage.headers || {}, completionMessage.invocationId, resultKind]);
251+
break;
252+
case this.nonVoidResult:
253+
payload = msgpack.encode([MessageType.Completion, completionMessage.headers || {}, completionMessage.invocationId, resultKind, completionMessage.result]);
254+
break;
255+
}
245256

246257
return BinaryMessageFormat.write(payload.slice());
247258
}

src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ describe("MessagePackHubProtocol", () => {
99
const invocation = {
1010
arguments: [42, true, "test", ["x1", "y2"], null],
1111
headers: {},
12+
streamIds: [],
1213
target: "myMethod",
1314
type: MessageType.Invocation,
1415
} as InvocationMessage;
@@ -22,6 +23,7 @@ describe("MessagePackHubProtocol", () => {
2223
const invocation = {
2324
arguments: [new Date(Date.UTC(2018, 1, 1, 12, 34, 56))],
2425
headers: {},
26+
streamIds: [],
2527
target: "mymethod",
2628
type: MessageType.Invocation,
2729
} as InvocationMessage;
@@ -37,6 +39,7 @@ describe("MessagePackHubProtocol", () => {
3739
headers: {
3840
foo: "bar",
3941
},
42+
streamIds: [],
4043
target: "myMethod",
4144
type: MessageType.Invocation,
4245
} as InvocationMessage;
@@ -51,6 +54,7 @@ describe("MessagePackHubProtocol", () => {
5154
arguments: [42, true, "test", ["x1", "y2"], null],
5255
headers: {},
5356
invocationId: "123",
57+
streamIds: [],
5458
target: "myMethod",
5559
type: MessageType.Invocation,
5660
} as InvocationMessage;

0 commit comments

Comments
 (0)