Skip to content

feat(NODE-2992): consider server load during server selection #3219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ declare global {
apiVersion?: '1';
clientSideEncryption?: boolean;
serverless?: 'forbid' | 'allow' | 'require';
auth: 'enabled' | 'disabled';
auth?: 'enabled' | 'disabled';
};

sessions?: {
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ export abstract class AbstractCursor<
});
}

close(): void;
close(): Promise<void>;
close(callback: Callback): void;
/**
* @deprecated options argument is deprecated
Expand Down
43 changes: 38 additions & 5 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ export interface ServerPrivate {
pool: ConnectionPool;
/** MongoDB server API version */
serverApi?: ServerApi;
/** A count of the operations currently running against the server. */
operationCount: number;
}

/** @public */
Expand Down Expand Up @@ -147,7 +149,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
logger: new Logger('Server'),
state: STATE_CLOSED,
topology,
pool: new ConnectionPool(poolOptions)
pool: new ConnectionPool(poolOptions),
operationCount: 0
};

for (const event of [...CMAP_EVENTS, ...APM_EVENTS]) {
Expand Down Expand Up @@ -325,6 +328,15 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// NOTE: This is a hack! We can't retrieve the connections used for executing an operation
// (and prevent them from being checked back in) at the point of operation execution.
// This should be considered as part of the work for NODE-2882
// NOTE:
// When incrementing operation count, it's important that we increment it before we
// attempt to check out a connection from the pool. This ensures that operations that
// are waiting for a connection are included in the operation count. Load balanced
// mode will only ever have a single server, so the operation count doesn't matter.
// Incrementing the operation count above the logic to handle load balanced mode would
// require special logic to decrement it again, or would double increment (the load
// balanced code makes a recursive call). Instead, we increment the count after this
// check.
if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) {
this.s.pool.checkOut((err, checkedOut) => {
if (err || checkedOut == null) {
Expand All @@ -335,14 +347,16 @@ export class Server extends TypedEventEmitter<ServerEvents> {
session.pin(checkedOut);
this.command(ns, cmd, finalOptions, callback);
});

return;
}

this.s.operationCount += 1;

this.s.pool.withConnection(
conn,
(err, conn, cb) => {
if (err || !conn) {
this.s.operationCount -= 1;
markServerUnknown(this, err);
return cb(err);
}
Expand All @@ -351,7 +365,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
ns,
cmd,
finalOptions,
makeOperationHandler(this, conn, cmd, finalOptions, cb)
makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => {
this.s.operationCount -= 1;
cb(error, response);
})
);
},
callback
Expand All @@ -373,15 +390,26 @@ export class Server extends TypedEventEmitter<ServerEvents> {
return;
}

this.s.operationCount += 1;

this.s.pool.withConnection(
options.session?.pinnedConnection,
(err, conn, cb) => {
if (err || !conn) {
this.s.operationCount -= 1;
markServerUnknown(this, err);
return cb(err);
}

conn.getMore(ns, cursorId, options, makeOperationHandler(this, conn, {}, options, cb));
conn.getMore(
ns,
cursorId,
options,
makeOperationHandler(this, conn, {}, options, (error, response) => {
this.s.operationCount -= 1;
cb(error, response);
})
);
},
callback
);
Expand All @@ -405,10 +433,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
return;
}

this.s.operationCount += 1;
this.s.pool.withConnection(
options.session?.pinnedConnection,
(err, conn, cb) => {
if (err || !conn) {
this.s.operationCount -= 1;
markServerUnknown(this, err);
return cb(err);
}
Expand All @@ -417,7 +447,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
ns,
cursorIds,
options,
makeOperationHandler(this, conn, {}, undefined, cb)
makeOperationHandler(this, conn, {}, undefined, (error, response) => {
this.s.operationCount -= 1;
cb(error, response);
})
);
},
callback
Expand Down
29 changes: 23 additions & 6 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -843,10 +843,6 @@ function topologyTypeFromOptions(options?: TopologyOptions) {
return TopologyType.Unknown;
}

function randomSelection(array: ServerDescription[]): ServerDescription {
return array[Math.floor(Math.random() * array.length)];
}

/**
* Creates new server instances and attempts to connect them
*
Expand Down Expand Up @@ -963,13 +959,34 @@ function processWaitQueue(topology: Topology) {
continue;
}

let selectedServer;
if (selectedDescriptions.length === 0) {
topology[kWaitQueue].push(waitQueueMember);
continue;
} else if (selectedDescriptions.length === 1) {
selectedServer = topology.s.servers.get(selectedDescriptions[0].address);
} else {
// don't shuffle the array if there are only two elements
const descriptions =
selectedDescriptions.length === 2 ? selectedDescriptions : shuffle(selectedDescriptions, 2);
const server1 = topology.s.servers.get(descriptions[0].address);
const server2 = topology.s.servers.get(descriptions[1].address);

selectedServer =
server1 && server2 && server1.s.operationCount < server2.s.operationCount
? server1
: server2;
}

const selectedServerDescription = randomSelection(selectedDescriptions);
const selectedServer = topology.s.servers.get(selectedServerDescription.address);
if (!selectedServer) {
waitQueueMember.callback(
new MongoServerSelectionError(
'server selection returned a server description but the server was not found in the topology',
topology.description
)
);
return;
}
const transaction = waitQueueMember.transaction;
if (isSharded && transaction && transaction.isActive && selectedServer) {
transaction.pinServer(selectedServer);
Expand Down
Loading