Skip to content

NODE-3008: Dont pass session to cloned cursor #2725

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AggregateOperation, AggregateOptions } from '../operations/aggregate';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { mergeOptions } from '../utils';
import type { Document } from '../bson';
import type { Sort } from '../sort';
import type { Topology } from '../sdam/topology';
Expand Down Expand Up @@ -52,9 +53,9 @@ export class AggregationCursor extends AbstractCursor {
}

clone(): AggregationCursor {
const clonedOptions = mergeOptions({}, this[kOptions]);
return new AggregationCursor(this[kParent], this.topology, this.namespace, this[kPipeline], {
...this[kOptions],
...this.cursorOptions
...clonedOptions
});
}

Expand Down
5 changes: 3 additions & 2 deletions src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { ExplainVerbosityLike } from '../explain';
import { CountOperation, CountOptions } from '../operations/count';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { FindOperation, FindOptions } from '../operations/find';
import { mergeOptions } from '../utils';
import type { Hint } from '../operations/operation';
import type { CollationOptions } from '../operations/command';
import type { Topology } from '../sdam/topology';
Expand Down Expand Up @@ -52,9 +53,9 @@ export class FindCursor extends AbstractCursor {
}

clone(): FindCursor {
const clonedOptions = mergeOptions({}, this[kBuiltOptions]);
return new FindCursor(this.topology, this.namespace, this[kFilter], {
...this[kBuiltOptions],
...this.cursorOptions
...clonedOptions
});
}

Expand Down
287 changes: 167 additions & 120 deletions test/functional/cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ describe('Cursor', function () {
}
});


it('cursor should trigger getMore', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
Expand Down Expand Up @@ -3755,6 +3756,65 @@ describe('Cursor', function () {
}
);

describe('#clone', function () {
let client;
let db;
let collection;

beforeEach(function () {
client = this.configuration.newClient({ w: 1 });

return client.connect().then(client => {
db = client.db(this.configuration.db);
collection = db.collection('test_coll');
});
});

afterEach(function () {
return client.close();
});

context('when executing on a find cursor', function () {
it('removes the existing session from the cloned cursor', function () {
const docs = [{ name: 'test1' }, { name: 'test2' }];
return collection.insertMany(docs).then(() => {
const cursor = collection.find({}, { batchSize: 1 });
return cursor
.next()
.then(doc => {
expect(doc).to.exist;
const clonedCursor = cursor.clone();
expect(clonedCursor.cursorOptions.session).to.not.exist;
expect(clonedCursor.session).to.not.exist;
})
.finally(() => {
return cursor.close();
});
});
});
});

context('when executing on an aggregation cursor', function () {
it('removes the existing session from the cloned cursor', function () {
const docs = [{ name: 'test1' }, { name: 'test2' }];
return collection.insertMany(docs).then(() => {
const cursor = collection.aggregate([{ $match: {} }], { batchSize: 1 });
return cursor
.next()
.then(doc => {
expect(doc).to.exist;
const clonedCursor = cursor.clone();
expect(clonedCursor.cursorOptions.session).to.not.exist;
expect(clonedCursor.session).to.not.exist;
})
.finally(() => {
return cursor.close();
});
});
});
});
});

it('should return a promise when no callback supplied to forEach method', function () {
const configuration = this.configuration;
const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 });
Expand Down Expand Up @@ -3854,66 +3914,6 @@ describe('Cursor', function () {
const expectedDocs = [
{ _id: 0, b: 1, c: 0 },
{ _id: 1, b: 1, c: 0 },
{ _id: 2, b: 1, c: 0 }
];
const config = {
client: client,
configuration: configuration,
collectionName: 'stream-test-transform',
transformFunc: doc => ({ _id: doc._id, b: doc.a.b, c: doc.a.c }),
expectedSet: new Set(expectedDocs)
};

testTransformStream(config, done);
});

it('stream should return a stream of unmodified docs if no transform function applied', function (done) {
const configuration = this.configuration;
const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 });
const expectedDocs = [
{ _id: 0, a: { b: 1, c: 0 } },
{ _id: 1, a: { b: 1, c: 0 } },
{ _id: 2, a: { b: 1, c: 0 } }
];
const config = {
client: client,
configuration: configuration,
collectionName: 'transformStream-test-notransform',
transformFunc: null,
expectedSet: new Set(expectedDocs)
};

testTransformStream(config, done);
});

