Skip to content

CDRIVER-5925 apply batchSize:0 to aggregate in change stream #1909

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions NEWS
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Unreleased (2.0.0)
==================

Changes:

* Passing `batchSize:0` as an option to `mongoc_client_watch`, `mongoc_database_watch`, or `mongoc_collection_watch`
now applies `batchSize:0` to the `aggregate` command. This is useful to request an immediate cursor. Previously the value
was ignored.

Removals:

* The compatibility "forwarding" headers have been removed (previously added
Expand Down
10 changes: 7 additions & 3 deletions build/generate-opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def flatten(items):

class Struct(OrderedDict):
def __init__(self, items, opts_name='opts', generate_rst=True,
generate_code=True, allow_extra=True, **defaults):
generate_code=True, allow_extra=True, rst_prelude=None, **defaults):
"""Define an options struct.

- items: List of pairs: (optionName, info)
Expand All @@ -64,6 +64,7 @@ def __init__(self, items, opts_name='opts', generate_rst=True,
self.generate_code = generate_code
self.allow_extra = allow_extra
self.defaults = defaults
self.rst_prelude = rst_prelude

def default(self, item, fallback):
return self.defaults.get(item, fallback)
Expand Down Expand Up @@ -293,7 +294,7 @@ def __init__(self, items, **defaults):
], limit=0, allow_extra=False)),

('mongoc_change_stream_opts_t', Struct([
('batchSize', {'type': 'int32_t', 'help': 'An ``int32`` representing number of documents requested to be returned on each call to :symbol:`mongoc_change_stream_next`'}),
('batchSize', {'type': 'int32_t', 'help': 'An ``int32`` requesting a limit of documents returned in each server reply. If positive, the ``batchSize`` is applied to both ``aggregate`` and ``getMore`` commands. If 0, the ``batchSize`` is only applied to the ``aggregate`` command (Useful to request an immediate cursor without significant server-side work. See `Aggregate Data Specifying Batch Size <https://www.mongodb.com/docs/manual/reference/command/aggregate/#aggregate-data-specifying-batch-size>`_). If omitted or negative, the value is ignored and server defaults are used (See `Cursor Batches <https://www.mongodb.com/docs/manual/core/cursors/#cursor-batches>`_ for a description of server defaults).'}),
('resumeAfter', {'type': 'document', 'help': 'A ``Document`` representing the logical starting point of the change stream. The result of :symbol:`mongoc_change_stream_get_resume_token()` or the ``_id`` field of any change received from a change stream can be used here. This option is mutually exclusive with ``startAfter`` and ``startAtOperationTime``.'}),
('startAfter', {'type': 'document', 'help': 'A ``Document`` representing the logical starting point of the change stream. Unlike ``resumeAfter``, this can resume notifications after an "invalidate" event. The result of :symbol:`mongoc_change_stream_get_resume_token()` or the ``_id`` field of any change received from a change stream can be used here. This option is mutually exclusive with ``resumeAfter`` and ``startAtOperationTime``.'}),
('startAtOperationTime', {'type': 'timestamp', 'help': 'A ``Timestamp``. The change stream only provides changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here. This option is mutually exclusive with ``resumeAfter`` and ``startAfter``.'}),
Expand All @@ -317,7 +318,7 @@ def __init__(self, items, **defaults):
}),
('showExpandedEvents', { 'type': 'bool', 'help': 'Set to ``true`` to return an expanded list of change stream events. Available only on MongoDB versions >=6.0'}),
comment_option_string_pre_4_4,
], fullDocument=None, fullDocumentBeforeChange=None)),
], fullDocument=None, fullDocumentBeforeChange=None, batchSize=-1, rst_prelude=".. versionchanged:: 2.0.0 ``batchSize`` of 0 is applied to the ``aggregate`` command. 0 was previously ignored.")),

('mongoc_create_index_opts_t', Struct([
write_concern_option,
Expand Down Expand Up @@ -474,6 +475,9 @@ def document_opts(struct, f):
print(file_name)
f = open(joinpath(doc_includes, file_name), 'w')
f.write (disclaimer)
if struct.rst_prelude is not None:
f.write(struct.rst_prelude)
f.write("\n\n")
f.write(
"``%s`` may be NULL or a BSON document with additional"
" command options:\n\n" % struct.opts_name)
Expand Down
4 changes: 3 additions & 1 deletion src/libmongoc/doc/includes/change-stream-opts.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
Generated with build/generate-opts.py
DO NOT EDIT THIS FILE

.. versionchanged:: 2.0.0 ``batchSize`` of 0 is applied to the ``aggregate`` command. 0 was previously ignored.

``opts`` may be NULL or a BSON document with additional command options:

* ``batchSize``: An ``int32`` representing number of documents requested to be returned on each call to :symbol:`mongoc_change_stream_next`
* ``batchSize``: An ``int32`` requesting a limit of documents returned in each server reply. If positive, the ``batchSize`` is applied to both ``aggregate`` and ``getMore`` commands. If 0, the ``batchSize`` is only applied to the ``aggregate`` command (Useful to request an immediate cursor without significant server-side work. See `Aggregate Data Specifying Batch Size <https://www.mongodb.com/docs/manual/reference/command/aggregate/#aggregate-data-specifying-batch-size>`_). If omitted or negative, the value is ignored and server defaults are used (See `Cursor Batches <https://www.mongodb.com/docs/manual/core/cursors/#cursor-batches>`_ for a description of server defaults).
* ``resumeAfter``: A ``Document`` representing the logical starting point of the change stream. The result of :symbol:`mongoc_change_stream_get_resume_token()` or the ``_id`` field of any change received from a change stream can be used here. This option is mutually exclusive with ``startAfter`` and ``startAtOperationTime``.
* ``startAfter``: A ``Document`` representing the logical starting point of the change stream. Unlike ``resumeAfter``, this can resume notifications after an "invalidate" event. The result of :symbol:`mongoc_change_stream_get_resume_token()` or the ``_id`` field of any change received from a change stream can be used here. This option is mutually exclusive with ``resumeAfter`` and ``startAtOperationTime``.
* ``startAtOperationTime``: A ``Timestamp``. The change stream only provides changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here. This option is mutually exclusive with ``resumeAfter`` and ``startAfter``.
Expand Down
7 changes: 5 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-change-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ _make_command (mongoc_change_stream_t *stream, bson_t *command)

/* Add batch size if needed */
bson_append_document_begin (command, "cursor", 6, &cursor_doc);
if (stream->batch_size > 0) {
if (stream->batch_size >= 0) {
// `batchSize:0` is supported and applied to `aggregate`. `batchSize:0` requests an immediate cursor. This is
// useful to avoid a long-running server-side aggregate. Once created, `mongoc_change_stream_destroy` can use
// `killCursors` to kill the server-side cursor.
bson_append_int32 (&cursor_doc, "batchSize", 9, stream->batch_size);
}
bson_append_document_end (command, &cursor_doc);
Expand Down Expand Up @@ -377,7 +380,7 @@ _change_stream_init (mongoc_change_stream_t *stream, const bson_t *pipeline, con

_mongoc_timestamp_set (&stream->operation_time, &stream->opts.startAtOperationTime);

stream->batch_size = stream->opts.batchSize;
stream->batch_size = stream->opts.batchSize; // `stream->opts.batchSize` is -1 if not present in `opts`.
stream->max_await_time_ms = stream->opts.maxAwaitTimeMS;
stream->show_expanded_events = stream->opts.showExpandedEvents;

Expand Down
2 changes: 1 addition & 1 deletion src/libmongoc/src/mongoc/mongoc-opts.c
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,7 @@ _mongoc_change_stream_opts_parse (

BSON_ASSERT (client || true); // client may be NULL.

mongoc_change_stream_opts->batchSize = 0;
mongoc_change_stream_opts->batchSize = -1;
bson_init (&mongoc_change_stream_opts->resumeAfter);
bson_init (&mongoc_change_stream_opts->startAfter);
memset (&mongoc_change_stream_opts->startAtOperationTime, 0, sizeof (mongoc_timestamp_t));
Expand Down
123 changes: 123 additions & 0 deletions src/libmongoc/tests/test-mongoc-change-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -2124,6 +2124,123 @@ prose_test_18 (void)
mock_server_destroy (server);
}

// Accumulates copies of APM (command monitoring) events for later inspection.
// Capacity is fixed at 6; the capture callbacks assert it is not exceeded.
// Callers are responsible for bson_destroy'ing each captured document.
typedef struct {
   bson_t *commands[6]; // Copies of started commands, in arrival order.
   size_t commands_len; // Number of entries used in `commands`.
   bson_t *replies[6]; // Copies of succeeded replies, in arrival order.
   size_t replies_len; // Number of entries used in `replies`.
} test_events_t;

static void
test_events_started_cb (const mongoc_apm_command_started_t *e)
{
test_events_t *te = mongoc_apm_command_started_get_context (e);
ASSERT_CMPSIZE_T (te->commands_len, <, sizeof (te->commands) / sizeof (te->commands[0]));
te->commands[te->commands_len++] = bson_copy (mongoc_apm_command_started_get_command (e));
}

static void
test_events_succeeded_cb (const mongoc_apm_command_succeeded_t *e)
{
test_events_t *te = mongoc_apm_command_succeeded_get_context (e);
ASSERT_CMPSIZE_T (te->replies_len, <, sizeof (te->replies) / sizeof (te->replies[0]));
te->replies[te->replies_len++] = bson_copy (mongoc_apm_command_succeeded_get_reply (e));
}

// Test that batchSize:0 is applied to the `aggregate` command.
static void
test_change_stream_batchSize0 (void *test_ctx)
{
bson_error_t error;

// Create a change stream. Capture a resume token. Insert documents to create future events.
bson_t *resumeToken;
{
mongoc_client_t *client = test_framework_new_default_client ();
mongoc_collection_t *coll = drop_and_get_coll (client, "db", "coll");
mongoc_change_stream_t *cs = mongoc_collection_watch (coll, tmp_bson ("{}"), NULL);
resumeToken = bson_copy (mongoc_change_stream_get_resume_token (cs));
// Insert documents to create future events.
ASSERT_OR_PRINT (mongoc_collection_insert_one (coll, tmp_bson ("{'_id': 1}"), NULL, NULL, &error), error);
ASSERT_OR_PRINT (mongoc_collection_insert_one (coll, tmp_bson ("{'_id': 2}"), NULL, NULL, &error), error);
mongoc_change_stream_destroy (cs);
mongoc_collection_destroy (coll);
mongoc_client_destroy (client);
}


// Create another change stream with the resumeToken and batchSize:0.
test_events_t te = {.commands_len = 0, .replies_len = 0};
{
mongoc_client_t *client = test_framework_new_default_client ();
// Capture events.
{
mongoc_apm_callbacks_t *cbs = mongoc_apm_callbacks_new ();
mongoc_apm_set_command_started_cb (cbs, test_events_started_cb);
mongoc_apm_set_command_succeeded_cb (cbs, test_events_succeeded_cb);
ASSERT (mongoc_client_set_apm_callbacks (client, cbs, &te));
mongoc_apm_callbacks_destroy (cbs);
}
mongoc_collection_t *coll = mongoc_client_get_collection (client, "db", "coll");
// Iterate change stream.
{
bson_t *opts = BCON_NEW ("resumeAfter", BCON_DOCUMENT (resumeToken), "batchSize", BCON_INT32 (0));
mongoc_change_stream_t *cs = mongoc_collection_watch (coll, tmp_bson ("{}"), opts);
const bson_t *ignored;
while (mongoc_change_stream_next (cs, &ignored))
;
ASSERT_OR_PRINT (!mongoc_change_stream_error_document (cs, &error, NULL), error);
bson_destroy (opts);
mongoc_change_stream_destroy (cs);
}
mongoc_collection_destroy (coll);

// Check captured events.
{
// Expect aggregate is sent with `batchSize:0`
ASSERT (te.commands[0]);
ASSERT_MATCH (te.commands[0], BSON_STR ({"aggregate" : "coll", "cursor" : {"batchSize" : 0}}));
// Expect reply has no documents.
ASSERT (te.replies[0]);
ASSERT_MATCH (te.replies[0], BSON_STR ({"cursor" : {"firstBatch" : []}}));

// Expect getMore is sent without `batchSize`
ASSERT (te.commands[1]);
ASSERT_MATCH (te.commands[1], BSON_STR ({"getMore" : {"$$type" : "long"}, "batchSize" : {"$exists" : false}}));
// Expect reply has both documents.
ASSERT (te.replies[1]);
ASSERT_MATCH (
te.replies[1],
BSON_STR ({"cursor" : {"nextBatch" : [ {"operationType" : "insert"}, {"operationType" : "insert"} ]}}));

// Expect another getMore is sent without `batchSize`
ASSERT (te.commands[2]);
ASSERT_MATCH (te.commands[2], BSON_STR ({"getMore" : {"$$type" : "long"}, "batchSize" : {"$exists" : false}}));
// Expect reply has no more documents
ASSERT (te.replies[2]);
ASSERT_MATCH (te.replies[2], BSON_STR ({"cursor" : {"nextBatch" : []}}));

// Expect killCursors is sent to kill server-side cursor.
ASSERT (te.commands[3]);
ASSERT_MATCH (te.commands[3], BSON_STR ({"killCursors" : "coll"}));
ASSERT (te.replies[3]);
ASSERT_MATCH (te.replies[3], BSON_STR ({"ok" : 1}));

ASSERT (!te.commands[4]);
ASSERT (!te.replies[4]);
}
mongoc_client_destroy (client);
}

bson_destroy (resumeToken);
for (size_t i = 0; i < te.commands_len; i++) {
bson_destroy (te.commands[i]);
}
for (size_t i = 0; i < te.replies_len; i++) {
bson_destroy (te.replies[i]);
}
}


void
test_change_stream_install (TestSuite *suite)
Expand Down Expand Up @@ -2267,4 +2384,10 @@ test_change_stream_install (TestSuite *suite)
test_framework_skip_if_not_rs_version_7);
TestSuite_AddMockServerTest (suite, "/change_streams/prose_test_17", prose_test_17);
TestSuite_AddMockServerTest (suite, "/change_streams/prose_test_18", prose_test_18);
TestSuite_AddFull (suite,
"/change_stream/batchSize0",
test_change_stream_batchSize0,
NULL,
NULL,
test_framework_skip_if_not_replset);
}