Skip to content

refactor(NODE-5675): refactor server selection and connection checkout to use abort signals for timeout management #3890

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { type Callback, eachAsync, List, makeCounter } from '../utils';
import { type Callback, eachAsync, List, makeCounter, TimeoutController } from '../utils';
import { AUTH_PROVIDERS, connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -101,7 +101,7 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
/** @internal */
export interface WaitQueueMember {
callback: Callback<Connection>;
timer?: NodeJS.Timeout;
timeoutController: TimeoutController;
[kCancelled]?: boolean;
}

Expand Down Expand Up @@ -356,27 +356,29 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueMember: WaitQueueMember = { callback };
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
if (waitQueueTimeoutMS) {
waitQueueMember.timer = setTimeout(() => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timer = undefined;

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
waitQueueMember.callback(
new WaitQueueTimeoutError(
this.loadBalanced
? this.waitQueueErrorMetrics()
: 'Timed out while checking out a connection from connection pool',
this.address
)
);
}, waitQueueTimeoutMS);
}
const waitQueueMember: WaitQueueMember = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might ask - why are we using AbortControllers and then manually consuming their signal, instead of making ConnectionPool.checkOut accept an abort signal? Really, there are a few reasons but it's primarily with the eventual state of CSOT in mind (note: everything here applies to server selection as well):

  • We need to clear any timeouts we set for connection checkout after we successfully obtain a connection. Abort signals do not provide a "cancellation" interface - and if they did, it would break the Controller/Signal abstraction. Controllers control signals and signals notify other components of abortion.
  • Given the above - in later work we will be introducing a TimeoutControllerFactory that is a property on the OperationContext. So eventually it will look like:
const waitQueueMember: WaitQueueMember = {
      callback,
      timeoutController: operationContext.timeoutFactory.controllerForConnectionCheckout()
}
  • To answer "why don't we make connection checkout accept a signal instead and pass it in?" - it's two fold. First - we need to cancel the controller like the first bullet mentions. Second - mostly preference, but in terms of encapsulation, I'd rather have connection checkout be responsible for managing its own timeout. If we pass a signal in as an option, we'd need to construct a timeout signal outside of every place we call ConnectionPool.checkout.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, considering the perspective how this piece of code relates to CSOT as a whole. What was the reasoning for not including this information in, say, the PR description so that team reviewers will be able to access it easier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just preference. I try to answer questions that reviewers might have as they're reviewing the code in advance of anyone reviewing it. I've found the most effective way to do that is by adding comments at the places I think reviewers might have questions

callback,
timeoutController: new TimeoutController(waitQueueTimeoutMS)
};
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timeoutController.clear();

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
waitQueueMember.callback(
new WaitQueueTimeoutError(
this.loadBalanced
? this.waitQueueErrorMetrics()
: 'Timed out while checking out a connection from connection pool',
this.address
)
);
});

this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());
Expand Down Expand Up @@ -831,9 +833,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason, error)
);
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();
this[kWaitQueue].shift();
waitQueueMember.callback(error);
continue;
Expand All @@ -854,9 +854,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();

this[kWaitQueue].shift();
waitQueueMember.callback(undefined, connection);
Expand Down Expand Up @@ -893,9 +891,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);
}

if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();
waitQueueMember.callback(err, connection);
}
process.nextTick(() => this.processWaitQueue());
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ export type {
HostAddress,
List,
MongoDBCollectionNamespace,
MongoDBNamespace
MongoDBNamespace,
TimeoutController
} from './utils';
export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern';
42 changes: 17 additions & 25 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

import type { BSONSerializeOptions, Document } from '../bson';
Expand Down Expand Up @@ -43,7 +42,8 @@ import {
List,
makeStateMachine,
ns,
shuffle
shuffle,
TimeoutController
} from '../utils';
import {
_advanceClusterTime,
Expand Down Expand Up @@ -94,8 +94,8 @@ export interface ServerSelectionRequest {
serverSelector: ServerSelector;
transaction?: Transaction;
callback: ServerSelectionCallback;
timer?: NodeJS.Timeout;
[kCancelled]?: boolean;
timeoutController: TimeoutController;
}

/** @internal */
Expand Down Expand Up @@ -556,22 +556,20 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
const waitQueueMember: ServerSelectionRequest = {
serverSelector,
transaction,
callback
callback,
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS)
};

const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS;
if (serverSelectionTimeoutMS) {
waitQueueMember.timer = setTimeout(() => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timer = undefined;
const timeoutError = new MongoServerSelectionError(
`Server selection timed out after ${serverSelectionTimeoutMS} ms`,
this.description
);
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timeoutController.clear();
const timeoutError = new MongoServerSelectionError(
`Server selection timed out after ${options.serverSelectionTimeoutMS} ms`,
this.description
);

waitQueueMember.callback(timeoutError);
}, serverSelectionTimeoutMS);
}
waitQueueMember.callback(timeoutError);
});

