Skip to content

Commit 69203c4

Browse files
Send CloseMessage from client to server (#48577)
1 parent c5e43cf commit 69203c4

File tree

23 files changed

+293
-40
lines changed

23 files changed

+293
-40
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,8 @@ private async Task StartAsyncCore(CancellationToken cancellationToken)
489489
{
490490
Log.ErrorStartingConnection(_logger, ex);
491491

492+
startingConnectionState.Cleanup();
493+
492494
// Can't have any invocations to cancel, we're in the lock.
493495
await CloseAsync(startingConnectionState.Connection).ConfigureAwait(false);
494496
throw;
@@ -543,6 +545,8 @@ private async Task StopAsyncCore(bool disposing)
543545

544546
ConnectionState? connectionState;
545547

548+
var connectionStateStopTask = Task.CompletedTask;
549+
546550
try
547551
{
548552
if (disposing && _disposed)
@@ -559,6 +563,19 @@ private async Task StopAsyncCore(bool disposing)
559563
if (connectionState != null)
560564
{
561565
connectionState.Stopping = true;
566+
// Try to send CloseMessage
567+
var writeTask = SendHubMessage(connectionState, CloseMessage.Empty);
568+
if (writeTask.IsFaulted || writeTask.IsCanceled || !writeTask.IsCompleted)
569+
{
570+
// Ignore exception from write, this is a best effort attempt to let the server know the client closed gracefully.
571+
// We are already closing the connection via an explicit StopAsync call from the user so don't care about any potential
572+
// errors that might happen.
573+
_ = writeTask.ContinueWith(
574+
static t => _ = t.Exception,
575+
CancellationToken.None,
576+
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted,
577+
TaskScheduler.Default);
578+
}
562579
}
563580
else
564581
{
@@ -579,17 +596,20 @@ private async Task StopAsyncCore(bool disposing)
579596
(_serviceProvider as IDisposable)?.Dispose();
580597
}
581598
}
599+
600+
if (connectionState != null)
601+
{
602+
// Start Stop inside the lock so a closure from the transport side at the same time as this doesn't cause an ODE
603+
// But don't await the call in the lock as that could deadlock with HandleConnectionClose in the ReceiveLoop
604+
connectionStateStopTask = connectionState.StopAsync();
605+
}
582606
}
583607
finally
584608
{
585609
_state.ReleaseConnectionLock();
586610
}
587611

588-
// Now stop the connection we captured
589-
if (connectionState != null)
590-
{
591-
await connectionState.StopAsync().ConfigureAwait(false);
592-
}
612+
await connectionStateStopTask.ConfigureAwait(false);
593613
}
594614

595615
/// <summary>
@@ -1459,6 +1479,7 @@ private async Task HandleConnectionClose(ConnectionState connectionState)
14591479

14601480
// Cancel any outstanding invocations within the connection lock
14611481
connectionState.CancelOutstandingInvocations(connectionState.CloseException);
1482+
connectionState.Cleanup();
14621483

14631484
if (connectionState.Stopping || _reconnectPolicy == null)
14641485
{
@@ -1965,9 +1986,9 @@ public Task StopAsync()
19651986

19661987
private async Task StopAsyncCore()
19671988
{
1968-
Log.Stopping(_logger);
1989+
_hubConnection._state.AssertInConnectionLock();
19691990

1970-
_messageBuffer?.Dispose();
1991+
Log.Stopping(_logger);
19711992

19721993
// Complete our write pipe, which should cause everything to shut down
19731994
Log.TerminatingReceiveLoop(_logger);
@@ -1983,6 +2004,11 @@ private async Task StopAsyncCore()
19832004
_stopTcs!.TrySetResult(null);
19842005
}
19852006

2007+
public void Cleanup()
2008+
{
2009+
_messageBuffer?.Dispose();
2010+
}
2011+
19862012
public async Task TimerLoop(TimerAwaitable timer)
19872013
{
19882014
// initialize the timers

src/SignalR/clients/csharp/Client/test/FunctionalTests/Startup.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,9 @@ public void ConfigureServices(IServiceCollection services)
5959
});
6060
services.AddAuthentication(NegotiateDefaults.AuthenticationScheme).AddNegotiate();
6161

