Skip to content

Commit ca2118d

Browse files
committed
introduce timeout abstraction
1 parent 232bf3c commit ca2118d

File tree

5 files changed

+157
-1
lines changed

5 files changed

+157
-1
lines changed

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,7 @@ export type {
547547
WithTransactionCallback
548548
} from './sessions';
549549
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
550+
export type { Timeout } from './timeout';
550551
export type { Transaction, TransactionOptions, TxnState } from './transactions';
551552
export type {
552553
BufferPool,

src/sdam/topology.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong
3333
import { TypedEventEmitter } from '../mongo_types';
3434
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3535
import type { ClientSession } from '../sessions';
36+
import { type Timeout } from '../timeout';
3637
import type { Transaction } from '../transactions';
3738
import {
3839
type Callback,
@@ -178,6 +179,8 @@ export interface SelectServerOptions {
178179
session?: ClientSession;
179180
operationName: string;
180181
previousServer?: ServerDescription;
182+
/** @internal*/
183+
timeout?: Timeout | null;
181184
}
182185

183186
/** @public */
@@ -623,7 +626,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
623626
this[kWaitQueue].push(waitQueueMember);
624627
processWaitQueue(this);
625628

626-
return await serverPromise;
629+
return await (options.timeout ? Promise.race([options.timeout, serverPromise]) : serverPromise);
627630
}
628631
/**
629632
* Update the internal TopologyDescription with a ServerDescription

src/timeout.ts

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import { clearTimeout, setTimeout } from 'timers';
2+
3+
import { MongoError } from './error';
4+
import { noop } from './utils';
5+
6+
/** @internal */
7+
export class CSOTError extends MongoError {
8+
override get name(): 'CSOTError' {
9+
return 'CSOTError';
10+
}
11+
12+
constructor(message: string, options?: { cause?: Error }) {
13+
super(message, options);
14+
}
15+
16+
static is(error: unknown): error is CSOTError {
17+
return (
18+
error != null && typeof error === 'object' && 'name' in error && error.name === 'CSOTError'
19+
);
20+
}
21+
22+
static from(error: CSOTError) {
23+
return new CSOTError(error.message, { cause: error });
24+
}
25+
}
26+
27+
/** @internal
28+
* This class is an abstraction over CSOT timeouts, implementing the specification outlined in
29+
* https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/client-side-operations-timeout.md
30+
* */
31+
export class Timeout extends Promise<never> {
32+
get [Symbol.toStringTag](): 'MongoDBTimeout' {
33+
return 'MongoDBTimeout';
34+
}
35+
36+
private expireTimeout: () => void;
37+
private timeoutError: CSOTError;
38+
private id: Parameters<typeof clearTimeout>[0];
39+
40+
public start: number;
41+
public ended: number | null = null;
42+
public duration: number;
43+
public timedOut = false;
44+
45+
/**
46+
* Return the amount of time remaining until a CSOTError is thrown
47+
* */
48+
public get remainingTime(): number {
49+
if (this.duration === 0) return Infinity;
50+
if (this.timedOut) return 0;
51+
const timePassed = Math.trunc(performance.now()) - this.start;
52+
return Math.max(0, this.duration - timePassed);
53+
}
54+
55+
private constructor(
56+
executor: ConstructorParameters<typeof Promise<never>>[0] = () => null,
57+
duration = 0
58+
) {
59+
// for a promise constructed as follows new Promise((resolve: (a) => void, reject: (b) => void){})
60+
// reject here is of type: typeof(reject)
61+
let reject!: Parameters<ConstructorParameters<typeof Promise<never>>[0]>[1];
62+
63+
super((_, promiseReject) => {
64+
reject = promiseReject;
65+
executor(noop, promiseReject);
66+
});
67+
68+
// NOTE: Construct timeout error at point of Timeout instantiation to preserve stack traces
69+
this.timeoutError = new CSOTError('Timeout!');
70+
71+
this.expireTimeout = () => {
72+
this.ended = Math.trunc(performance.now());
73+
this.timedOut = true;
74+
// NOTE: Wrap error here: Why?
75+
reject(CSOTError.from(this.timeoutError));
76+
};
77+
78+
this.duration = duration;
79+
this.start = Math.trunc(performance.now());
80+
if (this.duration > 0) {
81+
this.id = setTimeout(this.expireTimeout, this.duration);
82+
// I see no reason CSOT should keep Node.js running, that's for the sockets to do
83+
if (typeof this.id.unref === 'function') {
84+
this.id.unref();
85+
}
86+
}
87+
}
88+
89+
public clear(): void {
90+
clearTimeout(this.id);
91+
this.id = undefined;
92+
}
93+
94+
/** Start the timer over, this only has effect if the timer has not expired. */
95+
public refresh() {
96+
if (this.timedOut) return;
97+
if (this.duration <= 0) return;
98+
99+
this.start = Math.trunc(performance.now());
100+
if (
101+
this.id != null &&
102+
typeof this.id === 'object' &&
103+
'refresh' in this.id &&
104+
typeof this.id?.refresh === 'function'
105+
) {
106+
this.id.refresh();
107+
return;
108+
}
109+
110+
clearTimeout(this.id);
111+
this.id = setTimeout(this.expireTimeout, this.duration);
112+
if (typeof this.id.unref === 'function') {
113+
this.id.unref();
114+
}
115+
}
116+
117+
/**
118+
* Implement maxTimeMS calculation detailed in https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/client-side-operations-timeout.md#command-execution
119+
* */
120+
public getMaxTimeMS(minRoundTripTime: number): any {
121+
if (!Number.isFinite(this.remainingTime)) return 0;
122+
if (minRoundTripTime < this.remainingTime) return this.remainingTime - minRoundTripTime;
123+
throw CSOTError.from(this.timeoutError);
124+
}
125+
126+
/** Create a new pending Timeout with the same duration */
127+
public clone() {
128+
return Timeout.expires(this.duration);
129+
}
130+
131+
/** Create a new timeout that expires in `duration` ms */
132+
public static expires(duration: number): Timeout {
133+
return new Timeout(undefined, duration);
134+
}
135+
136+
static is(timeout: unknown): timeout is Timeout {
137+
return (
138+
typeof timeout === 'object' &&
139+
timeout != null &&
140+
Symbol.toStringTag in timeout &&
141+
timeout[Symbol.toStringTag] === 'MongoDBTimeout' &&
142+
'then' in timeout &&
143+
// eslint-disable-next-line github/no-then
144+
typeof timeout.then === 'function'
145+
);
146+
}
147+
}

src/utils.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,3 +1344,7 @@ export async function fileIsAccessible(fileName: string, mode?: number) {
13441344
return false;
13451345
}
13461346
}
1347+
1348+
export function noop() {
1349+
return;
1350+
}

test/mongodb.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ export * from '../src/sdam/topology';
201201
export * from '../src/sdam/topology_description';
202202
export * from '../src/sessions';
203203
export * from '../src/sort';
204+
export * from '../src/timeout';
204205
export * from '../src/transactions';
205206
export * from '../src/utils';
206207
export * from '../src/write_concern';

0 commit comments

Comments
 (0)