Skip to content

Commit 9e4faf8

Browse files
[SignalR][TS client] Stateful reconnect (#49940)
1 parent 6008981 commit 9e4faf8

18 files changed

+1420
-32
lines changed

src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import { Encoder, Decoder } from "@msgpack/msgpack";
66
import { MessagePackOptions } from "./MessagePackOptions";
77

88
import {
9+
AckMessage,
910
CancelInvocationMessage, CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage,
10-
LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat,
11+
LogLevel, MessageHeaders, MessageType, NullLogger, SequenceMessage, StreamInvocationMessage, StreamItemMessage, TransferFormat,
1112
} from "@microsoft/signalr";
1213

1314
import { BinaryMessageFormat } from "./BinaryMessageFormat";
@@ -114,6 +115,10 @@ export class MessagePackHubProtocol implements IHubProtocol {
114115
return this._writeCancelInvocation(message as CancelInvocationMessage);
115116
case MessageType.Close:
116117
return this._writeClose();
118+
case MessageType.Ack:
119+
return this._writeAck(message as AckMessage);
120+
case MessageType.Sequence:
121+
return this._writeSequence(message as SequenceMessage);
117122
default:
118123
throw new Error("Invalid message type.");
119124
}
@@ -142,6 +147,10 @@ export class MessagePackHubProtocol implements IHubProtocol {
142147
return this._createPingMessage(properties);
143148
case MessageType.Close:
144149
return this._createCloseMessage(properties);
150+
case MessageType.Ack:
151+
return this._createAckMessage(properties);
152+
case MessageType.Sequence:
153+
return this._createSequenceMessage(properties);
145154
default:
146155
// Future protocol changes can add message types, old clients can ignore them
147156
logger.log(LogLevel.Information, "Unknown message type '" + messageType + "' ignored.");
@@ -252,6 +261,30 @@ export class MessagePackHubProtocol implements IHubProtocol {
252261
return completionMessage;
253262
}
254263

264+
private _createAckMessage(properties: any[]): HubMessage {
265+
// check minimum length to allow protocol to add items to the end of objects in future releases
266+
if (properties.length < 1) {
267+
throw new Error("Invalid payload for Ack message.");
268+
}
269+
270+
return {
271+
sequenceId: properties[1],
272+
type: MessageType.Ack,
273+
} as HubMessage;
274+
}
275+
276+
private _createSequenceMessage(properties: any[]): HubMessage {
277+
// check minimum length to allow protocol to add items to the end of objects in future releases
278+
if (properties.length < 1) {
279+
throw new Error("Invalid payload for Sequence message.");
280+
}
281+
282+
return {
283+
sequenceId: properties[1],
284+
type: MessageType.Sequence,
285+
} as HubMessage;
286+
}
287+
255288
private _writeInvocation(invocationMessage: InvocationMessage): ArrayBuffer {
256289
let payload: any;
257290
if (invocationMessage.streamIds) {
@@ -317,6 +350,18 @@ export class MessagePackHubProtocol implements IHubProtocol {
317350
return BinaryMessageFormat.write(payload.slice());
318351
}
319352

353+
private _writeAck(ackMessage: AckMessage): ArrayBuffer {
354+
const payload = this._encoder.encode([MessageType.Ack, ackMessage.sequenceId]);
355+
356+
return BinaryMessageFormat.write(payload.slice());
357+
}
358+
359+
private _writeSequence(sequenceMessage: SequenceMessage): ArrayBuffer {
360+
const payload = this._encoder.encode([MessageType.Sequence, sequenceMessage.sequenceId]);
361+
362+
return BinaryMessageFormat.write(payload.slice());
363+
}
364+
320365
private _readHeaders(properties: any): MessageHeaders {
321366
const headers: MessageHeaders = properties[1] as MessageHeaders;
322367
if (typeof headers !== "object") {

src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4-
import { CloseMessage, CompletionMessage, InvocationMessage, MessageType, NullLogger, StreamItemMessage } from "@microsoft/signalr";
4+
import { AckMessage, CloseMessage, CompletionMessage, InvocationMessage, MessageType, NullLogger, SequenceMessage, StreamItemMessage } from "@microsoft/signalr";
55
import { MessagePackHubProtocol } from "../src/MessagePackHubProtocol";
66

77
describe("MessagePackHubProtocol", () => {
@@ -284,4 +284,35 @@ describe("MessagePackHubProtocol", () => {
284284
expect(parsedMessages.length).toEqual(1);
285285
expect(parsedMessages[0].type).toEqual(MessageType.Close);
286286
});
287+
288+
it("can write/read Ack message", () => {
289+
const ackMessage = {
290+
sequenceId: 13,
291+
type: MessageType.Ack,
292+
} as AckMessage;
293+
294+
const protocol = new MessagePackHubProtocol();
295+
const parsedMessages = protocol.parseMessages(protocol.writeMessage(ackMessage), NullLogger.instance);
296+
expect(parsedMessages.length).toEqual(1);
297+
expect(parsedMessages[0].type).toEqual(MessageType.Ack);
298+
expect(parsedMessages[0]).toEqual({
299+
sequenceId: 13,
300+
type: MessageType.Ack
301+
});
302+
});
303+
304+
it("can write/read Sequence message", () => {
305+
const sequenceMessage = {
306+
sequenceId: 24,
307+
type: MessageType.Sequence,
308+
} as SequenceMessage;
309+
310+
const protocol = new MessagePackHubProtocol();
311+
const parsedMessages = protocol.parseMessages(protocol.writeMessage(sequenceMessage), NullLogger.instance);
312+
expect(parsedMessages.length).toEqual(1);
313+
expect(parsedMessages[0]).toEqual({
314+
sequenceId: 24,
315+
type: MessageType.Sequence
316+
});
317+
});
287318
});

src/SignalR/clients/ts/signalr/src/HttpConnection.ts

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export interface INegotiateResponse {
3131
url?: string;
3232
accessToken?: string;
3333
error?: string;
34+
useAck?: boolean;
3435
}
3536

3637
/** @private */
@@ -331,6 +332,11 @@ export class HttpConnection implements IConnection {
331332
// So we set it equal to connectionId so all our logic can use connectionToken without being aware of the negotiate version
332333
negotiateResponse.connectionToken = negotiateResponse.connectionId;
333334
}
335+
336+
if (negotiateResponse.useAck && this._options._useStatefulReconnect !== true) {
337+
return Promise.reject(new FailedToNegotiateWithServerError("Client didn't negotiate Stateful Reconnect but the server did."));
338+
}
339+
334340
return negotiateResponse;
335341
} catch (e) {
336342
let errorMessage = "Failed to complete negotiation with the server: " + e;
@@ -368,7 +374,7 @@ export class HttpConnection implements IConnection {
368374
const transports = negotiateResponse.availableTransports || [];
369375
let negotiate: INegotiateResponse | undefined = negotiateResponse;
370376
for (const endpoint of transports) {
371-
const transportOrError = this._resolveTransportOrError(endpoint, requestedTransport, requestedTransferFormat);
377+
const transportOrError = this._resolveTransportOrError(endpoint, requestedTransport, requestedTransferFormat, negotiate?.useAck === true);
372378
if (transportOrError instanceof Error) {
373379
// Store the error and continue, we don't want to cause a re-negotiate in these cases
374380
transportExceptions.push(`${endpoint.transport} failed:`);
@@ -413,7 +419,8 @@ export class HttpConnection implements IConnection {
413419
if (!this._options.WebSocket) {
414420
throw new Error("'WebSocket' is not supported in your environment.");
415421
}
416-
return new WebSocketTransport(this._httpClient, this._accessTokenFactory, this._logger, this._options.logMessageContent!, this._options.WebSocket, this._options.headers || {});
422+
return new WebSocketTransport(this._httpClient, this._accessTokenFactory, this._logger, this._options.logMessageContent!,
423+
this._options.WebSocket, this._options.headers || {});
417424
case HttpTransportType.ServerSentEvents:
418425
if (!this._options.EventSource) {
419426
throw new Error("'EventSource' is not supported in your environment.");
@@ -428,11 +435,34 @@ export class HttpConnection implements IConnection {
428435

429436
private _startTransport(url: string, transferFormat: TransferFormat): Promise<void> {
430437
this.transport!.onreceive = this.onreceive;
431-
this.transport!.onclose = (e) => this._stopConnection(e);
438+
if (this.features.reconnect) {
439+
this.transport!.onclose = async (e) => {
440+
let callStop = false;
441+
if (this.features.resend) {
442+
try {
443+
this.features.disconnected();
444+
await this.transport!.connect(url, transferFormat);
445+
await this.features.resend();
446+
} catch {
447+
callStop = true;
448+
}
449+
} else {
450+
this._stopConnection(e);
451+
return;
452+
}
453+
454+
if (callStop) {
455+
this._stopConnection(e);
456+
}
457+
};
458+
} else {
459+
this.transport!.onclose = (e) => this._stopConnection(e);
460+
}
432461
return this.transport!.connect(url, transferFormat);
433462
}
434463

435-
private _resolveTransportOrError(endpoint: IAvailableTransport, requestedTransport: HttpTransportType | undefined, requestedTransferFormat: TransferFormat): ITransport | Error | unknown {
464+
private _resolveTransportOrError(endpoint: IAvailableTransport, requestedTransport: HttpTransportType | undefined,
465+
requestedTransferFormat: TransferFormat, useAcks: boolean): ITransport | Error | unknown {
436466
const transport = HttpTransportType[endpoint.transport];
437467
if (transport === null || transport === undefined) {
438468
this._logger.log(LogLevel.Debug, `Skipping transport '${endpoint.transport}' because it is not supported by this client.`);
@@ -448,6 +478,7 @@ export class HttpConnection implements IConnection {
448478
} else {
449479
this._logger.log(LogLevel.Debug, `Selecting transport '${HttpTransportType[transport]}'.`);
450480
try {
481+
this.features.reconnect = transport === HttpTransportType.WebSockets ? useAcks : undefined;
451482
return this._constructTransport(transport);
452483
} catch (ex) {
453484
return ex;
@@ -544,19 +575,30 @@ export class HttpConnection implements IConnection {
544575
}
545576

546577
private _resolveNegotiateUrl(url: string): string {
547-
const index = url.indexOf("?");
548-
let negotiateUrl = url.substring(0, index === -1 ? url.length : index);
549-
if (negotiateUrl[negotiateUrl.length - 1] !== "/") {
550-
negotiateUrl += "/";
578+
const negotiateUrl = new URL(url);
579+
580+
if (negotiateUrl.pathname.endsWith('/')) {
581+
negotiateUrl.pathname += "negotiate";
582+
} else {
583+
negotiateUrl.pathname += "/negotiate";
551584
}
552-
negotiateUrl += "negotiate";
553-
negotiateUrl += index === -1 ? "" : url.substring(index);
585+
const searchParams = new URLSearchParams(negotiateUrl.searchParams);
554586

555-
if (negotiateUrl.indexOf("negotiateVersion") === -1) {
556-
negotiateUrl += index === -1 ? "?" : "&";
557-
negotiateUrl += "negotiateVersion=" + this._negotiateVersion;
587+
if (!searchParams.has("negotiateVersion")) {
588+
searchParams.append("negotiateVersion", this._negotiateVersion.toString());
558589
}
559-
return negotiateUrl;
590+
591+
if (searchParams.has("useAck")) {
592+
if (searchParams.get("useAck") === "true") {
593+
this._options._useStatefulReconnect = true;
594+
}
595+
} else if (this._options._useStatefulReconnect === true) {
596+
searchParams.append("useAck", "true");
597+
}
598+
599+
negotiateUrl.search = searchParams.toString();
600+
601+
return negotiateUrl.toString();
560602
}
561603
}
562604

src/SignalR/clients/ts/signalr/src/HubConnection.ts

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import { IRetryPolicy } from "./IRetryPolicy";
1010
import { IStreamResult } from "./Stream";
1111
import { Subject } from "./Subject";
1212
import { Arg, getErrorString, Platform } from "./Utils";
13+
import { MessageBuffer } from "./MessageBuffer";
1314

1415
const DEFAULT_TIMEOUT_IN_MS: number = 30 * 1000;
1516
const DEFAULT_PING_INTERVAL_IN_MS: number = 15 * 1000;
17+
const DEFAULT_STATEFUL_RECONNECT_BUFFER_SIZE = 100_000;
1618

1719
/** Describes the current state of the {@link HubConnection} to the server. */
1820
export enum HubConnectionState {
@@ -36,11 +38,13 @@ export class HubConnection {
3638
private readonly connection: IConnection;
3739
private readonly _logger: ILogger;
3840
private readonly _reconnectPolicy?: IRetryPolicy;
41+
private readonly _statefulReconnectBufferSize: number;
3942
private _protocol: IHubProtocol;
4043
private _handshakeProtocol: HandshakeProtocol;
4144
private _callbacks: { [invocationId: string]: (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => void };
4245
private _methods: { [name: string]: (((...args: any[]) => void) | ((...args: any[]) => any))[] };
4346
private _invocationId: number;
47+
private _messageBuffer?: MessageBuffer;
4448

4549
private _closedCallbacks: ((error?: Error) => void)[];
4650
private _reconnectingCallbacks: ((error?: Error) => void)[];
@@ -98,8 +102,10 @@ export class HubConnection {
98102
protocol: IHubProtocol,
99103
reconnectPolicy?: IRetryPolicy,
100104
serverTimeoutInMilliseconds?: number,
101-
keepAliveIntervalInMilliseconds?: number): HubConnection {
102-
return new HubConnection(connection, logger, protocol, reconnectPolicy, serverTimeoutInMilliseconds, keepAliveIntervalInMilliseconds);
105+
keepAliveIntervalInMilliseconds?: number,
106+
statefulReconnectBufferSize?: number): HubConnection {
107+
return new HubConnection(connection, logger, protocol, reconnectPolicy,
108+
serverTimeoutInMilliseconds, keepAliveIntervalInMilliseconds, statefulReconnectBufferSize);
103109
}
104110

105111
private constructor(
@@ -108,14 +114,17 @@ export class HubConnection {
108114
protocol: IHubProtocol,
109115
reconnectPolicy?: IRetryPolicy,
110116
serverTimeoutInMilliseconds?: number,
111-
keepAliveIntervalInMilliseconds?: number) {
117+
keepAliveIntervalInMilliseconds?: number,
118+
statefulReconnectBufferSize?: number) {
112119
Arg.isRequired(connection, "connection");
113120
Arg.isRequired(logger, "logger");
114121
Arg.isRequired(protocol, "protocol");
115122

116123
this.serverTimeoutInMilliseconds = serverTimeoutInMilliseconds ?? DEFAULT_TIMEOUT_IN_MS;
117124
this.keepAliveIntervalInMilliseconds = keepAliveIntervalInMilliseconds ?? DEFAULT_PING_INTERVAL_IN_MS;
118125

126+
this._statefulReconnectBufferSize = statefulReconnectBufferSize ?? DEFAULT_STATEFUL_RECONNECT_BUFFER_SIZE;
127+
119128
this._logger = logger;
120129
this._protocol = protocol;
121130
this.connection = connection;
@@ -248,6 +257,17 @@ export class HubConnection {
248257
throw this._stopDuringStartError;
249258
}
250259

260+
const useAck = this.connection.features.reconnect || false;
261+
if (useAck) {
262+
this._messageBuffer = new MessageBuffer(this._protocol, this.connection, this._statefulReconnectBufferSize);
263+
this.connection.features.disconnected = this._messageBuffer._disconnected.bind(this._messageBuffer);
264+
this.connection.features.resend = () => {
265+
if (this._messageBuffer) {
266+
return this._messageBuffer._resend();
267+
}
268+
}
269+
}
270+
251271
if (!this.connection.features.inherentKeepAlive) {
252272
await this._sendMessage(this._cachedPingMessage);
253273
}
@@ -271,6 +291,7 @@ export class HubConnection {
271291
public async stop(): Promise<void> {
272292
// Capture the start promise before the connection might be restarted in an onclose callback.
273293
const startPromise = this._startPromise;
294+
this.connection.features.reconnect = false;
274295

275296
this._stopPromise = this._stopInternal();
276297
await this._stopPromise;
@@ -399,7 +420,11 @@ export class HubConnection {
399420
* @param message The js object to serialize and send.
400421
*/
401422
private _sendWithProtocol(message: any) {
402-
return this._sendMessage(this._protocol.writeMessage(message));
423+
if (this._messageBuffer) {
424+
return this._messageBuffer._send(message);
425+
} else {
426+
return this._sendMessage(this._protocol.writeMessage(message));
427+
}
403428
}
404429

405430
/** Invokes a hub method on the server using the specified name and arguments. Does not wait for a response from the receiver.
@@ -575,6 +600,11 @@ export class HubConnection {
575600
const messages = this._protocol.parseMessages(data, this._logger);
576601

577602
for (const message of messages) {
603+
if (this._messageBuffer && !this._messageBuffer._shouldProcessMessage(message)) {
604+
// Don't process the message, we are either waiting for a SequenceMessage or received a duplicate message
605+
continue;
606+
}
607+
578608
switch (message.type) {
579609
case MessageType.Invocation:
580610
// eslint-disable-next-line @typescript-eslint/no-floating-promises
@@ -616,6 +646,16 @@ export class HubConnection {
616646

617647
break;
618648
}
649+
case MessageType.Ack:
650+
if (this._messageBuffer) {
651+
this._messageBuffer._ack(message);
652+
}
653+
break;
654+
case MessageType.Sequence:
655+
if (this._messageBuffer) {
656+
this._messageBuffer._resetSequence(message);
657+
}
658+
break;
619659
default:
620660
this._logger.log(LogLevel.Warning, `Invalid message type: ${message.type}.`);
621661
break;
@@ -800,6 +840,10 @@ export class HubConnection {
800840
if (this._connectionStarted) {
801841
this._connectionState = HubConnectionState.Disconnected;
802842
this._connectionStarted = false;
843+
if (this._messageBuffer) {
844+
this._messageBuffer._dispose(error ?? new Error("Connection closed."));
845+
this._messageBuffer = undefined;
846+
}
803847

804848
if (Platform.isBrowser) {
805849
window.document.removeEventListener("freeze", this._freezeEventListener);

0 commit comments

Comments
 (0)