Skip to content

Commit 898b93f

Browse files
W-A-Jamesnbbeeken
authored andcommitted
refactor(NODE-6230): executeOperation to use iterative retry mechanism (#4157)
1 parent 4aa6575 commit 898b93f

File tree

1 file changed

+53
-106
lines changed

1 file changed

+53
-106
lines changed

src/operations/execute_operation.ts

Lines changed: 53 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import {
2525
import type { Topology } from '../sdam/topology';
2626
import type { ClientSession } from '../sessions';
2727
import { TimeoutContext } from '../timeout';
28-
import { squashError, supportsRetryableWrites } from '../utils';
28+
import { supportsRetryableWrites } from '../utils';
2929
import { AbstractOperation, Aspect } from './operation';
3030

3131
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
@@ -88,12 +88,6 @@ export async function executeOperation<
8888
);
8989
}
9090

91-
timeoutContext ??= TimeoutContext.create({
92-
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
93-
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
94-
timeoutMS: operation.options.timeoutMS
95-
});
96-
9791
const readPreference = operation.readPreference ?? ReadPreference.primary;
9892
const inTransaction = !!session?.inTransaction();
9993

@@ -206,31 +200,15 @@ async function tryOperation<
206200
selector = readPreference;
207201
}
208202

209-
const server = await topology.selectServer(selector, {
203+
let server = await topology.selectServer(selector, {
210204
session,
211205
operationName: operation.commandName,
212206
timeoutContext
213207
});
214208

215-
if (session == null) {
216-
// No session also means it is not retryable, early exit
217-
return await operation.execute(server, undefined, timeoutContext);
218-
}
219-
220-
if (!operation.hasAspect(Aspect.RETRYABLE)) {
221-
// non-retryable operation, early exit
222-
try {
223-
return await operation.execute(server, session, timeoutContext);
224-
} finally {
225-
if (session?.owner != null && session.owner === owner) {
226-
try {
227-
await session.endSession();
228-
} catch (error) {
229-
squashError(error);
230-
}
231-
}
232-
}
233-
}
209+
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
210+
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
211+
const inTransaction = session?.inTransaction() ?? false;
234212

235213
const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;
236214

@@ -250,16 +228,42 @@ async function tryOperation<
250228
session.incrementTransactionNumber();
251229
}
252230

253-
try {
254-
return await operation.execute(server, session, timeoutContext);
255-
} catch (operationError) {
256-
if (willRetry && operationError instanceof MongoError) {
257-
return await retryOperation(operation, operationError, {
231+
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
232+
const maxTries = willRetry ? 2 : 1;
233+
let previousOperationError: MongoError | undefined;
234+
let previousServer: ServerDescription | undefined;
235+
236+
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
237+
for (let tries = 0; tries < maxTries; tries++) {
238+
if (previousOperationError) {
239+
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
240+
throw new MongoServerError({
241+
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
242+
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
243+
originalError: previousOperationError
244+
});
245+
}
246+
247+
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
248+
throw previousOperationError;
249+
250+
if (hasReadAspect && !isRetryableReadError(previousOperationError))
251+
throw previousOperationError;
252+
253+
if (
254+
previousOperationError instanceof MongoNetworkError &&
255+
operation.hasAspect(Aspect.CURSOR_CREATING) &&
256+
session != null &&
257+
session.isPinned &&
258+
!session.inTransaction()
259+
) {
260+
session.unpin({ force: true, forceClear: true });
261+
}
262+
263+
server = await topology.selectServer(selector, {
258264
session,
259-
topology,
260-
selector,
261-
previousServer: server.description,
262-
timeoutContext
265+
operationName: operation.commandName,
266+
previousServer
263267
});
264268

265269
if (hasWriteAspect && !supportsRetryableWrites(server)) {
@@ -269,76 +273,19 @@ async function tryOperation<
269273
}
270274
}
271275

272-
/** @internal */
273-
type RetryOptions = {
274-
session: ClientSession;
275-
topology: Topology;
276-
selector: ReadPreference | ServerSelector;
277-
previousServer: ServerDescription;
278-
timeoutContext: TimeoutContext;
279-
};
280-
281-
async function retryOperation<
282-
T extends AbstractOperation<TResult>,
283-
TResult = ResultTypeFromOperation<T>
284-
>(
285-
operation: T,
286-
originalError: MongoError,
287-
{ session, topology, selector, previousServer, timeoutContext }: RetryOptions
288-
): Promise<TResult> {
289-
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
290-
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
291-
292-
if (isWriteOperation && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
293-
throw new MongoServerError({
294-
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
295-
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
296-
originalError
297-
});
298-
}
299-
300-
if (isWriteOperation && !isRetryableWriteError(originalError)) {
301-
throw originalError;
302-
}
303-
304-
if (isReadOperation && !isRetryableReadError(originalError)) {
305-
throw originalError;
306-
}
307-
308-
if (
309-
originalError instanceof MongoNetworkError &&
310-
session.isPinned &&
311-
!session.inTransaction() &&
312-
operation.hasAspect(Aspect.CURSOR_CREATING)
313-
) {
314-
// If we have a cursor and the initial command fails with a network error,
315-
// we can retry it on another connection. So we need to check it back in, clear the
316-
// pool for the service id, and retry again.
317-
session.unpin({ force: true, forceClear: true });
318-
}
319-
320-
// select a new server, and attempt to retry the operation
321-
const server = await topology.selectServer(selector, {
322-
session,
323-
operationName: operation.commandName,
324-
previousServer,
325-
timeoutContext
326-
});
327-
328-
if (isWriteOperation && !supportsRetryableWrites(server)) {
329-
throw new MongoUnexpectedServerResponseError(
330-
'Selected server does not support retryable writes'
331-
);
332-
}
333-
334-
try {
335-
return await operation.execute(server, session, timeoutContext);
336-
} catch (retryError) {
337-
if (
338-
retryError instanceof MongoError &&
339-
retryError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
340-
) {
341-
throw originalError;
276+
try {
277+
return await operation.execute(server, session, timeoutContext);
278+
} catch (operationError) {
279+
if (!(operationError instanceof MongoError)) throw operationError;
280+
281+
if (
282+
previousOperationError != null &&
283+
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
284+
) {
285+
throw previousOperationError;
286+
}
287+
previousServer = server.description;
288+
previousOperationError = operationError;
342289
}
343290
}
344291

0 commit comments

Comments
 (0)