Skip to content

feat(NODE-4687): Add logging to server selection #3946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -934,10 +934,14 @@ export class ChangeStream<
this.cursor.close().catch(() => null);

const topology = getTopology(this.parent);
topology.selectServer(this.cursor.readPreference, {}, serverSelectionError => {
if (serverSelectionError) return this._closeEmitterModeWithError(changeStreamError);
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
});
topology.selectServer(
this.cursor.readPreference,
{ operationName: 'reconnect topology in change stream' },
serverSelectionError => {
if (serverSelectionError) return this._closeEmitterModeWithError(changeStreamError);
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
}
);
} else {
this._closeEmitterModeWithError(changeStreamError);
}
Expand All @@ -962,7 +966,9 @@ export class ChangeStream<
await this.cursor.close().catch(() => null);
const topology = getTopology(this.parent);
try {
await topology.selectServerAsync(this.cursor.readPreference, {});
await topology.selectServerAsync(this.cursor.readPreference, {
operationName: 'reconnect topology in change stream'
});
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
} catch {
// if the topology can't reconnect, close the stream
Expand Down
8 changes: 8 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ export const TOPOLOGY_CLOSED = 'topologyClosed' as const;
/** @internal */
export const TOPOLOGY_DESCRIPTION_CHANGED = 'topologyDescriptionChanged' as const;
/** @internal */
export const SERVER_SELECTION_STARTED = 'serverSelectionStarted' as const;
/** @internal */
export const SERVER_SELECTION_FAILED = 'serverSelectionFailed' as const;
/** @internal */
export const SERVER_SELECTION_SUCCEEDED = 'serverSelectionSucceeded' as const;
/** @internal */
export const WAITING_FOR_SUITABLE_SERVER = 'waitingForSuitableServer' as const;
/** @internal */
export const CONNECTION_POOL_CREATED = 'connectionPoolCreated' as const;
/** @internal */
export const CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const;
Expand Down
13 changes: 12 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ export {
TopologyDescriptionChangedEvent,
TopologyOpeningEvent
} from './sdam/events';
export {
ServerSelectionEvent,
ServerSelectionFailedEvent,
ServerSelectionStartedEvent,
ServerSelectionSucceededEvent,
WaitingForSuitableServerEvent
} from './sdam/server_selection_events';
export { SrvPollingEvent } from './sdam/srv_polling';

// type only exports below, these are removed from emitted JS
Expand Down Expand Up @@ -303,9 +310,13 @@ export type {
SERVER_HEARTBEAT_STARTED,
SERVER_HEARTBEAT_SUCCEEDED,
SERVER_OPENING,
SERVER_SELECTION_FAILED,
SERVER_SELECTION_STARTED,
SERVER_SELECTION_SUCCEEDED,
TOPOLOGY_CLOSED,
TOPOLOGY_DESCRIPTION_CHANGED,
TOPOLOGY_OPENING
TOPOLOGY_OPENING,
WAITING_FOR_SUITABLE_SERVER
} from './constants';
export type {
AbstractCursorEvents,
Expand Down
66 changes: 56 additions & 10 deletions src/mongo_logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import type {
ConnectionPoolClearedEvent,
ConnectionPoolClosedEvent,
ConnectionPoolCreatedEvent,
ConnectionPoolMonitoringEvent,
ConnectionPoolReadyEvent,
ConnectionReadyEvent
} from './cmap/connection_pool_events';
Expand All @@ -41,9 +40,13 @@ import {
SERVER_HEARTBEAT_STARTED,
SERVER_HEARTBEAT_SUCCEEDED,
SERVER_OPENING,
SERVER_SELECTION_FAILED,
SERVER_SELECTION_STARTED,
SERVER_SELECTION_SUCCEEDED,
TOPOLOGY_CLOSED,
TOPOLOGY_DESCRIPTION_CHANGED,
TOPOLOGY_OPENING
TOPOLOGY_OPENING,
WAITING_FOR_SUITABLE_SERVER
} from './constants';
import type {
ServerClosedEvent,
Expand All @@ -52,6 +55,13 @@ import type {
TopologyDescriptionChangedEvent,
TopologyOpeningEvent
} from './sdam/events';
import type {
ServerSelectionEvent,
ServerSelectionFailedEvent,
ServerSelectionStartedEvent,
ServerSelectionSucceededEvent,
WaitingForSuitableServerEvent
} from './sdam/server_selection_events';
import { HostAddress, parseUnsignedInteger } from './utils';

/** @internal */
Expand Down Expand Up @@ -335,6 +345,10 @@ type SDAMLoggableEvent =

/** @internal */
export type LoggableEvent =
| ServerSelectionStartedEvent
| ServerSelectionFailedEvent
| ServerSelectionSucceededEvent
| WaitingForSuitableServerEvent
| CommandStartedEvent
| CommandSucceededEvent
| CommandFailedEvent
Expand Down Expand Up @@ -369,11 +383,16 @@ export function stringifyWithMaxLen(
maxDocumentLength: number,
options: EJSONOptions = {}
): string {
const ejson = EJSON.stringify(value, options);
let strToTruncate: string;
if (typeof value === 'function') {
strToTruncate = value.toString();
} else {
strToTruncate = EJSON.stringify(value, options);
}

return maxDocumentLength !== 0 && ejson.length > maxDocumentLength
? `${ejson.slice(0, maxDocumentLength)}...`
: ejson;
return maxDocumentLength !== 0 && strToTruncate.length > maxDocumentLength
? `${strToTruncate.slice(0, maxDocumentLength)}...`
: strToTruncate;
}

/** @internal */
Expand All @@ -385,6 +404,20 @@ function isLogConvertible(obj: Loggable): obj is LogConvertible {
return objAsLogConvertible.toLog !== undefined && typeof objAsLogConvertible.toLog === 'function';
}

function attachServerSelectionFields(
log: Record<string, any>,
serverSelectionEvent: ServerSelectionEvent,
maxDocumentLength: number = DEFAULT_MAX_DOCUMENT_LENGTH
) {
const { selector, operation, topologyDescription, message } = serverSelectionEvent;
log.selector = stringifyWithMaxLen(selector, maxDocumentLength);
log.operation = operation;
log.topologyDescription = stringifyWithMaxLen(topologyDescription, maxDocumentLength);
log.message = message;

return log;
}

function attachCommandFields(
log: Record<string, any>,
commandEvent: CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent
Expand All @@ -402,10 +435,7 @@ function attachCommandFields(
return log;
}

function attachConnectionFields(
log: Record<string, any>,
event: ConnectionPoolMonitoringEvent | ServerOpeningEvent | ServerClosedEvent
) {
function attachConnectionFields(log: Record<string, any>, event: any) {
Copy link
Contributor

@W-A-James W-A-James Dec 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we broaden the type of the event parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use this function for all SDAM Events, too. We used to use it for server selection events as well before serverHost and serverPort were defined directly on the loggable server selection event classes.

const { host, port } = HostAddress.fromString(event.address).toHostPort();
log.serverHost = host;
log.serverPort = port;
Expand Down Expand Up @@ -441,6 +471,22 @@ function defaultLogTransform(
let log: Omit<Log, 's' | 't' | 'c'> = Object.create(null);

switch (logObject.name) {
case SERVER_SELECTION_STARTED:
log = attachServerSelectionFields(log, logObject, maxDocumentLength);
return log;
case SERVER_SELECTION_FAILED:
log = attachServerSelectionFields(log, logObject, maxDocumentLength);
log.failure = logObject.failure.message;
return log;
case SERVER_SELECTION_SUCCEEDED:
log = attachServerSelectionFields(log, logObject, maxDocumentLength);
log.serverHost = logObject.serverHost;
log.serverPort = logObject.serverPort;
return log;
case WAITING_FOR_SUITABLE_SERVER:
log = attachServerSelectionFields(log, logObject, maxDocumentLength);
log.remainingTimeMS = logObject.remainingTimeMS;
return log;
case COMMAND_STARTED:
log = attachCommandFields(log, logObject);
log.message = 'Command started';
Expand Down
10 changes: 8 additions & 2 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ async function executeOperationAsync<
selector = readPreference;
}

const server = await topology.selectServerAsync(selector, { session });
const server = await topology.selectServerAsync(selector, {
session,
operationName: operation.commandName
});

if (session == null) {
// No session also means it is not retryable, early exit
Expand Down Expand Up @@ -251,7 +254,10 @@ async function retryOperation<
}

// select a new server, and attempt to retry the operation
const server = await topology.selectServerAsync(selector, { session });
const server = await topology.selectServerAsync(selector, {
session,
operationName: operation.commandName
});

if (isWriteOperation && !supportsRetryableWrites(server)) {
throw new MongoUnexpectedServerResponseError(
Expand Down
142 changes: 142 additions & 0 deletions src/sdam/server_selection_events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { HostAddress } from '.././utils';
import {
SERVER_SELECTION_FAILED,
SERVER_SELECTION_STARTED,
SERVER_SELECTION_SUCCEEDED,
WAITING_FOR_SUITABLE_SERVER
} from '../constants';
import { type ReadPreference } from '../read_preference';
import { type ServerSelector } from './server_selection';
import type { TopologyDescription } from './topology_description';

/**
* The base export class for all logs published from server selection
* @internal
* @category Log Type
*/
export abstract class ServerSelectionEvent {
/** String representation of the selector being used to select the server.
* Defaults to 'custom selector' for application-provided custom selector case.
*/
selector: string | ReadPreference | ServerSelector;
/** The name of the operation for which a server is being selected. */
operation: string;
/** The current topology description. */
topologyDescription: TopologyDescription;

/** @internal */
abstract name:
| typeof SERVER_SELECTION_STARTED
| typeof SERVER_SELECTION_SUCCEEDED
| typeof SERVER_SELECTION_FAILED
| typeof WAITING_FOR_SUITABLE_SERVER;

abstract message: string;

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
operation: string
) {
this.selector = selector;
this.operation = operation;
this.topologyDescription = topologyDescription;
}
}

/**
* An event published when server selection starts
* @internal
* @category Event
*/
export class ServerSelectionStartedEvent extends ServerSelectionEvent {
/** @internal */
name = SERVER_SELECTION_STARTED;
message = 'Server selection started';

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
operation: string
) {
super(selector, topologyDescription, operation);
}
}

/**
* An event published when a server selection fails
* @internal
* @category Event
*/
export class ServerSelectionFailedEvent extends ServerSelectionEvent {
/** @internal */
name = SERVER_SELECTION_FAILED;
message = 'Server selection failed';
/** Representation of the error the driver will throw regarding server selection failing. */
failure: Error;

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
error: Error,
operation: string
) {
super(selector, topologyDescription, operation);
this.failure = error;
}
}

/**
* An event published when server selection succeeds
* @internal
* @category Event
*/
export class ServerSelectionSucceededEvent extends ServerSelectionEvent {
/** @internal */
name = SERVER_SELECTION_SUCCEEDED;
message = 'Server selection succeeded';
/** The hostname, IP address, or Unix domain socket path for the selected server. */
serverHost: string;
/** The port for the selected server. Optional; not present for Unix domain sockets. When the user does not specify a port and the default (27017) is used, the driver SHOULD include it here. */
serverPort: number | undefined;

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
address: string,
operation: string
) {
super(selector, topologyDescription, operation);
const { host, port } = HostAddress.fromString(address).toHostPort();
this.serverHost = host;
this.serverPort = port;
}
}

/**
* An event published when server selection is waiting for a suitable server to become available
* @internal
* @category Event
*/
export class WaitingForSuitableServerEvent extends ServerSelectionEvent {
/** @internal */
name = WAITING_FOR_SUITABLE_SERVER;
message = 'Waiting for suitable server to become available';
/** The remaining time left until server selection will time out. */
remainingTimeMS: number;

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
remainingTimeMS: number,
operation: string
) {
super(selector, topologyDescription, operation);
this.remainingTimeMS = remainingTimeMS;
}
}
Loading