Skip to content

Commit 9fda946

Browse files
[Backport] Add CancelInvocation support to MsgPack in TS client (#7404)
1 parent a827740 commit 9fda946

File tree

8 files changed

+115
-16
lines changed

8 files changed

+115
-16
lines changed

src/SignalR/clients/ts/FunctionalTests/TestHub.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Reactive.Linq;
6+
using System.Threading;
67
using System.Threading.Channels;
78
using System.Threading.Tasks;
89
using Microsoft.AspNetCore.Http.Connections;
@@ -20,6 +21,13 @@ public class CustomObject
2021

2122
public class TestHub : Hub
2223
{
24+
private readonly IHubContext<TestHub> _context;
25+
26+
public TestHub(IHubContext<TestHub> context)
27+
{
28+
_context = context;
29+
}
30+
2331
public string Echo(string message)
2432
{
2533
return message;
@@ -50,6 +58,19 @@ public ChannelReader<string> Stream()
5058
return channel.Reader;
5159
}
5260

61+
public ChannelReader<string> InfiniteStream(CancellationToken token)
62+
{
63+
var channel = Channel.CreateUnbounded<string>();
64+
var connectionId = Context.ConnectionId;
65+
66+
token.Register(async (state) =>
67+
{
68+
await ((IHubContext<TestHub>)state).Clients.Client(connectionId).SendAsync("StreamCanceled");
69+
}, _context);
70+
71+
return channel.Reader;
72+
}
73+
5374
public ChannelReader<int> EmptyStream()
5475
{
5576
var channel = Channel.CreateUnbounded<int>();

src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,39 @@ describe("hubConnection", () => {
171171
});
172172
});
173173

174+
it("can stream server method and cancel stream", (done) => {
175+
const hubConnection = getConnectionBuilder(transportType)
176+
.withHubProtocol(protocol)
177+
.build();
178+
179+
hubConnection.onclose((error) => {
180+
expect(error).toBe(undefined);
181+
done();
182+
});
183+
184+
hubConnection.on("StreamCanceled", () => {
185+
hubConnection.stop();
186+
});
187+
188+
hubConnection.start().then(() => {
189+
const subscription = hubConnection.stream<string>("InfiniteStream").subscribe({
190+
complete() {
191+
},
192+
error(err) {
193+
fail(err);
194+
hubConnection.stop();
195+
},
196+
next() {
197+
},
198+
});
199+
200+
subscription.dispose();
201+
}).catch((e) => {
202+
fail(e);
203+
done();
204+
});
205+
});
206+
174207
it("rethrows an exception from the server when invoking", (done) => {
175208
const errorMessage = "An unexpected error occurred invoking 'ThrowException' on the server. InvalidOperationException: An error occurred.";
176209
const hubConnection = getConnectionBuilder(transportType)

src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
import { Buffer } from "buffer";
55
import * as msgpack5 from "msgpack5";
66

7-
import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";
7+
import { CancelInvocationMessage, CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage,
8+
LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";
89

910
import { BinaryMessageFormat } from "./BinaryMessageFormat";
1011
import { isArrayBuffer } from "./Utils";
@@ -70,6 +71,8 @@ export class MessagePackHubProtocol implements IHubProtocol {
7071
throw new Error(`Writing messages of type '${message.type}' is not supported.`);
7172
case MessageType.Ping:
7273
return BinaryMessageFormat.write(SERIALIZED_PING_MESSAGE);
74+
case MessageType.CancelInvocation:
75+
return this.writeCancelInvocation(message as CancelInvocationMessage);
7376
default:
7477
throw new Error("Invalid message type.");
7578
}
@@ -226,6 +229,13 @@ export class MessagePackHubProtocol implements IHubProtocol {
226229
return BinaryMessageFormat.write(payload.slice());
227230
}
228231

232+
private writeCancelInvocation(cancelInvocationMessage: CancelInvocationMessage): ArrayBuffer {
233+
const msgpack = msgpack5();
234+
const payload = msgpack.encode([MessageType.CancelInvocation, cancelInvocationMessage.headers || {}, cancelInvocationMessage.invocationId]);
235+
236+
return BinaryMessageFormat.write(payload.slice());
237+
}
238+
229239
private readHeaders(properties: any): MessageHeaders {
230240
const headers: MessageHeaders = properties[1] as MessageHeaders;
231241
if (typeof headers !== "object") {

src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,19 @@ describe("MessagePackHubProtocol", () => {
198198
const buffer = new MessagePackHubProtocol().writeMessage({ type: MessageType.Ping });
199199
expect(new Uint8Array(buffer)).toEqual(payload);
200200
});
201+
202+
it("can write cancel message", () => {
203+
const payload = new Uint8Array([
204+
0x07, // length prefix
205+
0x93, // message array length = 1 (fixarray)
206+
0x05, // type = 5 = CancelInvocation (fixnum)
207+
0x80, // headers
208+
0xa3, // invocationID = string length 3
209+
0x61, // a
210+
0x62, // b
211+
0x63, // c
212+
]);
213+
const buffer = new MessagePackHubProtocol().writeMessage({ type: MessageType.CancelInvocation, invocationId: "abc" });
214+
expect(new Uint8Array(buffer)).toEqual(payload);
215+
});
201216
});

src/SignalR/clients/ts/signalr/src/HubConnection.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,18 @@ export class HubConnection {
154154
public stream<T = any>(methodName: string, ...args: any[]): IStreamResult<T> {
155155
const invocationDescriptor = this.createStreamInvocation(methodName, args);
156156

157-
const subject = new Subject<T>(() => {
157+
let promiseQueue: Promise<void>;
158+
const subject = new Subject<T>();
159+
subject.cancelCallback = () => {
158160
const cancelInvocation: CancelInvocationMessage = this.createCancelInvocation(invocationDescriptor.invocationId);
159161
const cancelMessage: any = this.protocol.writeMessage(cancelInvocation);
160162

161163
delete this.callbacks[invocationDescriptor.invocationId];
162164

163-
return this.sendMessage(cancelMessage);
164-
});
165+
return promiseQueue.then(() => {
166+
return this.sendMessage(cancelMessage);
167+
});
168+
};
165169

166170
this.callbacks[invocationDescriptor.invocationId] = (invocationEvent: CompletionMessage | StreamItemMessage | null, error?: Error) => {
167171
if (error) {
@@ -183,7 +187,7 @@ export class HubConnection {
183187

184188
const message = this.protocol.writeMessage(invocationDescriptor);
185189

186-
this.sendMessage(message)
190+
promiseQueue = this.sendMessage(message)
187191
.catch((e) => {
188192
subject.error(e);
189193
delete this.callbacks[invocationDescriptor.invocationId];

src/SignalR/clients/ts/signalr/src/Utils.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,10 @@ export function createLogger(logger?: ILogger | LogLevel) {
107107
/** @private */
108108
export class Subject<T> implements IStreamResult<T> {
109109
public observers: Array<IStreamSubscriber<T>>;
110-
public cancelCallback: () => Promise<void>;
110+
public cancelCallback?: () => Promise<void>;
111111

112-
constructor(cancelCallback: () => Promise<void>) {
112+
constructor() {
113113
this.observers = [];
114-
this.cancelCallback = cancelCallback;
115114
}
116115

117116
public next(item: T): void {
@@ -158,7 +157,7 @@ export class SubjectSubscription<T> implements ISubscription<T> {
158157
this.subject.observers.splice(index, 1);
159158
}
160159

161-
if (this.subject.observers.length === 0) {
160+
if (this.subject.observers.length === 0 && this.subject.cancelCallback) {
162161
this.subject.cancelCallback().catch((_) => { });
163162
}
164163
}

src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { IStreamSubscriber } from "../src/Stream";
1212
import { TextMessageFormat } from "../src/TextMessageFormat";
1313

1414
import { VerifyLogger } from "./Common";
15-
import { delay, PromiseSource, registerUnhandledRejectionHandler } from "./Utils";
15+
import { delayUntil, PromiseSource, registerUnhandledRejectionHandler } from "./Utils";
1616

1717
function createHubConnection(connection: IConnection, logger?: ILogger | null, protocol?: IHubProtocol | null) {
1818
return HubConnection.create(connection, logger || NullLogger.instance, protocol || new JsonHubProtocol());
@@ -65,7 +65,7 @@ describe("HubConnection", () => {
6565

6666
try {
6767
await hubConnection.start();
68-
await delay(500);
68+
await delayUntil(500);
6969

7070
const numPings = connection.sentData.filter((s) => JSON.parse(s).type === MessageType.Ping).length;
7171
expect(numPings).toBeGreaterThanOrEqual(2);
@@ -953,6 +953,8 @@ describe("HubConnection", () => {
953953
// Observer should no longer receive messages
954954
expect(observer.itemsReceived).toEqual([1]);
955955

956+
// Close message sent asynchronously so we need to wait
957+
await delayUntil(1000, () => connection.sentData.length === 3);
956958
// Verify the cancel is sent (+ handshake)
957959
expect(connection.sentData.length).toBe(3);
958960
expect(JSON.parse(connection.sentData[2])).toEqual({
@@ -1061,14 +1063,14 @@ describe("HubConnection", () => {
10611063
const connection = new TestConnection();
10621064
const hubConnection = createHubConnection(connection, logger);
10631065
try {
1064-
hubConnection.serverTimeoutInMilliseconds = 200;
1066+
hubConnection.serverTimeoutInMilliseconds = 400;
10651067

10661068
const p = new PromiseSource<Error>();
10671069
hubConnection.onclose((e) => p.resolve(e));
10681070

10691071
await hubConnection.start();
10701072

1071-
for (let i = 0; i < 6; i++) {
1073+
for (let i = 0; i < 12; i++) {
10721074
await pingAndWait(connection);
10731075
}
10741076

@@ -1108,7 +1110,7 @@ describe("HubConnection", () => {
11081110

11091111
async function pingAndWait(connection: TestConnection): Promise<void> {
11101112
await connection.receive({ type: MessageType.Ping });
1111-
await delay(50);
1113+
await delayUntil(50);
11121114
}
11131115

11141116
class TestConnection implements IConnection {

src/SignalR/clients/ts/signalr/tests/Utils.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,24 @@ export function registerUnhandledRejectionHandler(): void {
1313
});
1414
}
1515

16-
export function delay(durationInMilliseconds: number): Promise<void> {
16+
export function delayUntil(timeoutInMilliseconds: number, condition?: () => boolean): Promise<void> {
1717
const source = new PromiseSource<void>();
18-
setTimeout(() => source.resolve(), durationInMilliseconds);
18+
let timeWait: number = 0;
19+
const interval = setInterval(() => {
20+
timeWait += 10;
21+
if (condition) {
22+
if (condition() === true) {
23+
source.resolve();
24+
clearInterval(interval);
25+
} else if (timeoutInMilliseconds <= timeWait) {
26+
source.reject(new Error("Timed out waiting for condition"));
27+
clearInterval(interval);
28+
}
29+
} else if (timeoutInMilliseconds <= timeWait) {
30+
source.resolve();
31+
clearInterval(interval);
32+
}
33+
}, 10);
1934
return source.promise;
2035
}
2136

0 commit comments

Comments
 (0)