Skip to content

Commit 0ce246f

Browse files
committed
update monitor
1 parent d46fc15 commit 0ce246f

File tree

1 file changed

+209
-30
lines changed

1 file changed

+209
-30
lines changed

src/sdam/monitor.ts

Lines changed: 209 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
105105
rttPinger?: RTTPinger;
106106
/** @internal */
107107
override component = MongoLoggableComponent.TOPOLOGY;
108-
private rttSamplesMS: MovingWindow;
108+
/** @internal */
109+
rttSamplesMS: MovingWindow;
109110

110111
constructor(server: Server, options: MonitorOptions) {
111112
super();
@@ -244,6 +245,8 @@ function resetMonitorState(monitor: Monitor) {
244245

245246
monitor.connection?.destroy();
246247
monitor.connection = null;
248+
249+
monitor.clearRttSamples();
247250
}
248251

249252
function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
@@ -277,7 +280,6 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
277280
function onHeartbeatFailed(err: Error) {
278281
monitor.connection?.destroy();
279282
monitor.connection = null;
280-
281283
monitor.emitAndLogHeartbeat(
282284
Server.SERVER_HEARTBEAT_FAILED,
283285
monitor[kServer].topology.s.id,
@@ -306,7 +308,11 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
306308
// NOTE: here we use the latestRTT as this measurment corresponds with the value
307309
// obtained for this successful heartbeat
308310
const duration =
309-
isAwaitable && monitor.rttPinger ? monitor.rttPinger.latestRTT : calculateDurationInMs(start);
311+
isAwaitable && monitor.rttPinger
312+
? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start)
313+
: calculateDurationInMs(start);
314+
315+
monitor.addRttSample(duration);
310316

311317
monitor.emitAndLogHeartbeat(
312318
Server.SERVER_HEARTBEAT_SUCCEEDED,
@@ -406,6 +412,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
406412
connection.destroy();
407413
return;
408414
}
415+
const duration = calculateDurationInMs(start);
416+
monitor.addRttSample(duration);
409417

410418
monitor.connection = connection;
411419
monitor.emitAndLogHeartbeat(
@@ -414,7 +422,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
414422
connection.hello?.connectionId,
415423
new ServerHeartbeatSucceededEvent(
416424
monitor.address,
417-
calculateDurationInMs(start),
425+
duration,
418426
connection.hello,
419427
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
420428
)
@@ -430,6 +438,173 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
430438
);
431439
}
432440

441+
async function _checkServer(monitor: Monitor): Promise<Document | null> {
442+
let start: number;
443+
let awaited: boolean;
444+
const topologyVersion = monitor[kServer].description.topologyVersion;
445+
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
446+
monitor.emitAndLogHeartbeat(
447+
Server.SERVER_HEARTBEAT_STARTED,
448+
monitor[kServer].topology.s.id,
449+
undefined,
450+
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
451+
);
452+
453+
function onHeartbeatFailed(err: Error) {
454+
monitor.connection?.destroy();
455+
monitor.connection = null;
456+
monitor.emitAndLogHeartbeat(
457+
Server.SERVER_HEARTBEAT_FAILED,
458+
monitor[kServer].topology.s.id,
459+
undefined,
460+
new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err, awaited)
461+
);
462+
463+
const error = !(err instanceof MongoError)
464+
? new MongoError(MongoError.buildErrorMessage(err), { cause: err })
465+
: err;
466+
error.addErrorLabel(MongoErrorLabel.ResetPool);
467+
if (error instanceof MongoNetworkTimeoutError) {
468+
error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections);
469+
}
470+
471+
monitor.emit('resetServer', error);
472+
}
473+
474+
function onHeartbeatSucceeded(hello: Document) {
475+
if (!('isWritablePrimary' in hello)) {
476+
// Provide hello-style response document.
477+
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
478+
}
479+
480+
// NOTE: here we use the latestRTT as this measurment corresponds with the value
481+
// obtained for this successful heartbeat
482+
const duration =
483+
isAwaitable && monitor.rttPinger
484+
? monitor.rttPinger.latestRTT ?? calculateDurationInMs(start)
485+
: calculateDurationInMs(start);
486+
487+
monitor.addRttSample(duration);
488+
489+
monitor.emitAndLogHeartbeat(
490+
Server.SERVER_HEARTBEAT_SUCCEEDED,
491+
monitor[kServer].topology.s.id,
492+
hello.connectionId,
493+
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
494+
);
495+
496+
if (isAwaitable) {
497+
// If we are using the streaming protocol then we immediately issue another 'started'
498+
// event, otherwise the "check" is complete and return to the main monitor loop
499+
monitor.emitAndLogHeartbeat(
500+
Server.SERVER_HEARTBEAT_STARTED,
501+
monitor[kServer].topology.s.id,
502+
undefined,
503+
new ServerHeartbeatStartedEvent(monitor.address, true)
504+
);
505+
// We have not actually sent an outgoing handshake, but when we get the next response we
506+
// want the duration to reflect the time since we last heard from the server
507+
start = now();
508+
} else {
509+
monitor.rttPinger?.close();
510+
monitor.rttPinger = undefined;
511+
}
512+
}
513+
514+
const { connection } = monitor;
515+
if (connection && !connection.closed) {
516+
const { serverApi, helloOk } = connection;
517+
const connectTimeoutMS = monitor.options.connectTimeoutMS;
518+
const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
519+
520+
const cmd = {
521+
[serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
522+
...(isAwaitable && topologyVersion
523+
? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
524+
: {})
525+
};
526+
527+
const options = isAwaitable
528+
? {
529+
socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,
530+
exhaustAllowed: true
531+
}
532+
: { socketTimeoutMS: connectTimeoutMS };
533+
534+
if (isAwaitable && monitor.rttPinger == null) {
535+
monitor.rttPinger = new RTTPinger(
536+
monitor,
537+
monitor[kCancellationToken],
538+
Object.assign(
539+
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
540+
monitor.connectOptions
541+
)
542+
);
543+
}
544+
545+
// Record new start time before sending handshake
546+
start = now();
547+
548+
if (isAwaitable) {
549+
awaited = true;
550+
try {
551+
const hello = await connection.command(ns('admin.$cmd'), cmd, options);
552+
onHeartbeatSucceeded(hello);
553+
return hello;
554+
} catch (error) {
555+
onHeartbeatFailed(error);
556+
return null;
557+
}
558+
}
559+
560+
awaited = false;
561+
try {
562+
const hello = await connection.command(ns('admin.$cmd'), cmd, options);
563+
onHeartbeatSucceeded(hello);
564+
return hello;
565+
} catch (error) {
566+
onHeartbeatFailed(error);
567+
return null;
568+
}
569+
} else {
570+
const socket = await makeSocket(monitor.connectOptions);
571+
const connection = makeConnection(monitor.connectOptions, socket);
572+
// The start time is after socket creation but before the handshake
573+
start = now();
574+
try {
575+
await performInitialHandshake(connection, monitor.connectOptions);
576+
const duration = calculateDurationInMs(start);
577+
if (isInCloseState(monitor)) {
578+
connection.destroy();
579+
return null;
580+
}
581+
582+
monitor.connection = connection;
583+
monitor.addRttSample(duration);
584+
585+
monitor.emitAndLogHeartbeat(
586+
Server.SERVER_HEARTBEAT_SUCCEEDED,
587+
monitor[kServer].topology.s.id,
588+
connection.hello?.connectionId,
589+
new ServerHeartbeatSucceededEvent(
590+
monitor.address,
591+
duration,
592+
connection.hello,
593+
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
594+
)
595+
);
596+
597+
return connection.hello;
598+
} catch (error) {
599+
connection.destroy();
600+
monitor.connection = null;
601+
awaited = false;
602+
onHeartbeatFailed(error);
603+
return null;
604+
}
605+
}
606+
}
607+
433608
function monitorServer(monitor: Monitor) {
434609
return (callback: Callback) => {
435610
if (monitor.s.state === STATE_MONITORING) {
@@ -491,12 +666,15 @@ export class RTTPinger {
491666
/** @internal */
492667
monitor: Monitor;
493668
closed: boolean;
669+
/** @internal */
670+
latestRTT?: number;
494671

495672
constructor(monitor: Monitor, cancellationToken: CancellationToken, options: RTTPingerOptions) {
496673
this.connection = undefined;
497674
this[kCancellationToken] = cancellationToken;
498675
this.closed = false;
499676
this.monitor = monitor;
677+
this.latestRTT = 0;
500678

501679
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
502680
this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);
@@ -510,10 +688,6 @@ export class RTTPinger {
510688
return this.monitor.minRoundTripTime;
511689
}
512690

513-
get latestRTT(): number {
514-
return this.monitor.latestRTT;
515-
}
516-
517691
close(): void {
518692
this.closed = true;
519693
clearTimeout(this[kMonitorId]);
@@ -523,42 +697,48 @@ export class RTTPinger {
523697
}
524698
}
525699

526-
function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
527-
const start = now();
528-
options.cancellationToken = rttPinger[kCancellationToken];
529-
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
530-
700+
function measureAndReschedule(
701+
rttPinger: RTTPinger,
702+
options: RTTPingerOptions,
703+
start?: number,
704+
conn?: Connection
705+
) {
706+
if (start == null) {
707+
start = now();
708+
}
531709
if (rttPinger.closed) {
710+
conn?.destroy();
532711
return;
533712
}
534713

535-
function measureAndReschedule(conn?: Connection) {
536-
if (rttPinger.closed) {
537-
conn?.destroy();
538-
return;
539-
}
714+
if (rttPinger.connection == null) {
715+
rttPinger.connection = conn;
716+
}
540717

541-
if (rttPinger.connection == null) {
542-
rttPinger.connection = conn;
543-
}
718+
rttPinger.latestRTT = calculateDurationInMs(start);
719+
rttPinger[kMonitorId] = setTimeout(
720+
() => measureRoundTripTime(rttPinger, options),
721+
options.heartbeatFrequencyMS
722+
);
723+
}
544724

545-
rttPinger.monitor.addRttSample(calculateDurationInMs(start));
546-
rttPinger[kMonitorId] = setTimeout(
547-
() => measureRoundTripTime(rttPinger, options),
548-
heartbeatFrequencyMS
549-
);
725+
function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
726+
const start = now();
727+
options.cancellationToken = rttPinger[kCancellationToken];
728+
729+
if (rttPinger.closed) {
730+
return;
550731
}
551732

552733
const connection = rttPinger.connection;
553734
if (connection == null) {
554735
// eslint-disable-next-line github/no-then
555736
connect(options).then(
556737
connection => {
557-
measureAndReschedule(connection);
738+
measureAndReschedule(rttPinger, options, start, connection);
558739
},
559740
() => {
560741
rttPinger.connection = undefined;
561-
rttPinger.monitor.clearRttSamples();
562742
}
563743
);
564744
return;
@@ -568,11 +748,10 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
568748
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
569749
// eslint-disable-next-line github/no-then
570750
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
571-
() => measureAndReschedule(),
751+
() => measureAndReschedule(rttPinger, options),
572752
() => {
573753
rttPinger.connection?.destroy();
574754
rttPinger.connection = undefined;
575-
rttPinger.monitor.clearRttSamples();
576755
return;
577756
}
578757
);

0 commit comments

Comments
 (0)