62-
// Since tests run in parallel, it's possible multiple servers will startup and read files being written by another test
63-
// Use a unique directory per server to avoid this collision
64-
services.AddDataProtection()
65-
.PersistKeysToFileSystem(Directory.CreateDirectory(Path.GetRandomFileName()));
62+
// Since tests run in parallel, it's possible multiple servers will startup,
63+
// we use an ephemeral key provider to avoid filesystem contention issues
64+
services.AddSingleton<IDataProtectionProvider, EphemeralDataProtectionProvider>();
6665
}
6766

6867
public void Configure(IApplicationBuilder app)

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Protocol.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,9 @@ public async Task ClientWithInherentKeepAliveDoesNotPing()
668668
await hubConnection.DisposeAsync().DefaultTimeout();
669669
await connection.DisposeAsync().DefaultTimeout();
670670

671-
Assert.Equal(0, (await connection.ReadAllSentMessagesAsync(ignorePings: false).DefaultTimeout()).Count);
671+
var messages = await connection.ReadAllSentMessagesAsync(ignorePings: false).DefaultTimeout();
672+
var message = Assert.Single(messages);
673+
Assert.Equal("{\"type\":7}", message);
672674
}
673675
finally
674676
{

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ await Assert.ThrowsAsync<TaskCanceledException>(() =>
193193
await hubConnection.StopAsync().DefaultTimeout();
194194

195195
// Assert that InvokeAsync didn't send a message
196-
Assert.Null(await connection.ReadSentTextMessageAsync().DefaultTimeout());
196+
Assert.Equal("{\"type\":7}", await connection.ReadSentTextMessageAsync().DefaultTimeout());
197197
}
198198
}
199199

@@ -212,7 +212,7 @@ await Assert.ThrowsAsync<TaskCanceledException>(() =>
212212
await hubConnection.StopAsync().DefaultTimeout();
213213

214214
// Assert that SendAsync didn't send a message
215-
Assert.Null(await connection.ReadSentTextMessageAsync().DefaultTimeout());
215+
Assert.Equal("{\"type\":7}", await connection.ReadSentTextMessageAsync().DefaultTimeout());
216216
}
217217
}
218218

@@ -254,7 +254,7 @@ await Assert.ThrowsAsync<TaskCanceledException>(() =>
254254
await hubConnection.StopAsync().DefaultTimeout();
255255

256256
// Assert that StreamAsChannelAsync didn't send a message
257-
Assert.Null(await connection.ReadSentTextMessageAsync().DefaultTimeout());
257+
Assert.Equal("{\"type\":7}", await connection.ReadSentTextMessageAsync().DefaultTimeout());
258258
}
259259
}
260260

@@ -273,7 +273,7 @@ public async Task StreamAsyncCanceledWhenPassedCanceledToken()
273273
await hubConnection.StopAsync().DefaultTimeout();
274274