it.skip('should apply parent read preference to count command', function (done) {
// NOTE: this test is skipped because mongo orchestration does not test sharded clusters
// with secondaries. This behavior should be unit tested

const configuration = this.configuration;
const client = configuration.newClient(
{ w: 1, readPreference: ReadPreference.SECONDARY },
{ maxPoolSize: 1, connectWithNoPrimary: true }
);

client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());

const db = client.db(configuration.db);
let collection, cursor, spy;
const close = e => cursor.close(() => client.close(() => done(e)));

Promise.resolve()
.then(() => new Promise(resolve => setTimeout(() => resolve(), 500)))
.then(() => db.createCollection('test_count_readPreference'))
.then(() => (collection = db.collection('test_count_readPreference')))
.then(() => collection.find())
.then(_cursor => (cursor = _cursor))
.then(() => (spy = sinon.spy(cursor.topology, 'command')))
.then(() => cursor.count())
.then(() =>
expect(spy.firstCall.args[2])
.to.have.nested.property('readPreference.mode')
.that.equals('secondary')
)
Expand Down Expand Up @@ -3965,6 +3965,53 @@ describe('Cursor', function () {
});
});

describe('#stream', function () {
context('when the stream is closed', function () {
it('emits the close event once only', function () {
const configuration = this.configuration;
const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 });

return client.connect(err => {
expect(err).to.not.exist;
this.defer(() => client.close());

const collection = client.db().collection('documents');
collection.drop(() => {
const docs = [{ a: 1 }, { a: 2 }, { a: 3 }];
collection.insertMany(docs, err => {
expect(err).to.not.exist;

const cursor = collection.find({}, { sort: { a: 1 } });
cursor.hasNext((err, hasNext) => {
expect(err).to.not.exist;
expect(hasNext).to.be.true;

const collected = [];
const stream = new Writable({
objectMode: true,
write: (chunk, encoding, next) => {
collected.push(chunk);
next(undefined, chunk);
}
});

const cursorStream = cursor.stream();

cursorStream.on('end', () => {
expect(collected).to.have.length(3);
expect(collected).to.eql(docs);
done();
});

cursorStream.pipe(stream);
});
});
});
});
});
});
});

