Skip to content

Commit 01c5add

Browse files
Add CancelInvocation support to MsgPack in TS client (#7224)
1 parent 32da267 commit 01c5add

File tree

7 files changed

+109
-10
lines changed

7 files changed

+109
-10
lines changed

src/SignalR/clients/ts/FunctionalTests/TestHub.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Reactive.Linq;
66
using System.Text;
7+
using System.Threading;
78
using System.Threading.Channels;
89
using System.Threading.Tasks;
910
using Microsoft.AspNetCore.Http.Connections;
@@ -21,6 +22,13 @@ public class CustomObject
2122

2223
public class TestHub : Hub
2324
{
25+
private readonly IHubContext<TestHub> _context;
26+
27+
public TestHub(IHubContext<TestHub> context)
28+
{
29+
_context = context;
30+
}
31+
2432
public string Echo(string message)
2533
{
2634
return message;
@@ -51,6 +59,19 @@ public ChannelReader<string> Stream()
5159
return channel.Reader;
5260
}
5361

62+
public ChannelReader<string> InfiniteStream(CancellationToken token)
63+
{
64+
var channel = Channel.CreateUnbounded<string>();
65+
var connectionId = Context.ConnectionId;
66+
67+
token.Register(async (state) =>
68+
{
69+
await ((IHubContext<TestHub>)state).Clients.Client(connectionId).SendAsync("StreamCanceled");
70+
}, _context);
71+
72+
return channel.Reader;
73+
}
74+
5475
public async Task<string> StreamingConcat(ChannelReader<string> stream)
5576
{
5677
var sb = new StringBuilder();

src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,39 @@ describe("hubConnection", () => {
184184
});
185185
});
186186

187+
it("can stream server method and cancel stream", (done) => {
188+
const hubConnection = getConnectionBuilder(transportType)
189+
.withHubProtocol(protocol)
190+
.build();
191+
192+
hubConnection.onclose((error) => {
193+
expect(error).toBe(undefined);
194+
done();
195+
});
196+
197+
hubConnection.on("StreamCanceled", () => {
198+
hubConnection.stop();
199+
});
200+
201+
hubConnection.start().then(() => {
202+
const subscription = hubConnection.stream<string>("InfiniteStream").subscribe({
203+
complete() {
204+
},
205+
error(err) {
206+
fail(err);
207+
hubConnection.stop();
208+
},
209+
next() {
210+
},
211+
});
212+
213+
subscription.dispose();
214+
}).catch((e) => {
215+
fail(e);
216+
done();
217+
});
218+
});
219+
187220
it("rethrows an exception from the server when invoking", (done) => {
188221
const errorMessage = "An unexpected error occurred invoking 'ThrowException' on the server. InvalidOperationException: An error occurred.";
189222
const hubConnection = getConnectionBuilder(transportType)

src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
import { Buffer } from "buffer";
55
import * as msgpack5 from "msgpack5";
66

7-
import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";
7+
import { CancelInvocationMessage, CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage,
8+
LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";
89

910
import { BinaryMessageFormat } from "./BinaryMessageFormat";
1011
import { isArrayBuffer } from "./Utils";
@@ -75,6 +76,8 @@ export class MessagePackHubProtocol implements IHubProtocol {
7576
return this.writeCompletion(message as CompletionMessage);
7677
case MessageType.Ping:
7778
return BinaryMessageFormat.write(SERIALIZED_PING_MESSAGE);
79+
case MessageType.CancelInvocation:
80+
return this.writeCancelInvocation(message as CancelInvocationMessage);
7881
default:
7982
throw new Error("Invalid message type.");
8083
}
@@ -257,6 +260,13 @@ export class MessagePackHubProtocol implements IHubProtocol {
257260
return BinaryMessageFormat.write(payload.slice());
258261
}
259262

263+
private writeCancelInvocation(cancelInvocationMessage: CancelInvocationMessage): ArrayBuffer {
264+
const msgpack = msgpack5();
265+
const payload = msgpack.encode([MessageType.CancelInvocation, cancelInvocationMessage.headers || {}, cancelInvocationMessage.invocationId]);
266+
267+
return BinaryMessageFormat.write(payload.slice());
268+
}
269+
260270
private readHeaders(properties: any): MessageHeaders {
261271
const headers: MessageHeaders = properties[1] as MessageHeaders;
262272
if (typeof headers !== "object") {

src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,4 +202,19 @@ describe("MessagePackHubProtocol", () => {
202202
const buffer = new MessagePackHubProtocol().writeMessage({ type: MessageType.Ping });
203203
expect(new Uint8Array(buffer)).toEqual(payload);
204204
});
205+
206+
it("can write cancel message", () => {
207+
const payload = new Uint8Array([
208+
0x07, // length prefix
209+
0x93, // message array length = 1 (fixarray)
210+
0x05, // type = 5 = CancelInvocation (fixnum)
211+
0x80, // headers
212+
0xa3, // invocationID = string length 3
213+
0x61, // a
214+
0x62, // b
215+
0x63, // c
216+
]);
217+
const buffer = new MessagePackHubProtocol().writeMessage({ type: MessageType.CancelInvocation, invocationId: "abc" });
218+
expect(new Uint8Array(buffer)).toEqual(payload);
219+
});
205220
});

src/SignalR/clients/ts/signalr/src/HubConnection.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,16 @@ export class HubConnection {
156156
const [streams, streamIds] = this.replaceStreamingParams(args);
157157
const invocationDescriptor = this.createStreamInvocation(methodName, args, streamIds);
158158

159+
let promiseQueue: Promise<void>;
159160
const subject = new Subject<T>();
160161
subject.cancelCallback = () => {
161162
const cancelInvocation: CancelInvocationMessage = this.createCancelInvocation(invocationDescriptor.invocationId);
162163

163164
delete this.callbacks[invocationDescriptor.invocationId];
164165

165-
return this.sendWithProtocol(cancelInvocation);
166+
return promiseQueue.then(() => {
167+
return this.sendWithProtocol(cancelInvocation);
168+
});
166169
};
167170

168171
this.callbacks[invocationDescriptor.invocationId] = (invocationEvent: CompletionMessage | StreamItemMessage | null, error?: Error) => {
@@ -183,7 +186,7 @@ export class HubConnection {
183186
}
184187
};
185188

186-
const promiseQueue = this.sendWithProtocol(invocationDescriptor)
189+
promiseQueue = this.sendWithProtocol(invocationDescriptor)
187190
.catch((e) => {
188191
subject.error(e);
189192
delete this.callbacks[invocationDescriptor.invocationId];

src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { Subject } from "../src/Subject";
1313
import { TextMessageFormat } from "../src/TextMessageFormat";
1414

1515
import { VerifyLogger } from "./Common";
16-
import { delay, PromiseSource, registerUnhandledRejectionHandler } from "./Utils";
16+
import { delayUntil, PromiseSource, registerUnhandledRejectionHandler } from "./Utils";
1717

1818
function createHubConnection(connection: IConnection, logger?: ILogger | null, protocol?: IHubProtocol | null) {
1919
return HubConnection.create(connection, logger || NullLogger.instance, protocol || new JsonHubProtocol());
@@ -66,7 +66,7 @@ describe("HubConnection", () => {
6666

6767
try {
6868
await hubConnection.start();
69-
await delay(500);
69+
await delayUntil(500);
7070

7171
const numPings = connection.sentData.filter((s) => JSON.parse(s).type === MessageType.Ping).length;
7272
expect(numPings).toBeGreaterThanOrEqual(2);
@@ -1075,6 +1075,8 @@ describe("HubConnection", () => {
10751075
// Observer should no longer receive messages
10761076
expect(observer.itemsReceived).toEqual([1]);
10771077

1078+
// Close message sent asynchronously so we need to wait
1079+
await delayUntil(1000, () => connection.sentData.length === 3);
10781080
// Verify the cancel is sent (+ handshake)
10791081
expect(connection.sentData.length).toBe(3);
10801082
expect(JSON.parse(connection.sentData[2])).toEqual({
@@ -1183,14 +1185,14 @@ describe("HubConnection", () => {
11831185
const connection = new TestConnection();
11841186
const hubConnection = createHubConnection(connection, logger);
11851187
try {
1186-
hubConnection.serverTimeoutInMilliseconds = 200;
1188+
hubConnection.serverTimeoutInMilliseconds = 400;
11871189

11881190
const p = new PromiseSource<Error>();
11891191
hubConnection.onclose((e) => p.resolve(e));
11901192

11911193
await hubConnection.start();
11921194

1193-
for (let i = 0; i < 6; i++) {
1195+
for (let i = 0; i < 12; i++) {
11941196
await pingAndWait(connection);
11951197
}
11961198

@@ -1230,7 +1232,7 @@ describe("HubConnection", () => {
12301232

12311233
async function pingAndWait(connection: TestConnection): Promise<void> {
12321234
await connection.receive({ type: MessageType.Ping });
1233-
await delay(50);
1235+
await delayUntil(50);
12341236
}
12351237

12361238
class TestConnection implements IConnection {

src/SignalR/clients/ts/signalr/tests/Utils.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,24 @@ export function registerUnhandledRejectionHandler(): void {
1313
});
1414
}
1515

16-
export function delay(durationInMilliseconds: number): Promise<void> {
16+
export function delayUntil(timeoutInMilliseconds: number, condition?: () => boolean): Promise<void> {
1717
const source = new PromiseSource<void>();
18-
setTimeout(() => source.resolve(), durationInMilliseconds);
18+
let timeWait: number = 0;
19+
const interval = setInterval(() => {
20+
timeWait += 10;
21+
if (condition) {
22+
if (condition() === true) {
23+
source.resolve();
24+
clearInterval(interval);
25+
} else if (timeoutInMilliseconds <= timeWait) {
26+
source.reject(new Error("Timed out waiting for condition"));
27+
clearInterval(interval);
28+
}
29+
} else if (timeoutInMilliseconds <= timeWait) {
30+
source.resolve();
31+
clearInterval(interval);
32+
}
33+
}, 10);
1934
return source.promise;
2035
}
2136

0 commit comments

Comments
 (0)