275275
// Assert that StreamAsync didn't send a message
276-
Assert.Null(await connection.ReadSentTextMessageAsync().DefaultTimeout());
276+
Assert.Equal("{\"type\":7}", await connection.ReadSentTextMessageAsync().DefaultTimeout());
277277
}
278278
}
279279

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,6 @@ private async Task StartSending(WebSocket socket, bool ignoreFirstCanceled)
491491
{
492492
if (result.IsCanceled && !ignoreFirstCanceled)
493493
{
494-
_logger.LogInformation("send canceled");
495494
break;
496495
}
497496

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/CloseMessage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package com.microsoft.signalr;
55

66
public final class CloseMessage extends HubMessage {
7+
private final int type = HubMessageType.CLOSE.value;
8+
79
private final String error;
810
private final boolean allowReconnect;
911

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/GsonHubProtocol.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.google.gson.stream.JsonToken;
2020

2121
public final class GsonHubProtocol implements HubProtocol {
22-
private final JsonParser jsonParser = new JsonParser();
2322
private final Gson gson;
2423
private static final String RECORD_SEPARATOR = "\u001e";
2524

@@ -95,7 +94,7 @@ public List<HubMessage> parseMessages(ByteBuffer payload, InvocationBinder binde
9594
case "result":
9695
case "item":
9796
if (invocationId == null || binder.getReturnType(invocationId) == null) {
98-
resultToken = jsonParser.parse(reader);
97+
resultToken = JsonParser.parseReader(reader);
9998
} else {
10099
result = gson.fromJson(reader, binder.getReturnType(invocationId));
101100
}
@@ -123,7 +122,7 @@ public List<HubMessage> parseMessages(ByteBuffer payload, InvocationBinder binde
123122
}
124123
}
125124
} else {
126-
argumentsToken = (JsonArray)jsonParser.parse(reader);
125+
argumentsToken = (JsonArray)JsonParser.parseReader(reader);
127126
}
128127
break;
129128
case "headers":

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,9 @@ private Completable stop(String errorMessage) {
431431
connectionState.stopError = errorMessage;
432432
logger.error("HubConnection disconnected with an error: {}.", errorMessage);
433433
} else {
434+
if (this.state.getHubConnectionState() == HubConnectionState.CONNECTED) {
435+
sendHubMessageWithLock(new CloseMessage());
436+
}
434437
logger.debug("Stopping HubConnection.");
435438
}
436439

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/GsonHubProtocolTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ public void parsePingMessage() {
5656
assertEquals(HubMessageType.PING, messages.get(0).getMessageType());
5757
}
5858

59+
@Test
60+
public void writeCloseMessage() {
61+
CloseMessage closeMessage = new CloseMessage();
62+
String result = TestUtils.byteBufferToString(hubProtocol.writeMessage(closeMessage));
63+
String expectedResult = "{\"type\":7,\"allowReconnect\":false}\u001E";
64+
65+
assertEquals(expectedResult, result);
66+
}
67+
5968
@Test
6069
public void parseCloseMessage() {
6170
String stringifiedMessage = "{\"type\":7}\u001E";

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4022,4 +4022,21 @@ public void WebsocketStopLoggedOnce() {
40224022
assertEquals(1, count);
40234023
}
40244024
}
4025+
4026+
@Test
4027+
public void sendsCloseMessageOnStop() throws InterruptedException {
4028+
MockTransport mockTransport = new MockTransport(true, false);
4029+
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
4030+
4031+
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
4032+
4033+
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();
4034+
4035+
ByteBuffer[] messages = mockTransport.getSentMessages();
4036+
4037+
// handshake, close
4038+
assertEquals(2, messages.length);
4039+
String message = TestUtils.byteBufferToString(messages[1]);
4040+
assertEquals("{\"type\":7,\"allowReconnect\":false}" + RECORD_SEPARATOR, message);
4041+
}
40254042
}

src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ export class MessagePackHubProtocol implements IHubProtocol {
112112
return BinaryMessageFormat.write(SERIALIZED_PING_MESSAGE);
113113
case MessageType.CancelInvocation:
114114
return this._writeCancelInvocation(message as CancelInvocationMessage);
115+
case MessageType.Close:
116+
return this._writeClose();
115117
default:
116118
throw new Error("Invalid message type.");
117119
}
@@ -309,6 +311,12 @@ export class MessagePackHubProtocol implements IHubProtocol {
309311
return BinaryMessageFormat.write(payload.slice());
310312
}
311313

314+
private _writeClose(): ArrayBuffer {
315+
const payload = this._encoder.encode([MessageType.Close, null]);
316+
317+
return BinaryMessageFormat.write(payload.slice());
318+
}
319+
312320
private _readHeaders(properties: any): MessageHeaders {
313321
const headers: MessageHeaders = properties[1] as MessageHeaders;
314322
if (typeof headers !== "object") {

src/SignalR/clients/ts/signalr-protocol-msgpack/tests/MessagePackHubProtocol.test.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4-
import { CompletionMessage, InvocationMessage, MessageType, NullLogger, StreamItemMessage } from "@microsoft/signalr";
4+
import { CloseMessage, CompletionMessage, InvocationMessage, MessageType, NullLogger, StreamItemMessage } from "@microsoft/signalr";
55
import { MessagePackHubProtocol } from "../src/MessagePackHubProtocol";
66

77
describe("MessagePackHubProtocol", () => {
@@ -273,4 +273,15 @@ describe("MessagePackHubProtocol", () => {
273273
type: 1,
274274
});
275275
});
276+
277+
it("can write/read Close message", () => {
278+
const closeMessage = {
279+
type: MessageType.Close,
280+
} as CloseMessage;
281+
282+
const protocol = new MessagePackHubProtocol();
283+
const parsedMessages = protocol.parseMessages(protocol.writeMessage(closeMessage), NullLogger.instance);
284+
expect(parsedMessages.length).toEqual(1);
285+
expect(parsedMessages[0].type).toEqual(MessageType.Close);
286+
});
276287
});

src/SignalR/clients/ts/signalr/src/HubConnection.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import { HandshakeProtocol, HandshakeRequestMessage, HandshakeResponseMessage } from "./HandshakeProtocol";
55
import { IConnection } from "./IConnection";
66
import { AbortError } from "./Errors";
7-
import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol";
7+
import { CancelInvocationMessage, CloseMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol";
88
import { ILogger, LogLevel } from "./ILogger";
99
import { IRetryPolicy } from "./IRetryPolicy";
1010
import { IStreamResult } from "./Stream";
@@ -294,6 +294,7 @@ export class HubConnection {
294294
return this._stopPromise!;
295295
}
296296

297+
const state = this._connectionState;
297298
this._connectionState = HubConnectionState.Disconnecting;
298299

299300
this._logger.log(LogLevel.Debug, "Stopping HubConnection.");
@@ -311,6 +312,11 @@ export class HubConnection {
311312
return Promise.resolve();
312313
}
313314

315+
if (state === HubConnectionState.Connected) {
316+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
317+
this._sendCloseMessage();
318+
}
319+
314320
this._cleanupTimeout();
315321
this._cleanupPingTimer();
316322
this._stopDuringStartError = error || new AbortError("The connection was stopped before the hub handshake could complete.");
@@ -321,6 +327,14 @@ export class HubConnection {
321327
return this.connection.stop(error);
322328
}
323329

330+
private async _sendCloseMessage() {
331+
try {
332+
await this._sendWithProtocol(this._createCloseMessage());
333+
} catch {
334+
// Ignore, this is a best effort attempt to let the server know the client closed gracefully.
335+
}
336+
}
337+
324338
/** Invokes a streaming hub method on the server using the specified name and arguments.
325339
*
326340
* @typeparam T The type of the items returned by the server.
@@ -1077,4 +1091,8 @@ export class HubConnection {
10771091
type: MessageType.Completion,
10781092
};
10791093
}
1094+
1095+
private _createCloseMessage(): CloseMessage {
1096+
return { type: MessageType.Close };
1097+
}
10801098
}

src/SignalR/clients/ts/signalr/src/LongPollingTransport.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,26 @@ export class LongPollingTransport implements ITransport {
175175
timeout: this._options.timeout,
176176
withCredentials: this._options.withCredentials,
177177
};
178-
await this._httpClient.delete(this._url!, deleteOptions);
179178

180-
this._logger.log(LogLevel.Trace, "(LongPolling transport) DELETE request sent.");
179+
let error;
180+
try {
181+
await this._httpClient.delete(this._url!, deleteOptions);
182+
} catch (err) {
183+
error = err;
184+
}
185+
186+
if (error) {
187+
if (error instanceof HttpError) {
188+
if (error.statusCode === 404) {
189+
this._logger.log(LogLevel.Trace, "(LongPolling transport) A 404 response was returned from sending a DELETE request.");
190+
} else {
191+
this._logger.log(LogLevel.Trace, `(LongPolling transport) Error sending a DELETE request: ${error}`);
192+
}
193+
}
194+
} else {
195+
this._logger.log(LogLevel.Trace, "(LongPolling transport) DELETE request accepted.");
196+
}
197+
181198
} finally {
182199
this._logger.log(LogLevel.Trace, "(LongPolling transport) Stop finished.");
183200

src/SignalR/clients/ts/signalr/tests/HubConnection.Reconnect.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -768,10 +768,10 @@ describe("auto reconnect", () => {
768768
const connection = new TestConnection();
769769
const hubConnection = HubConnection.create(connection, logger, new JsonHubProtocol(), new DefaultReconnectPolicy());
770770
try {
771-
let isClosed = false;
771+
const p = new PromiseSource<void>();
772772
let closeError: Error | undefined;
773773
hubConnection.onclose((e) => {
774-
isClosed = true;
774+
p.resolve();
775775
closeError = e;
776776
});
777777

@@ -782,7 +782,7 @@ describe("auto reconnect", () => {
782782
type: MessageType.Close,
783783
});
784784

785-
expect(isClosed).toEqual(true);
785+
await p;
786786
expect(closeError!.message).toEqual("Server returned an error on close: Error!");
787787
} finally {
788788
await hubConnection.stop();

0 commit comments

Comments
 (0)