Skip to content

Commit 27d2a68

Browse files
authored
CDRIVER-5925 apply batchSize:0 to aggregate in change stream (#1909)
Apply `batchSize:0` to the `aggregate` command for change streams when specified. Previously it was ignored.
1 parent 0dfdb56 commit 27d2a68

File tree

6 files changed

+145
-7
lines changed

6 files changed

+145
-7
lines changed

NEWS

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
Unreleased (2.0.0)
22
==================
33

4+
Changes:
5+
6+
* Passing `batchSize:0` as an option to `mongoc_client_watch`, `mongoc_database_watch`, or `mongoc_collection_watch`
7+
now applies `batchSize:0` to the `aggregate` command. This is useful to request an immediate cursor. Previously the value
8+
was ignored.
9+
410
Removals:
511

612
* The compatibility "forwarding" headers have been removed (previously added

build/generate-opts.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def flatten(items):
4949

5050
class Struct(OrderedDict):
5151
def __init__(self, items, opts_name='opts', generate_rst=True,
52-
generate_code=True, allow_extra=True, **defaults):
52+
generate_code=True, allow_extra=True, rst_prelude=None, **defaults):
5353
"""Define an options struct.
5454
5555
- items: List of pairs: (optionName, info)
@@ -64,6 +64,7 @@ def __init__(self, items, opts_name='opts', generate_rst=True,
6464
self.generate_code = generate_code
6565
self.allow_extra = allow_extra
6666
self.defaults = defaults
67+
self.rst_prelude = rst_prelude
6768

6869
def default(self, item, fallback):
6970
return self.defaults.get(item, fallback)
@@ -293,7 +294,7 @@ def __init__(self, items, **defaults):
293294
], limit=0, allow_extra=False)),
294295

295296
('mongoc_change_stream_opts_t', Struct([
296-
('batchSize', {'type': 'int32_t', 'help': 'An ``int32`` representing number of documents requested to be returned on each call to :symbol:`mongoc_change_stream_next`'}),
297+
('batchSize', {'type': 'int32_t', 'help': 'An ``int32`` requesting a limit of documents returned in each server reply. If positive, the ``batchSize`` is applied to both ``aggregate`` and ``getMore`` commands. If 0, the ``batchSize`` is only applied to the ``aggregate`` command (Useful to request an immediate cursor without significant server-side work. See `Aggregate Data Specifying Batch Size <https://www.mongodb.com/docs/manual/reference/command/aggregate/#aggregate-data-specifying-batch-size>`_). If omitted or negative, the value is ignored and server defaults are used (See `Cursor Batches <https://www.mongodb.com/docs/manual/core/cursors/#cursor-batches>`_ for a description of server defaults).'}),
297298
('resumeAfter', {'type': 'document', 'help': 'A ``Document`` representing the logical starting point of the change stream. The result of :symbol:`mongoc_change_stream_get_resume_token()` or the ``_id`` field of any change received from a change stream can be used here. This option is mutually exclusive with ``startAfter`` and ``startAtOperationTime``.'}),
298299
('startAfter', {'type': 'document', 'help': 'A ``Document`` representing the logical starting point of the change stream. Unlike ``resumeAfter``, this can resume notifications after an "invalidate" event. The result of :symbol:`mongoc_change_stream_get_resume_token()` or the ``_id`` field of any change received from a change stream can be used here. This option is mutually exclusive with ``resumeAfter`` and ``startAtOperationTime``.'}),
299300
('startAtOperationTime', {'type': 'timestamp', 'help': 'A ``Timestamp``. The change stream only provides changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here. This option is mutually exclusive with ``resumeAfter`` and ``startAfter``.'}),
@@ -317,7 +318,7 @@ def __init__(self, items, **defaults):
317318
}),
318319
('showExpandedEvents', { 'type': 'bool', 'help': 'Set to ``true`` to return an expanded list of change stream events. Available only on MongoDB versions >=6.0'}),
319320
comment_option_string_pre_4_4,
320-
], fullDocument=None, fullDocumentBeforeChange=None)),
321+
], fullDocument=None, fullDocumentBeforeChange=None, batchSize=-1, rst_prelude=".. versionchanged:: 2.0.0 ``batchSize`` of 0 is applied to the ``aggregate`` command. 0 was previously ignored.")),
321322

322323
('mongoc_create_index_opts_t', Struct([
323324
write_concern_option,
@@ -474,6 +475,9 @@ def document_opts(struct, f):
474475
print(file_name)
475476
f = open(joinpath(doc_includes, file_name), 'w')
476477
f.write (disclaimer)
478+
if struct.rst_prelude is not None:
479+
f.write(struct.rst_prelude)
480+
f.write("\n\n")
477481
f.write(
478482
"``%s`` may be NULL or a BSON document with additional"
479483
" command options:\n\n" % struct.opts_name)

src/libmongoc/doc/includes/change-stream-opts.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
Generated with build/generate-opts.py
44
DO NOT EDIT THIS FILE
55

6+
.. versionchanged:: 2.0.0 ``batchSize`` of 0 is applied to the ``aggregate`` command. 0 was previously ignored.
7+
68
``opts`` may be NULL or a BSON document with additional command options:
79

8-
* ``batchSize``: An ``int32`` representing number of documents requested to be returned on each call to :symbol:`mongoc_change_stream_next`
10+
* ``batchSize``: An ``int32`` requesting a limit of documents returned in each server reply. If positive, the ``batchSize`` is applied to both ``aggregate`` and ``getMore`` commands. If 0, the ``batchSize`` is only applied to the ``aggregate`` command (Useful to request an immediate cursor without significant server-side work. See `Aggregate Data Specifying Batch Size <https://www.mongodb.com/docs/manual/reference/command/aggregate/#aggregate-data-specifying-batch-size>`_). If omitted or negative, the value is ignored and server defaults are used (See `Cursor Batches <https://www.mongodb.com/docs/manual/core/cursors/#cursor-batches>`_ for a description of server defaults).
911
* ``resumeAfter``: A ``Document`` representing the logical starting point of the change stream. The result of :symbol:`mongoc_change_stream_get_resume_token()` or the ``_id`` field of any change received from a change stream can be used here. This option is mutually exclusive with ``startAfter`` and ``startAtOperationTime``.
1012
* ``startAfter``: A ``Document`` representing the logical starting point of the change stream. Unlike ``resumeAfter``, this can resume notifications after an "invalidate" event. The result of :symbol:`mongoc_change_stream_get_resume_token()` or the ``_id`` field of any change received from a change stream can be used here. This option is mutually exclusive with ``resumeAfter`` and ``startAtOperationTime``.
1113
* ``startAtOperationTime``: A ``Timestamp``. The change stream only provides changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here. This option is mutually exclusive with ``resumeAfter`` and ``startAfter``.

src/libmongoc/src/mongoc/mongoc-change-stream.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,10 @@ _make_command (mongoc_change_stream_t *stream, bson_t *command)
192192

193193
/* Add batch size if needed */
194194
bson_append_document_begin (command, "cursor", 6, &cursor_doc);
195-
if (stream->batch_size > 0) {
195+
if (stream->batch_size >= 0) {
196+
// `batchSize:0` is supported and applied to `aggregate`. `batchSize:0` requests an immediate cursor. This is
197+
// useful to avoid a long-running server-side aggregate. Once created, `mongoc_change_stream_destroy` can use
198+
// `killCursors` to kill the server-side cursor.
196199
bson_append_int32 (&cursor_doc, "batchSize", 9, stream->batch_size);
197200
}
198201
bson_append_document_end (command, &cursor_doc);
@@ -377,7 +380,7 @@ _change_stream_init (mongoc_change_stream_t *stream, const bson_t *pipeline, con
377380

378381
_mongoc_timestamp_set (&stream->operation_time, &stream->opts.startAtOperationTime);
379382

380-
stream->batch_size = stream->opts.batchSize;
383+
stream->batch_size = stream->opts.batchSize; // `stream->opts.batchSize` is -1 if not present in `opts`.
381384
stream->max_await_time_ms = stream->opts.maxAwaitTimeMS;
382385
stream->show_expanded_events = stream->opts.showExpandedEvents;
383386

src/libmongoc/src/mongoc/mongoc-opts.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1650,7 +1650,7 @@ _mongoc_change_stream_opts_parse (
16501650

16511651
BSON_ASSERT (client || true); // client may be NULL.
16521652

1653-
mongoc_change_stream_opts->batchSize = 0;
1653+
mongoc_change_stream_opts->batchSize = -1;
16541654
bson_init (&mongoc_change_stream_opts->resumeAfter);
16551655
bson_init (&mongoc_change_stream_opts->startAfter);
16561656
memset (&mongoc_change_stream_opts->startAtOperationTime, 0, sizeof (mongoc_timestamp_t));

src/libmongoc/tests/test-mongoc-change-stream.c

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2124,6 +2124,123 @@ prose_test_18 (void)
21242124
mock_server_destroy (server);
21252125
}
21262126

2127+
typedef struct {
2128+
bson_t *commands[6];
2129+
size_t commands_len;
2130+
bson_t *replies[6];
2131+
size_t replies_len;
2132+
} test_events_t;
2133+
2134+
static void
2135+
test_events_started_cb (const mongoc_apm_command_started_t *e)
2136+
{
2137+
test_events_t *te = mongoc_apm_command_started_get_context (e);
2138+
ASSERT_CMPSIZE_T (te->commands_len, <, sizeof (te->commands) / sizeof (te->commands[0]));
2139+
te->commands[te->commands_len++] = bson_copy (mongoc_apm_command_started_get_command (e));
2140+
}
2141+
2142+
static void
2143+
test_events_succeeded_cb (const mongoc_apm_command_succeeded_t *e)
2144+
{
2145+
test_events_t *te = mongoc_apm_command_succeeded_get_context (e);
2146+
ASSERT_CMPSIZE_T (te->replies_len, <, sizeof (te->replies) / sizeof (te->replies[0]));
2147+
te->replies[te->replies_len++] = bson_copy (mongoc_apm_command_succeeded_get_reply (e));
2148+
}
2149+
2150+
// Test that batchSize:0 is applied to the `aggregate` command.
2151+
static void
2152+
test_change_stream_batchSize0 (void *test_ctx)
2153+
{
2154+
bson_error_t error;
2155+
2156+
// Create a change stream. Capture a resume token. Insert documents to create future events.
2157+
bson_t *resumeToken;
2158+
{
2159+
mongoc_client_t *client = test_framework_new_default_client ();
2160+
mongoc_collection_t *coll = drop_and_get_coll (client, "db", "coll");
2161+
mongoc_change_stream_t *cs = mongoc_collection_watch (coll, tmp_bson ("{}"), NULL);
2162+
resumeToken = bson_copy (mongoc_change_stream_get_resume_token (cs));
2163+
// Insert documents to create future events.
2164+
ASSERT_OR_PRINT (mongoc_collection_insert_one (coll, tmp_bson ("{'_id': 1}"), NULL, NULL, &error), error);
2165+
ASSERT_OR_PRINT (mongoc_collection_insert_one (coll, tmp_bson ("{'_id': 2}"), NULL, NULL, &error), error);
2166+
mongoc_change_stream_destroy (cs);
2167+
mongoc_collection_destroy (coll);
2168+
mongoc_client_destroy (client);
2169+
}
2170+
2171+
2172+
// Create another change stream with the resumeToken and batchSize:0.
2173+
test_events_t te = {.commands_len = 0, .replies_len = 0};
2174+
{
2175+
mongoc_client_t *client = test_framework_new_default_client ();
2176+
// Capture events.
2177+
{
2178+
mongoc_apm_callbacks_t *cbs = mongoc_apm_callbacks_new ();
2179+
mongoc_apm_set_command_started_cb (cbs, test_events_started_cb);
2180+
mongoc_apm_set_command_succeeded_cb (cbs, test_events_succeeded_cb);
2181+
ASSERT (mongoc_client_set_apm_callbacks (client, cbs, &te));
2182+
mongoc_apm_callbacks_destroy (cbs);
2183+
}
2184+
mongoc_collection_t *coll = mongoc_client_get_collection (client, "db", "coll");
2185+
// Iterate change stream.
2186+
{
2187+
bson_t *opts = BCON_NEW ("resumeAfter", BCON_DOCUMENT (resumeToken), "batchSize", BCON_INT32 (0));
2188+
mongoc_change_stream_t *cs = mongoc_collection_watch (coll, tmp_bson ("{}"), opts);
2189+
const bson_t *ignored;
2190+
while (mongoc_change_stream_next (cs, &ignored))
2191+
;
2192+
ASSERT_OR_PRINT (!mongoc_change_stream_error_document (cs, &error, NULL), error);
2193+
bson_destroy (opts);
2194+
mongoc_change_stream_destroy (cs);
2195+
}
2196+
mongoc_collection_destroy (coll);
2197+
2198+
// Check captured events.
2199+
{
2200+
// Expect aggregate is sent with `batchSize:0`
2201+
ASSERT (te.commands[0]);
2202+
ASSERT_MATCH (te.commands[0], BSON_STR ({"aggregate" : "coll", "cursor" : {"batchSize" : 0}}));
2203+
// Expect reply has no documents.
2204+
ASSERT (te.replies[0]);
2205+
ASSERT_MATCH (te.replies[0], BSON_STR ({"cursor" : {"firstBatch" : []}}));
2206+
2207+
// Expect getMore is sent without `batchSize`
2208+
ASSERT (te.commands[1]);
2209+
ASSERT_MATCH (te.commands[1], BSON_STR ({"getMore" : {"$$type" : "long"}, "batchSize" : {"$exists" : false}}));
2210+
// Expect reply has both documents.
2211+
ASSERT (te.replies[1]);
2212+
ASSERT_MATCH (
2213+
te.replies[1],
2214+
BSON_STR ({"cursor" : {"nextBatch" : [ {"operationType" : "insert"}, {"operationType" : "insert"} ]}}));
2215+
2216+
// Expect another getMore is sent without `batchSize`
2217+
ASSERT (te.commands[2]);
2218+
ASSERT_MATCH (te.commands[2], BSON_STR ({"getMore" : {"$$type" : "long"}, "batchSize" : {"$exists" : false}}));
2219+
// Expect reply has no more documents
2220+
ASSERT (te.replies[2]);
2221+
ASSERT_MATCH (te.replies[2], BSON_STR ({"cursor" : {"nextBatch" : []}}));
2222+
2223+
// Expect killCursors is sent to kill server-side cursor.
2224+
ASSERT (te.commands[3]);
2225+
ASSERT_MATCH (te.commands[3], BSON_STR ({"killCursors" : "coll"}));
2226+
ASSERT (te.replies[3]);
2227+
ASSERT_MATCH (te.replies[3], BSON_STR ({"ok" : 1}));
2228+
2229+
ASSERT (!te.commands[4]);
2230+
ASSERT (!te.replies[4]);
2231+
}
2232+
mongoc_client_destroy (client);
2233+
}
2234+
2235+
bson_destroy (resumeToken);
2236+
for (size_t i = 0; i < te.commands_len; i++) {
2237+
bson_destroy (te.commands[i]);
2238+
}
2239+
for (size_t i = 0; i < te.replies_len; i++) {
2240+
bson_destroy (te.replies[i]);
2241+
}
2242+
}
2243+
21272244

21282245
void
21292246
test_change_stream_install (TestSuite *suite)
@@ -2267,4 +2384,10 @@ test_change_stream_install (TestSuite *suite)
22672384
test_framework_skip_if_not_rs_version_7);
22682385
TestSuite_AddMockServerTest (suite, "/change_streams/prose_test_17", prose_test_17);
22692386
TestSuite_AddMockServerTest (suite, "/change_streams/prose_test_18", prose_test_18);
2387+
TestSuite_AddFull (suite,
2388+
"/change_stream/batchSize0",
2389+
test_change_stream_batchSize0,
2390+
NULL,
2391+
NULL,
2392+
test_framework_skip_if_not_replset);
22702393
}

0 commit comments

Comments
 (0)