this[kWaitQueue].push(waitQueueMember);
processWaitQueue(this);
Expand Down Expand Up @@ -842,9 +840,7 @@ function drainWaitQueue(queue: List<ServerSelectionRequest>, err?: MongoDriverEr
continue;
}

if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();

if (!waitQueueMember[kCancelled]) {
waitQueueMember.callback(err);
Expand Down Expand Up @@ -878,9 +874,7 @@ function processWaitQueue(topology: Topology) {
? serverSelector(topology.description, serverDescriptions)
: serverDescriptions;
} catch (e) {
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();

waitQueueMember.callback(e);
continue;
Expand Down Expand Up @@ -917,9 +911,7 @@ function processWaitQueue(topology: Topology) {
transaction.pinServer(selectedServer);
}

if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();

waitQueueMember.callback(undefined, selectedServer);
}
Expand Down
28 changes: 28 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as crypto from 'crypto';
import type { SrvRecord } from 'dns';
import * as http from 'http';
import { clearTimeout, setTimeout } from 'timers';
import * as url from 'url';
import { URL } from 'url';

Expand Down Expand Up @@ -1254,3 +1255,30 @@ export async function request(
req.end();
});
}

/**
* A custom AbortController that aborts after a specified timeout.
*
* If `timeout` is undefined or \<=0, the abort controller never aborts.
*
* This class provides two benefits over the built-in AbortSignal.timeout() method.
* - This class provides a mechanism for cancelling the timeout
* - This class supports infinite timeouts by interpreting a timeout of 0 as infinite. This is
* consistent with existing timeout options in the Node driver (serverSelectionTimeoutMS, for example).
* @internal
*/
export class TimeoutController extends AbortController {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeoutController intentionally returns an "infinite" timeout when timeout is <=0 or undefined. This unifies code paths for existing timeouts. We now always create a timeout controller to manage server selection and connection checkout, but only if a non-zero positive timeout was provided do we actually create a Nodejs timeout that cancels the request.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense. Do you feel it would be beneficial to also capture this behavior in the PR description or the ticket information?

constructor(
timeout = 0,
private timeoutId = timeout > 0 ? setTimeout(() => this.abort(), timeout) : null
) {
super();
}

clear() {
if (this.timeoutId != null) {
clearTimeout(this.timeoutId);
}
this.timeoutId = null;
}
}
66 changes: 65 additions & 1 deletion test/unit/utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { expect } from 'chai';
import * as sinon from 'sinon';

import {
BufferPool,
Expand All @@ -16,8 +17,10 @@ import {
MongoDBNamespace,
MongoRuntimeError,
ObjectId,
shuffle
shuffle,
TimeoutController
} from '../mongodb';
import { createTimerSandbox } from './timer_sandbox';

describe('driver utils', function () {
describe('.hostMatchesWildcards', function () {
Expand Down Expand Up @@ -1101,4 +1104,65 @@ describe('driver utils', function () {
});
});
});

describe('class TimeoutController', () => {
let timerSandbox, clock, spy;

beforeEach(function () {
timerSandbox = createTimerSandbox();
clock = sinon.useFakeTimers();
spy = sinon.spy();
});

afterEach(function () {
clock.restore();
timerSandbox.restore();
});

describe('constructor', () => {
it('when no timeout is provided, it creates an infinite timeout', () => {
const controller = new TimeoutController();
// @ts-expect-error Accessing a private field on TimeoutController
expect(controller.timeoutId).to.be.null;
});

it('when timeout is 0, it creates an infinite timeout', () => {
const controller = new TimeoutController(0);
// @ts-expect-error Accessing a private field on TimeoutController
expect(controller.timeoutId).to.be.null;
});

it('when timeout <0, it creates an infinite timeout', () => {
const controller = new TimeoutController(-5);
// @ts-expect-error Accessing a private field on TimeoutController
expect(controller.timeoutId).to.be.null;
});

context('when timeout > 0', () => {
let timeoutController: TimeoutController;

beforeEach(function () {
timeoutController = new TimeoutController(3000);
timeoutController.signal.addEventListener('abort', spy);
});

afterEach(function () {
timeoutController.clear();
});

it('it creates a timeout', () => {
// @ts-expect-error Accessing a private field on TimeoutController
expect(timeoutController.timeoutId).not.to.be.null;
});

it('times out after `timeout` milliseconds', () => {
expect(spy, 'spy was called after creation').not.to.have.been.called;
clock.tick(2999);
expect(spy, 'spy was called before 3000ms has expired').not.to.have.been.called;
clock.tick(1);
expect(spy, 'spy was not called after 3000ms').to.have.been.called;
});
});
});
});
});