describe('transforms', function () {
it('should correctly apply map transform to cursor as readable stream', function (done) {
const configuration = this.configuration;
Expand Down Expand Up @@ -4023,24 +4070,70 @@ describe('Cursor', function () {
context('sort', function () {
const findSort = (input, output) =>
withMonitoredClient('find', function (client, events, done) {
const db = client.db('test');
db.collection('test_sort_dos', (err, collection) => {
});

describe('transforms', function () {
it('should correctly apply map transform to cursor as readable stream', function (done) {
const configuration = this.configuration;
const client = configuration.newClient();
client.connect(err => {
expect(err).to.not.exist;
this.defer(() => client.close());

const docs = 'Aaden Aaron Adrian Aditya Bob Joe'.split(' ').map(x => ({ name: x }));
const coll = client.db(configuration.db).collection('cursor_stream_mapping');
coll.insertMany(docs, err => {
expect(err).to.not.exist;
const cursor = collection.find({}, { sort: input });
cursor.next(err => {
expect(err).to.not.exist;
expect(events[0].command.sort).to.deep.equal(output);
cursor.close(done);

const bag = [];
const stream = coll
.find()
.project({ _id: 0, name: 1 })
.map(doc => ({ mapped: doc }))
.stream()
.on('data', doc => bag.push(doc));

stream.on('error', done).on('end', () => {
expect(bag.map(x => x.mapped)).to.eql(docs.map(x => ({ name: x.name })));
done();
});
});
});
});

const cursorSort = (input, output) =>
it('should correctly apply map transform when converting cursor to array', function (done) {
const configuration = this.configuration;
const client = configuration.newClient();
client.connect(err => {
expect(err).to.not.exist;
this.defer(() => client.close());

const docs = 'Aaden Aaron Adrian Aditya Bob Joe'.split(' ').map(x => ({ name: x }));
const coll = client.db(configuration.db).collection('cursor_toArray_mapping');
coll.insertMany(docs, err => {
expect(err).to.not.exist;

coll
.find()
.project({ _id: 0, name: 1 })
.map(doc => ({ mapped: doc }))
.toArray((err, mappedDocs) => {
expect(err).to.not.exist;
expect(mappedDocs.map(x => x.mapped)).to.eql(docs.map(x => ({ name: x.name })));
done();
});
});
});
});
});

context('sort', function () {
const findSort = (input, output) =>
withMonitoredClient('find', function (client, events, done) {
const db = client.db('test');
db.collection('test_sort_dos', (err, collection) => {
expect(err).to.not.exist;
const cursor = collection.find({}).sort(input);
const cursor = collection.find({}, { sort: input });
cursor.next(err => {
expect(err).to.not.exist;
expect(events[0].command.sort).to.deep.equal(output);
Expand All @@ -4049,61 +4142,15 @@ describe('Cursor', function () {
});
});

it('should use find options object', findSort({ alpha: 1 }, { alpha: 1 }));
it('should use find options string', findSort('alpha', { alpha: 1 }));
it('should use find options shallow array', findSort(['alpha', 1], { alpha: 1 }));
it('should use find options deep array', findSort([['alpha', 1]], { alpha: 1 }));

it('should use cursor.sort object', cursorSort({ alpha: 1 }, { alpha: 1 }));
it('should use cursor.sort string', cursorSort('alpha', { alpha: 1 }));
it('should use cursor.sort shallow array', cursorSort(['alpha', 1], { alpha: 1 }));
it('should use cursor.sort deep array', cursorSort([['alpha', 1]], { alpha: 1 }));

it('formatSort - one key', () => {
expect(formatSort('alpha')).to.deep.equal({ alpha: 1 });
expect(formatSort(['alpha'])).to.deep.equal({ alpha: 1 });
expect(formatSort('alpha', 1)).to.deep.equal({ alpha: 1 });
expect(formatSort('alpha', 'asc')).to.deep.equal({ alpha: 1 });
expect(formatSort([['alpha', 'asc']])).to.deep.equal({ alpha: 1 });
expect(formatSort('alpha', 'ascending')).to.deep.equal({ alpha: 1 });
expect(formatSort({ alpha: 1 })).to.deep.equal({ alpha: 1 });
expect(formatSort('beta')).to.deep.equal({ beta: 1 });
expect(formatSort(['beta'])).to.deep.equal({ beta: 1 });
expect(formatSort('beta', -1)).to.deep.equal({ beta: -1 });
expect(formatSort('beta', 'desc')).to.deep.equal({ beta: -1 });
expect(formatSort('beta', 'descending')).to.deep.equal({ beta: -1 });
expect(formatSort({ beta: -1 })).to.deep.equal({ beta: -1 });
expect(formatSort({ alpha: { $meta: 'hi' } })).to.deep.equal({
alpha: { $meta: 'hi' }
});
});

it('formatSort - multi key', () => {
expect(formatSort(['alpha', 'beta'])).to.deep.equal({ alpha: 1, beta: 1 });
expect(formatSort({ alpha: 1, beta: 1 })).to.deep.equal({ alpha: 1, beta: 1 });
expect(
formatSort([
['alpha', 'asc'],
['beta', 'ascending']
])
).to.deep.equal({ alpha: 1, beta: 1 });
expect(formatSort({ alpha: { $meta: 'hi' }, beta: 'ascending' })).to.deep.equal({
alpha: { $meta: 'hi' },
beta: 1
});
});

it('should use allowDiskUse option on sort', {
metadata: { requires: { mongodb: '>=4.4' } },
test: withMonitoredClient('find', function (client, events, done) {
const cursorSort = (input, output) =>
withMonitoredClient('find', function (client, events, done) {
const db = client.db('test');
db.collection('test_sort_allow_disk_use', (err, collection) => {
db.collection('test_sort_dos', (err, collection) => {
expect(err).to.not.exist;
const cursor = collection.find({}).sort(['alpha', 1]).allowDiskUse();
const cursor = collection.find({}).sort(input);
cursor.next(err => {
expect(err).to.not.exist;
const { command } = events.shift();
expect(command.sort).to.deep.equal({ alpha: 1 });
expect(events[0].command.sort).to.deep.equal(output);
expect(command.allowDiskUse).to.be.true;
cursor.close(done);
});
Expand Down