Skip to content

Commit 35eeba3

Browse files
feat(NODE-2992): consider server load during server selection (#3219)
1 parent 5e465b7 commit 35eeba3

24 files changed

+1433
-16
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ export abstract class AbstractCursor<
388388
});
389389
}
390390

391-
close(): void;
391+
close(): Promise<void>;
392392
close(callback: Callback): void;
393393
/**
394394
* @deprecated options argument is deprecated

src/sdam/server.ts

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ export interface ServerPrivate {
9393
pool: ConnectionPool;
9494
/** MongoDB server API version */
9595
serverApi?: ServerApi;
96+
/** A count of the operations currently running against the server. */
97+
operationCount: number;
9698
}
9799

98100
/** @public */
@@ -147,7 +149,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
147149
logger: new Logger('Server'),
148150
state: STATE_CLOSED,
149151
topology,
150-
pool: new ConnectionPool(poolOptions)
152+
pool: new ConnectionPool(poolOptions),
153+
operationCount: 0
151154
};
152155

153156
for (const event of [...CMAP_EVENTS, ...APM_EVENTS]) {
@@ -325,6 +328,15 @@ export class Server extends TypedEventEmitter<ServerEvents> {
325328
// NOTE: This is a hack! We can't retrieve the connections used for executing an operation
326329
// (and prevent them from being checked back in) at the point of operation execution.
327330
// This should be considered as part of the work for NODE-2882
331+
// NOTE:
332+
// When incrementing operation count, it's important that we increment it before we
333+
// attempt to check out a connection from the pool. This ensures that operations that
334+
// are waiting for a connection are included in the operation count. Load balanced
335+
// mode will only ever have a single server, so the operation count doesn't matter.
336+
// Incrementing the operation count above the logic to handle load balanced mode would
337+
// require special logic to decrement it again, or would double increment (the load
338+
// balanced code makes a recursive call). Instead, we increment the count after this
339+
// check.
328340
if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) {
329341
this.s.pool.checkOut((err, checkedOut) => {
330342
if (err || checkedOut == null) {
@@ -335,14 +347,16 @@ export class Server extends TypedEventEmitter<ServerEvents> {
335347
session.pin(checkedOut);
336348
this.command(ns, cmd, finalOptions, callback);
337349
});
338-
339350
return;
340351
}
341352

353+
this.s.operationCount += 1;
354+
342355
this.s.pool.withConnection(
343356
conn,
344357
(err, conn, cb) => {
345358
if (err || !conn) {
359+
this.s.operationCount -= 1;
346360
markServerUnknown(this, err);
347361
return cb(err);
348362
}
@@ -351,7 +365,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
351365
ns,
352366
cmd,
353367
finalOptions,
354-
makeOperationHandler(this, conn, cmd, finalOptions, cb)
368+
makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => {
369+
this.s.operationCount -= 1;
370+
cb(error, response);
371+
})
355372
);
356373
},
357374
callback
@@ -373,15 +390,26 @@ export class Server extends TypedEventEmitter<ServerEvents> {
373390
return;
374391
}
375392

393+
this.s.operationCount += 1;
394+
376395
this.s.pool.withConnection(
377396
options.session?.pinnedConnection,
378397
(err, conn, cb) => {
379398
if (err || !conn) {
399+
this.s.operationCount -= 1;
380400
markServerUnknown(this, err);
381401
return cb(err);
382402
}
383403

384-
conn.getMore(ns, cursorId, options, makeOperationHandler(this, conn, {}, options, cb));
404+
conn.getMore(
405+
ns,
406+
cursorId,
407+
options,
408+
makeOperationHandler(this, conn, {}, options, (error, response) => {
409+
this.s.operationCount -= 1;
410+
cb(error, response);
411+
})
412+
);
385413
},
386414
callback
387415
);
@@ -405,10 +433,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
405433
return;
406434
}
407435

436+
this.s.operationCount += 1;
408437
this.s.pool.withConnection(
409438
options.session?.pinnedConnection,
410439
(err, conn, cb) => {
411440
if (err || !conn) {
441+
this.s.operationCount -= 1;
412442
markServerUnknown(this, err);
413443
return cb(err);
414444
}
@@ -417,7 +447,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
417447
ns,
418448
cursorIds,
419449
options,
420-
makeOperationHandler(this, conn, {}, undefined, cb)
450+
makeOperationHandler(this, conn, {}, undefined, (error, response) => {
451+
this.s.operationCount -= 1;
452+
cb(error, response);
453+
})
421454
);
422455
},
423456
callback

src/sdam/topology.ts

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -843,10 +843,6 @@ function topologyTypeFromOptions(options?: TopologyOptions) {
843843
return TopologyType.Unknown;
844844
}
845845

846-
function randomSelection(array: ServerDescription[]): ServerDescription {
847-
return array[Math.floor(Math.random() * array.length)];
848-
}
849-
850846
/**
851847
* Creates new server instances and attempts to connect them
852848
*
@@ -963,13 +959,34 @@ function processWaitQueue(topology: Topology) {
963959
continue;
964960
}
965961

962+
let selectedServer;
966963
if (selectedDescriptions.length === 0) {
967964
topology[kWaitQueue].push(waitQueueMember);
968965
continue;
966+
} else if (selectedDescriptions.length === 1) {
967+
selectedServer = topology.s.servers.get(selectedDescriptions[0].address);
968+
} else {
969+
// don't shuffle the array if there are only two elements
970+
const descriptions =
971+
selectedDescriptions.length === 2 ? selectedDescriptions : shuffle(selectedDescriptions, 2);
972+
const server1 = topology.s.servers.get(descriptions[0].address);
973+
const server2 = topology.s.servers.get(descriptions[1].address);
974+
975+
selectedServer =
976+
server1 && server2 && server1.s.operationCount < server2.s.operationCount
977+
? server1
978+
: server2;
969979
}
970980

971-
const selectedServerDescription = randomSelection(selectedDescriptions);
972-
const selectedServer = topology.s.servers.get(selectedServerDescription.address);
981+
if (!selectedServer) {
982+
waitQueueMember.callback(
983+
new MongoServerSelectionError(
984+
'server selection returned a server description but the server was not found in the topology',
985+
topology.description
986+
)
987+
);
988+
return;
989+
}
973990
const transaction = waitQueueMember.transaction;
974991
if (isSharded && transaction && transaction.isActive && selectedServer) {
975992
transaction.pinServer(selectedServer);

0 commit comments

Comments
 (0)