Skip to content

test(NODE-3903): update connections survive stepdown tests to check CMAP events #4071

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,191 +1,204 @@
import { expect } from 'chai';

import type { Collection, Db, MongoClient } from '../../mongodb';
import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration';

function ignoreNsNotFound(err) {
if (!err.message.match(/ns not found/)) {
throw err;
}
}

function connectionCount(client) {
return client
.db()
.admin()
.serverStatus()
.then(result => result.connections.totalCreated);
}

function expectPoolWasCleared(initialCount) {
return count => expect(count).to.greaterThan(initialCount);
}

function expectPoolWasNotCleared(initialCount) {
return count => expect(count).to.equal(initialCount);
}

// TODO: NODE-3819: Unskip flaky MacOS tests.
// TODO: NODE-3903: check events as specified in the corresponding prose test description
const maybeDescribe = process.platform === 'darwin' ? describe.skip : describe;
maybeDescribe('Connections survive primary step down - prose', function () {
import {
type Collection,
type ConnectionPoolClearedEvent,
type FindCursor,
type MongoClient,
MONGODB_ERROR_CODES,
MongoServerError,
ReadPreference
} from '../../mongodb';
import { type FailPoint } from '../../tools/utils';

describe('Connections Survive Primary Step Down - prose', function () {
let client: MongoClient;
let checkClient: MongoClient;
let db: Db;
let collection: Collection;
let poolClearedEvents: ConnectionPoolClearedEvent[];

beforeEach(
skipBrokenAuthTestBeforeEachHook({
skippedTests: [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice we don't have to skip these any more.

'getMore iteration',
'Not Primary - Keep Connection Pool',
'Not Primary - Reset Connection Pool',
'Shutdown in progress - Reset Connection Pool',
'Interrupted at shutdown - Reset Connection Pool'
]
})
);

beforeEach(function () {
const clientOptions = {
maxPoolSize: 1,
retryWrites: false,
heartbeatFrequencyMS: 100
};

client = this.configuration.newClient(clientOptions);
return client
.db()
.command({ ping: 1 })
.then(() => {
const primary = Array.from(client.topology.description.servers.values()).filter(
sd => sd.type === 'RSPrimary'
)[0];

checkClient = this.configuration.newClient(
`mongodb://${primary.address}/?directConnection=true`,
clientOptions
);
return checkClient.connect();
})
.then(() => {
db = client.db('step-down');
collection = db.collection('step-down');
})
.then(() => collection.drop({ writeConcern: { w: 'majority' } }))
.catch(ignoreNsNotFound)
.then(() => db.createCollection('step-down', { writeConcern: { w: 'majority' } }));
});
afterEach(() => client.close());

let deferred = [];
afterEach(function () {
return Promise.all(deferred.map(d => d())).then(() => {
deferred = [];
return Promise.all([client, checkClient].filter(x => !!x).map(client => client.close()));
});
afterEach(async function () {
const utilClient = this.configuration.newClient();
await utilClient.db('admin').command({ configureFailPoint: 'failCommand', mode: 'off' });
await utilClient.close();
poolClearedEvents = [];
});

it('getMore iteration', {
metadata: {
requires: { mongodb: '>=4.2.0', topology: 'replicaset' }
},

test: function () {
return collection
.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }], {
writeConcern: { w: 'majority' }
})
.then(result => expect(result.insertedCount).to.equal(5))
.then(() => {
const cursor = collection.find({}, { batchSize: 2 });
deferred.push(() => cursor.close());

return cursor
.next()
.then(item => expect(item.a).to.equal(1))
.then(() => cursor.next())
.then(item => expect(item.a).to.equal(2))
.then(() => {
return connectionCount(checkClient).then(initialConnectionCount => {
return client
.db('admin')
.command({ replSetFreeze: 0 }, { readPreference: 'secondary' })
.then(result => expect(result).property('info').to.equal('unfreezing'))
.then(() =>
client
.db('admin')
.command({ replSetStepDown: 30, force: true }, { readPreference: 'primary' })
)
.then(() => cursor.next())
.then(item => expect(item.a).to.equal(3))
.then(() =>
connectionCount(checkClient).then(
expectPoolWasNotCleared(initialConnectionCount)
)
);
});
});
});
}
beforeEach(async function () {
// For each test, make sure the following steps have been completed before running the actual test:

// - Create a ``MongoClient`` with ``retryWrites=false``
client = this.configuration.newClient({ retryWrites: false, heartbeatFrequencyMS: 500 });
// - Create a collection object from the ``MongoClient``, using ``step-down`` for the database and collection name.
collection = client.db('step-down').collection('step-down');
// - Drop the test collection, using ``writeConcern`` "majority".
await collection.drop({ writeConcern: { w: 'majority' } }).catch(() => null);
// - Execute the "create" command to recreate the collection, using writeConcern: "majority".
collection = await client
.db('step-down')
.createCollection('step-down', { writeConcern: { w: 'majority' } });

poolClearedEvents = [];
client.on('connectionPoolCleared', poolClearEvent => poolClearedEvents.push(poolClearEvent));
});

function runStepownScenario(errorCode, predicate) {
return connectionCount(checkClient).then(initialConnectionCount => {
return client
context('getMore Iteration', { requires: { mongodb: '>4.2', topology: ['replicaset'] } }, () => {
// This test requires a replica set with server version 4.2 or higher.

let cursor: FindCursor;
afterEach(() => cursor.close());

it('survives after primary step down', async () => {
// - Insert 5 documents into a collection with a majority write concern.
await collection.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }], {
writeConcern: { w: 'majority' }
});
// - Start a find operation on the collection with a batch size of 2, and retrieve the first batch of results.
cursor = collection.find({}, { batchSize: 2 });
expect(await cursor.next()).to.have.property('a', 1);
expect(await cursor.next()).to.have.property('a', 2);
// - Send a `{replSetFreeze: 0}` command to any secondary and verify that the command succeeded.
// This command will unfreeze (because it is set to zero) the secondary and ensure that it will be eligible to be elected immediately.
await client
.db('admin')
.command({
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: { failCommands: ['insert'], errorCode }
})
.then(() => {
deferred.push(() =>
client.db('admin').command({ configureFailPoint: 'failCommand', mode: 'off' })
);

return collection.insertOne({ test: 1 }).then(
() => Promise.reject(new Error('expected an error')),
err => expect(err.code).to.equal(errorCode)
);
})
.then(() => collection.insertOne({ test: 1 }))
.then(() => connectionCount(checkClient).then(predicate(initialConnectionCount)));
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.secondary });
// - Send a ``{replSetStepDown: 30, force: true}`` command to the current primary and verify that the command succeeded.
await client.db('admin').command({ replSetStepDown: 5, force: true });
// - Retrieve the next batch of results from the cursor obtained in the find operation, and verify that this operation succeeded.
expect(await cursor.next()).to.have.property('a', 3);
// - If the driver implements the `CMAP`_ specification, verify that no new `PoolClearedEvent`_ has been
// published. Otherwise verify that `connections.totalCreated`_ in `serverStatus`_ has not changed.
expect(poolClearedEvents).to.be.empty;

// Referenced python's implementation. Changes from spec:
// replSetStepDown: 5 instead of 30
// Run these inserts to clear NotWritablePrimary issue
// Create client with heartbeatFrequencyMS=500 instead of default of 10_000

// Attempt insertion to mark server description as stale and prevent a
// NotPrimaryError on the subsequent operation.
const error = await collection.insertOne({ a: 6 }).catch(error => error);
expect(error)
.to.be.instanceOf(MongoServerError)
.to.have.property('code', MONGODB_ERROR_CODES.NotWritablePrimary);

// Next insert should succeed on the new primary without clearing pool.
await collection.insertOne({ a: 7 });

expect(poolClearedEvents).to.be.empty;
});
}

it('Not Primary - Keep Connection Pool', {
metadata: {
requires: { mongodb: '>=4.2.0', topology: 'replicaset' }
},
test: function () {
return runStepownScenario(10107, expectPoolWasNotCleared);
}
});

it('Not Primary - Reset Connection Pool', {
metadata: {
requires: { mongodb: '4.0.x', topology: 'replicaset' }
},
test: function () {
return runStepownScenario(10107, expectPoolWasCleared);
context(
'Not Primary - Keep Connection Pool',
{ requires: { mongodb: '>4.2', topology: ['replicaset'] } },
() => {
// This test requires a replica set with server version 4.2 or higher.

// - Set the following fail point: ``{configureFailPoint: "failCommand", mode: {times: 1}, data: {failCommands: ["insert"], errorCode: 10107}}``
const failPoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: { failCommands: ['insert'], errorCode: 10107 }
};

it('survives after primary step down', async () => {
await client.db('admin').command(failPoint);
// - Execute an insert into the test collection of a ``{test: 1}`` document.
const error = await collection.insertOne({ test: 1 }).catch(error => error);
// - Verify that the insert failed with an operation failure with 10107 code.
expect(error).to.be.instanceOf(MongoServerError).and.has.property('code', 10107);
// - Execute an insert into the test collection of a ``{test: 1}`` document and verify that it succeeds.
await collection.insertOne({ test: 1 });
// - If the driver implements the `CMAP`_ specification, verify that no new `PoolClearedEvent`_ has been
// published. Otherwise verify that `connections.totalCreated`_ in `serverStatus`_ has not changed.
expect(poolClearedEvents).to.be.empty;
});
}
});
);

it('Shutdown in progress - Reset Connection Pool', {
metadata: {
requires: { mongodb: '>=4.0.0', topology: 'replicaset' }
},
test: function () {
return runStepownScenario(91, expectPoolWasCleared);
context(
'Not Primary - Reset Connection Pool',
{ requires: { mongodb: '>=4.0.0 <4.2.0', topology: ['replicaset'] } },
() => {
// This test requires a replica set with server version 4.0.

// - Set the following fail point: ``{configureFailPoint: "failCommand", mode: {times: 1}, data: {failCommands: ["insert"], errorCode: 10107}}``
const failPoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: { failCommands: ['insert'], errorCode: 10107 }
};

it('survives after primary step down', async () => {
await client.db('admin').command(failPoint);
// - Execute an insert into the test collection of a ``{test: 1}`` document.
const error = await collection.insertOne({ test: 1 }).catch(error => error);
// - Verify that the insert failed with an operation failure with 10107 code.
expect(error).to.be.instanceOf(MongoServerError).and.has.property('code', 10107);
// - If the driver implements the `CMAP`_ specification, verify that a `PoolClearedEvent`_ has been published
expect(poolClearedEvents).to.have.lengthOf(1);
// - Execute an insert into the test collection of a ``{test: 1}`` document and verify that it succeeds.
await collection.insertOne({ test: 1 });
// - If the driver does NOT implement the `CMAP`_ specification, use the `serverStatus`_ command to verify `connections.totalCreated`_ has increased by 1.
});
}
});
);

it('Interrupted at shutdown - Reset Connection Pool', {
metadata: {
requires: { mongodb: '>=4.0.0', topology: 'replicaset' }
},
test: function () {
return runStepownScenario(11600, expectPoolWasCleared);
context(
'Shutdown in progress - Reset Connection Pool',
{ requires: { mongodb: '>=4.0', topology: ['replicaset'] } },
() => {
// This test should be run on all server versions >= 4.0.

// - Set the following fail point: ``{configureFailPoint: "failCommand", mode: {times: 1}, data: {failCommands: ["insert"], errorCode: 91}}``
const failPoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: { failCommands: ['insert'], errorCode: 91 }
};

it('survives after primary step down', async () => {
await client.db('admin').command(failPoint);
// - Execute an insert into the test collection of a ``{test: 1}`` document.
const error = await collection.insertOne({ test: 1 }).catch(error => error);
// - Verify that the insert failed with an operation failure with 91 code.
expect(error).to.be.instanceOf(MongoServerError).and.has.property('code', 91);
// - If the driver implements the `CMAP`_ specification, verify that a `PoolClearedEvent`_ has been published
expect(poolClearedEvents).to.have.lengthOf(1);
// - Execute an insert into the test collection of a ``{test: 1}`` document and verify that it succeeds.
await collection.insertOne({ test: 1 });
// - If the driver does NOT implement the `CMAP`_ specification, use the `serverStatus`_ command to verify `connections.totalCreated`_ has increased by 1.
});
}
});
);

context(
'Interrupted at shutdown - Reset Connection Pool',
{ requires: { mongodb: '>=4.0', topology: ['replicaset'] } },
() => {
// This test should be run on all server versions >= 4.0.

// - Set the following fail point: ``{configureFailPoint: "failCommand", mode: {times: 1}, data: {failCommands: ["insert"], errorCode: 11600}}``
const failPoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: { failCommands: ['insert'], errorCode: 11600 }
};

it('survives after primary step down', async () => {
await client.db('admin').command(failPoint);
// - Execute an insert into the test collection of a ``{test: 1}`` document.
const error = await collection.insertOne({ test: 1 }).catch(error => error);
// - Verify that the insert failed with an operation failure with 11600 code.
expect(error).to.be.instanceOf(MongoServerError).and.has.property('code', 11600);
// - If the driver implements the `CMAP`_ specification, verify that a `PoolClearedEvent`_ has been published
expect(poolClearedEvents).to.have.lengthOf(1);
// - Execute an insert into the test collection of a ``{test: 1}`` document and verify that it succeeds.
await collection.insertOne({ test: 1 });
// - If the driver does NOT implement the `CMAP`_ specification, use the `serverStatus`_ command to verify `connections.totalCreated`_ has increased by 1.
});
}
);
});