Skip to content

Commit 2072985

Browse files
committed
add CSOT logic to server selection
1 parent 165918f commit 2072985

File tree

9 files changed

+34
-13
lines changed

9 files changed

+34
-13
lines changed

src/cmap/connection.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32+
import { type Timeout } from '../timeout';
3233
import {
3334
BufferPool,
3435
calculateDurationInMs,
@@ -88,6 +89,8 @@ export interface CommandOptions extends BSONSerializeOptions {
8889
writeConcern?: WriteConcern;
8990

9091
directConnection?: boolean;
92+
93+
timeout?: Timeout | null;
9194
}
9295

9396
/** @public */

src/cmap/wire_protocol/on_data.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { type EventEmitter } from 'events';
22

3+
import { type Timeout } from '../../timeout';
34
import { List, promiseWithResolvers } from '../../utils';
45

56
/**
@@ -18,7 +19,7 @@ type PendingPromises = Omit<
1819
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
1920
* It will reject upon an error event.
2021
*/
21-
export function onData(emitter: EventEmitter) {
22+
export function onData(emitter: EventEmitter, options?: { timeout?: Timeout | null }) {
2223
// Setup pending events and pending promise lists
2324
/**
2425
* When the caller has not yet called .next(), we store the
@@ -86,6 +87,8 @@ export function onData(emitter: EventEmitter) {
8687
// Adding event handlers
8788
emitter.on('data', eventHandler);
8889
emitter.on('error', errorHandler);
90+
// eslint-disable-next-line github/no-then
91+
options?.timeout?.then(() => null, errorHandler);
8992

9093
return iterator;
9194

src/operations/bulk_write.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,12 @@ export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
3636
): Promise<BulkWriteResult> {
3737
const coll = this.collection;
3838
const operations = this.operations;
39-
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
39+
const options = {
40+
...this.options,
41+
...this.bsonOptions,
42+
readPreference: this.readPreference,
43+
timeout: this.timeout
44+
};
4045

4146
// Create the bulk operation
4247
const bulk: BulkOperationBase =

src/operations/command.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
118118
...this.options,
119119
...this.bsonOptions,
120120
readPreference: this.readPreference,
121-
session
121+
session,
122+
timeout: this.timeout
122123
};
123124

124125
const serverWireVersion = maxWireVersion(server);

src/operations/execute_operation.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ async function retryOperation<
260260
const server = await topology.selectServer(selector, {
261261
session,
262262
operationName: operation.commandName,
263-
previousServer
263+
previousServer,
264+
timeout: operation.timeout
264265
});
265266

266267
if (isWriteOperation && !supportsRetryableWrites(server)) {

src/operations/find.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ export class FindOperation extends CommandOperation<Document> {
115115
...this.options,
116116
...this.bsonOptions,
117117
documentsReturnedIn: 'firstBatch',
118-
session
118+
session,
119+
timeout: this.timeout
119120
});
120121
}
121122
}

src/operations/operation.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
22
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5+
import { Timeout } from '../timeout';
56
import type { MongoDBNamespace } from '../utils';
67

78
export const Aspect = {
@@ -63,7 +64,10 @@ export abstract class AbstractOperation<TResult = any> {
6364

6465
[kSession]: ClientSession | undefined;
6566

67+
timeout?: Timeout | null;
68+
6669
constructor(options: OperationOptions = {}) {
70+
this.timeout = options.timeoutMS != null ? Timeout.expires(options.timeoutMS) : null;
6771
this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION)
6872
? ReadPreference.primary
6973
: ReadPreference.fromOptions(options) ?? ReadPreference.primary;

src/operations/run_command.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ export type RunCommandOptions = {
1313
session?: ClientSession;
1414
/** The read preference */
1515
readPreference?: ReadPreferenceLike;
16+
/** @internal */
17+
timeoutMS?: number;
1618
} & BSONSerializeOptions;
1719

1820
/** @internal */
@@ -31,7 +33,8 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
3133
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
3234
...this.options,
3335
readPreference: this.readPreference,
34-
session
36+
session,
37+
timeout: this.timeout
3538
});
3639
return res;
3740
}

src/sdam/topology.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ export interface SelectServerOptions {
180180
operationName: string;
181181
previousServer?: ServerDescription;
182182
/** @internal*/
183-
timeout?: Timeout;
183+
timeout?: Timeout | null;
184184
}
185185

186186
/** @public */
@@ -285,8 +285,8 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
285285

286286
const selectedHosts =
287287
options.srvMaxHosts == null ||
288-
options.srvMaxHosts === 0 ||
289-
options.srvMaxHosts >= seedlist.length
288+
options.srvMaxHosts === 0 ||
289+
options.srvMaxHosts >= seedlist.length
290290
? seedlist
291291
: shuffle(seedlist, options.srvMaxHosts);
292292

@@ -932,10 +932,10 @@ function processWaitQueue(topology: Topology) {
932932
const previousServer = waitQueueMember.previousServer;
933933
selectedDescriptions = serverSelector
934934
? serverSelector(
935-
topology.description,
936-
serverDescriptions,
937-
previousServer ? [previousServer] : []
938-
)
935+
topology.description,
936+
serverDescriptions,
937+
previousServer ? [previousServer] : []
938+
)
939939
: serverDescriptions;
940940
} catch (selectorError) {
941941
waitQueueMember.timeoutController.clear();

0 commit comments

Comments
 (0)