Skip to content

Commit 0a74415

Browse files
committed
fix(NODE-5749): RTTPinger always sends legacy hello
1 parent 08c9fb4 commit 0a74415

File tree

3 files changed

+122
-41
lines changed

3 files changed

+122
-41
lines changed

src/sdam/monitor.ts

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ const kConnection = Symbol('connection');
2626
/** @internal */
2727
const kCancellationToken = Symbol('cancellationToken');
2828
/** @internal */
29-
const kRTTPinger = Symbol('rttPinger');
30-
/** @internal */
3129
const kRoundTripTime = Symbol('roundTripTime');
3230

3331
const STATE_IDLE = 'idle';
@@ -81,7 +79,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
8179
[kCancellationToken]: CancellationToken;
8280
/** @internal */
8381
[kMonitorId]?: MonitorInterval;
84-
[kRTTPinger]?: RTTPinger;
82+
rttPinger?: RTTPinger;
8583

8684
get connection(): Connection | undefined {
8785
return this[kConnection];
@@ -198,8 +196,8 @@ function resetMonitorState(monitor: Monitor) {
198196
monitor[kMonitorId]?.stop();
199197
monitor[kMonitorId] = undefined;
200198

201-
monitor[kRTTPinger]?.close();
202-
monitor[kRTTPinger] = undefined;
199+
monitor.rttPinger?.close();
200+
monitor.rttPinger = undefined;
203201

204202
monitor[kCancellationToken].emit('cancel');
205203

@@ -262,8 +260,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
262260
}
263261
: { socketTimeoutMS: connectTimeoutMS };
264262

265-
if (isAwaitable && monitor[kRTTPinger] == null) {
266-
monitor[kRTTPinger] = new RTTPinger(
263+
if (isAwaitable && monitor.rttPinger == null) {
264+
monitor.rttPinger = new RTTPinger(
267265
monitor[kCancellationToken],
268266
Object.assign(
269267
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
@@ -282,9 +280,10 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
282280
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
283281
}
284282

285-
const rttPinger = monitor[kRTTPinger];
286283
const duration =
287-
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);
284+
isAwaitable && monitor.rttPinger
285+
? monitor.rttPinger.roundTripTime
286+
: calculateDurationInMs(start);
288287

289288
const awaited = isAwaitable && hello.topologyVersion != null;
290289
monitor.emit(
@@ -301,8 +300,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
301300
);
302301
start = now();
303302
} else {
304-
monitor[kRTTPinger]?.close();
305-
monitor[kRTTPinger] = undefined;
303+
monitor.rttPinger?.close();
304+
monitor.rttPinger = undefined;
306305

307306
callback(undefined, hello);
308307
}
@@ -399,8 +398,7 @@ export interface RTTPingerOptions extends ConnectionOptions {
399398

400399
/** @internal */
401400
export class RTTPinger {
402-
/** @internal */
403-
[kConnection]?: Connection;
401+
connection?: Connection;
404402
/** @internal */
405403
[kCancellationToken]: CancellationToken;
406404
/** @internal */
@@ -410,7 +408,7 @@ export class RTTPinger {
410408
closed: boolean;
411409

412410
constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
413-
this[kConnection] = undefined;
411+
this.connection = undefined;
414412
this[kCancellationToken] = cancellationToken;
415413
this[kRoundTripTime] = 0;
416414
this.closed = false;
@@ -427,8 +425,8 @@ export class RTTPinger {
427425
this.closed = true;
428426
clearTimeout(this[kMonitorId]);
429427

430-
this[kConnection]?.destroy({ force: true });
431-
this[kConnection] = undefined;
428+
this.connection?.destroy({ force: true });
429+
this.connection = undefined;
432430
}
433431
}
434432

@@ -447,8 +445,8 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
447445
return;
448446
}
449447

450-
if (rttPinger[kConnection] == null) {
451-
rttPinger[kConnection] = conn;
448+
if (rttPinger.connection == null) {
449+
rttPinger.connection = conn;
452450
}
453451

454452
rttPinger[kRoundTripTime] = calculateDurationInMs(start);
@@ -458,11 +456,11 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
458456
);
459457
}
460458

461-
const connection = rttPinger[kConnection];
459+
const connection = rttPinger.connection;
462460
if (connection == null) {
463461
connect(options, (err, conn) => {
464462
if (err) {
465-
rttPinger[kConnection] = undefined;
463+
rttPinger.connection = undefined;
466464
rttPinger[kRoundTripTime] = 0;
467465
return;
468466
}
@@ -473,15 +471,17 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
473471
return;
474472
}
475473

476-
connection.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
477-
if (err) {
478-
rttPinger[kConnection] = undefined;
474+
const commandName =
475+
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
476+
connection.commandAsync(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
477+
() => measureAndReschedule(),
478+
() => {
479+
rttPinger.connection?.destroy({ force: true });
480+
rttPinger.connection = undefined;
479481
rttPinger[kRoundTripTime] = 0;
480482
return;
481483
}
482-
483-
measureAndReschedule();
484-
});
484+
);
485485
}
486486

487487
/**

src/sdam/server.ts

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,6 @@ const stateTransition = makeStateMachine({
7474
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
7575
});
7676

77-
/** @internal */
78-
const kMonitor = Symbol('monitor');
79-
8077
/** @internal */
8178
export type ServerOptions = Omit<ConnectionPoolOptions, 'id' | 'generation' | 'hostAddress'> &
8279
MonitorOptions;
@@ -119,7 +116,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
119116
serverApi?: ServerApi;
120117
hello?: Document;
121118
commandAsync: (ns: MongoDBNamespace, cmd: Document, options: CommandOptions) => Promise<Document>;
122-
[kMonitor]: Monitor | null;
119+
monitor: Monitor | null;
123120

124121
/** @event */
125122
static readonly SERVER_HEARTBEAT_STARTED = SERVER_HEARTBEAT_STARTED;
@@ -175,22 +172,20 @@ export class Server extends TypedEventEmitter<ServerEvents> {
175172
});
176173

177174
if (this.loadBalanced) {
178-
this[kMonitor] = null;
175+
this.monitor = null;
179176
// monitoring is disabled in load balancing mode
180177
return;
181178
}
182179

183180
// create the monitor
184-
// TODO(NODE-4144): Remove new variable for type narrowing
185-
const monitor = new Monitor(this, this.s.options);
186-
this[kMonitor] = monitor;
181+
this.monitor = new Monitor(this, this.s.options);
187182

188183
for (const event of HEARTBEAT_EVENTS) {
189-
monitor.on(event, (e: any) => this.emit(event, e));
184+
this.monitor.on(event, (e: any) => this.emit(event, e));
190185
}
191186

192-
monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
193-
monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
187+
this.monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
188+
this.monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
194189
this.emit(
195190
Server.DESCRIPTION_RECEIVED,
196191
new ServerDescription(this.description.hostAddress, event.reply, {
@@ -246,7 +241,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
246241
// a load balancer. It never transitions out of this state and
247242
// has no monitor.
248243
if (!this.loadBalanced) {
249-
this[kMonitor]?.connect();
244+
this.monitor?.connect();
250245
} else {
251246
stateTransition(this, STATE_CONNECTED);
252247
this.emit(Server.CONNECT, this);
@@ -272,7 +267,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
272267
stateTransition(this, STATE_CLOSING);
273268

274269
if (!this.loadBalanced) {
275-
this[kMonitor]?.close();
270+
this.monitor?.close();
276271
}
277272

278273
this.pool.close(options, err => {
@@ -290,7 +285,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
290285
*/
291286
requestCheck(): void {
292287
if (!this.loadBalanced) {
293-
this[kMonitor]?.requestCheck();
288+
this.monitor?.requestCheck();
294289
}
295290
}
296291

@@ -465,7 +460,7 @@ function markServerUnknown(server: Server, error?: MongoServerError) {
465460
}
466461

467462
if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) {
468-
server[kMonitor]?.reset();
463+
server.monitor?.reset();
469464
}
470465

471466
server.emit(
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { expect } from 'chai';
2+
import * as semver from 'semver';
3+
import * as sinon from 'sinon';
4+
5+
import { type MongoClient } from '../../mongodb';
6+
import { sleep } from '../../tools/utils';
7+
8+
describe('class RTTPinger', () => {
9+
afterEach(() => sinon.restore());
10+
11+
context('when serverApi is enabled', () => {
12+
let serverApiClient: MongoClient;
13+
beforeEach(async function () {
14+
if (semver.gte('5.0.0', this.configuration.version)) {
15+
if (this.currentTest)
16+
this.currentTest.skipReason = 'Test requires serverApi, needs to be on MongoDB 5.0+';
17+
return this.skip();
18+
}
19+
20+
serverApiClient = this.configuration.newClient(
21+
{},
22+
{ serverApi: { version: '1', strict: true }, heartbeatFrequencyMS: 10 }
23+
);
24+
});
25+
26+
afterEach(async () => {
27+
await serverApiClient?.close();
28+
});
29+
30+
it('measures rtt with a hello command', async function () {
31+
await serverApiClient.connect();
32+
await sleep(1001); // rttPinger creation
33+
34+
const rttPingers = Array.from(serverApiClient.topology?.s.servers.values() ?? [], s => {
35+
if (s.monitor?.rttPinger) return s.monitor?.rttPinger;
36+
else expect.fail('expected rttPinger to be defined');
37+
});
38+
39+
await sleep(11); // rttPinger connection creation
40+
41+
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection!, 'command'));
42+
43+
await sleep(11); // allow for another ping after spies have been made
44+
45+
expect(spies).to.have.length.above(1);
46+
for (const spy of spies) {
47+
expect(spy).to.have.been.calledWith(sinon.match.any, { hello: 1 }, sinon.match.any);
48+
}
49+
});
50+
});
51+
52+
context('when rtt hello receives an error', () => {
53+
let client: MongoClient;
54+
beforeEach(async function () {
55+
client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 });
56+
});
57+
58+
afterEach(async () => {
59+
await client?.close();
60+
});
61+
62+
it('destroys the connection', async function () {
63+
await client.connect();
64+
await sleep(1001); // rttPinger creation
65+
66+
const rttPingers = Array.from(client.topology?.s.servers.values() ?? [], s => {
67+
if (s.monitor?.rttPinger) return s.monitor?.rttPinger;
68+
else expect.fail('expected rttPinger to be defined');
69+
});
70+
71+
await sleep(11); // rttPinger connection creation
72+
73+
const spies = rttPingers.map(rtt =>
74+
sinon.stub(rtt.connection!, 'command').yieldsRight(new Error('any'))
75+
);
76+
const destroySpies = rttPingers.map(rtt => sinon.spy(rtt.connection!, 'destroy'));
77+
78+
await sleep(11); // allow for another ping after spies have been made
79+
80+
expect(destroySpies).to.have.length.above(1);
81+
for (const spy of spies) {
82+
expect(spy).to.have.been.called;
83+
}
84+
});
85+
});
86+
});

0 commit comments

Comments
 (0)