Skip to content

CDRIVER-4316 Allow both int32 and int64 for batchSize in cursor opts #1300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 15, 2023
29 changes: 26 additions & 3 deletions src/libmongoc/src/mongoc/mongoc-cursor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1449,9 +1449,32 @@ void
mongoc_cursor_set_batch_size (mongoc_cursor_t *cursor, uint32_t batch_size)
{
BSON_ASSERT (cursor);

_mongoc_cursor_set_opt_int64 (
cursor, MONGOC_CURSOR_BATCH_SIZE, (int64_t) batch_size);
bson_iter_t iter;
if (!bson_iter_init_find (&iter, &cursor->opts, MONGOC_CURSOR_BATCH_SIZE)) {
bson_append_int64 (&cursor->opts,
MONGOC_CURSOR_BATCH_SIZE,
MONGOC_CURSOR_BATCH_SIZE_LEN,
batch_size);
} else if (BSON_ITER_HOLDS_INT64 (&iter)) {
bson_iter_overwrite_int64 (&iter, (int64_t) batch_size);
} else if (BSON_ITER_HOLDS_INT32 (&iter)) {
if (!bson_in_range_int32_t_unsigned (batch_size)) {
MONGOC_WARNING ("unable to overwrite stored int32 batchSize with "
"out-of-range value %" PRIu32,
batch_size);
return;
}
bson_iter_overwrite_int32 (&iter, (int32_t) batch_size);
} else if (BSON_ITER_HOLDS_DOUBLE (&iter)) {
bson_iter_overwrite_double (&iter, (double) batch_size);
} else if (BSON_ITER_HOLDS_DECIMAL128 (&iter)) {
bson_decimal128_t val;
val.high = 0x3040000000000000;
val.low = (uint64_t) batch_size;
bson_iter_overwrite_decimal128 (&iter, &val);
} else {
MONGOC_WARNING ("unable to overwrite non-numeric stored batchSize");
}
}


Expand Down
83 changes: 83 additions & 0 deletions src/libmongoc/tests/test-mongoc-collection-find-with-opts.c
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,88 @@ test_server_id_option (void)
mongoc_client_destroy (client);
}

static void
test_find_batchSize (void)
{
mongoc_client_t *client;
mongoc_collection_t *collection;
bson_error_t error;
mongoc_cursor_t *cursor;

client = test_framework_new_default_client ();
collection = mongoc_client_get_collection (client, "db", "collection");

// Test a cursor with an int32 batchSize.
{
cursor = mongoc_collection_find_with_opts (
collection,
tmp_bson ("{}"),
tmp_bson ("{'batchSize': { '$numberInt': '1' }}"),
NULL);

ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);

// Exhaust the cursor.
{
// Note: Cursors are lazy. The `find` command is sent on the first call
// to `mongoc_cursor_next`.
const bson_t *got;
while (mongoc_cursor_next (cursor, &got))
;
}

ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);
mongoc_cursor_destroy (cursor);
}

// Test a cursor with an int64 batchSize.
{
cursor = mongoc_collection_find_with_opts (
collection,
tmp_bson ("{}"),
tmp_bson ("{'batchSize': { '$numberLong': '1' }}"),
NULL);

ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);

// Exhaust the cursor.
{
// Note: Cursors are lazy. The `find` command is sent on the first call
// to `mongoc_cursor_next`.
const bson_t *got;
while (mongoc_cursor_next (cursor, &got))
;
}

ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);
mongoc_cursor_destroy (cursor);
}

// Test a cursor with a string batchSize.
{
cursor = mongoc_collection_find_with_opts (
collection, tmp_bson ("{}"), tmp_bson ("{'batchSize': 'foo'}"), NULL);

ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);

// Attempt to exhaust the cursor.
{
// Note: Cursors are lazy. The `find` command is sent on the first call
// to `mongoc_cursor_next`.
const bson_t *got;
while (mongoc_cursor_next (cursor, &got))
;
}

// Expect an error from the server.
ASSERT (mongoc_cursor_error (cursor, &error));
mongoc_cursor_destroy (cursor);
}

mongoc_collection_destroy (collection);
mongoc_client_destroy (client);
}

void
test_collection_find_with_opts_install (TestSuite *suite)
{
Expand Down Expand Up @@ -920,4 +1002,5 @@ test_collection_find_with_opts_install (TestSuite *suite)
TestSuite_AddLive (suite,
"/Collection/find_with_opts/server_id/option",
test_server_id_option);
TestSuite_AddLive (suite, "/Collection/find/batchSize", test_find_batchSize);
}
209 changes: 209 additions & 0 deletions src/libmongoc/tests/test-mongoc-cursor.c
Original file line number Diff line number Diff line change
Expand Up @@ -2350,6 +2350,200 @@ test_find_error_is_alive (void)
mongoc_client_destroy (client);
}

typedef struct _started_event_t {
char *command_name;
bson_t *command;
} started_event_t;

static void
command_started (const mongoc_apm_command_started_t *event)
{
mongoc_array_t *events =
(mongoc_array_t *) mongoc_apm_command_started_get_context (event);
started_event_t *started_event = bson_malloc0 (sizeof (started_event_t));

started_event->command =
bson_copy (mongoc_apm_command_started_get_command (event));
started_event->command_name =
bson_strdup (mongoc_apm_command_started_get_command_name (event));
_mongoc_array_append_val (events, started_event);
}

static void
clear_started_events (mongoc_array_t *events)
{
for (size_t i = 0; i < events->len; i++) {
started_event_t *started_event =
_mongoc_array_index (events, started_event_t *, i);
bson_destroy (started_event->command);
bson_free (started_event->command_name);
bson_free (started_event);
}
_mongoc_array_clear (events);
}

void
numeric_iter_eq (bson_iter_t *iter, int64_t val)
{
ASSERT_CMPINT64 (bson_iter_as_int64 (iter), ==, val);
}

void
decimal128_iter_eq (bson_iter_t *iter, int64_t val)
{
bson_decimal128_t d;
bson_iter_decimal128 (iter, &d);
ASSERT_CMPUINT64 (d.high, ==, 0x3040000000000000);
ASSERT_CMPINT64 (d.low, ==, val);
}

void
test_cursor_batchsize_override (bson_t *findopts,
void (*assert_eq) (bson_iter_t *, int64_t))
{
mongoc_client_t *client;
mongoc_apm_callbacks_t *cbs;
mongoc_collection_t *coll;
bson_error_t error;
mongoc_array_t started_events;

client = test_framework_new_default_client ();
cbs = mongoc_apm_callbacks_new ();
_mongoc_array_init (&started_events, sizeof (started_event_t *));
mongoc_apm_set_command_started_cb (cbs, command_started);
coll = mongoc_client_get_collection (client, "db", "coll");

/* Drop and insert two documents into the collection */
{
bson_t *to_insert = BCON_NEW ("x", "y");

// Ignore "ns not found" error on drop.
mongoc_collection_drop (coll, NULL);
ASSERT_OR_PRINT (
mongoc_collection_insert_one (
coll, to_insert, NULL /* opts */, NULL /* reply */, &error),
error);
ASSERT_OR_PRINT (
mongoc_collection_insert_one (
coll, to_insert, NULL /* opts */, NULL /* reply */, &error),
error);
bson_destroy (to_insert);
}

mongoc_client_set_apm_callbacks (client, cbs, &started_events);

/* Create a cursor and iterate once. */
{
const bson_t *got;
bson_t *filter = bson_new ();
mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (
coll, filter, findopts, NULL /* read_prefs */);
/* Attempt to overwrite the 'batchSize' with 2. */
mongoc_cursor_set_batch_size (cursor, 2);
/* Assert no command started events. The cursor does not send 'find' until
* the first call to mongoc_cursor_next. */
ASSERT_CMPSIZE_T (started_events.len, ==, 0);
/* Iterate once. */
ASSERT (mongoc_cursor_next (cursor, &got));

mongoc_cursor_destroy (cursor);
bson_destroy (findopts);
bson_destroy (filter);
}

/* Check events. */
{
started_event_t *started_event;
bson_iter_t iter;
/* Expect first event is find. */
started_event =
_mongoc_array_index (&started_events, started_event_t *, 0);
ASSERT_CMPSTR (started_event->command_name, "find");
/* Expect the batchSize sent to be 2. */
ASSERT (bson_iter_init_find (&iter, started_event->command, "batchSize"));
assert_eq (&iter, 2);
}

mongoc_collection_destroy (coll);
mongoc_apm_callbacks_destroy (cbs);
mongoc_client_destroy (client);

clear_started_events (&started_events);
_mongoc_array_destroy (&started_events);
}

/* Test that mongoc_cursor_set_batch_size overrides a previously set int32
* batchSize. */
void
test_cursor_batchsize_override_int32 (void)
{
bson_t *findopts = BCON_NEW ("batchSize", BCON_INT32 (1));
test_cursor_batchsize_override (findopts, numeric_iter_eq);
}

/* Test that mongoc_cursor_set_batch_size overrides a previously set int64
* batchSize. */
void
test_cursor_batchsize_override_int64 (void)
{
bson_t *findopts = BCON_NEW ("batchSize", BCON_INT64 (1));
test_cursor_batchsize_override (findopts, numeric_iter_eq);
}

/* Test that mongoc_cursor_set_batch_size overrides a previously set double
* batchSize. */
void
test_cursor_batchsize_override_double (void)
{
bson_t *findopts = BCON_NEW ("batchSize", BCON_DOUBLE (1.0));
test_cursor_batchsize_override (findopts, numeric_iter_eq);
}

/* Test that mongoc_cursor_set_batch_size overrides a previously set decimal128
* batchSize. */
void
test_cursor_batchsize_override_decimal128 (void)
{
bson_decimal128_t start_val;
bson_decimal128_from_string ("1", &start_val);
bson_t *findopts = BCON_NEW ("batchSize", BCON_DECIMAL128 (&start_val));
test_cursor_batchsize_override (findopts, decimal128_iter_eq);
}

/* Test that attempting to overwrite an int32 batchSize with an out-of-range
* value raises a warning */
void
test_cursor_batchsize_override_range_warning (void)
{
mongoc_client_t *client;
mongoc_collection_t *coll;
bson_t *findopts = BCON_NEW ("batchSize", BCON_INT32 (1.0));

client = test_framework_new_default_client ();
coll = mongoc_client_get_collection (client, "db", "coll");

/* Create a cursor and attempt to override outside int32 range. */
{
bson_t *filter = bson_new ();
mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (
coll, filter, findopts, NULL /* read_prefs */);

capture_logs (true);
/* Attempt to overwrite the 'batchSize' with uint32_max. */
mongoc_cursor_set_batch_size (cursor, UINT32_MAX);
ASSERT_CAPTURED_LOG (
"mongoc_cursor_set_batch_size",
MONGOC_LOG_LEVEL_WARNING,
"unable to overwrite stored int32 batchSize with out-of-range value");

mongoc_cursor_destroy (cursor);
bson_destroy (findopts);
bson_destroy (filter);
}

mongoc_collection_destroy (coll);
mongoc_client_destroy (client);
}

void
test_cursor_install (TestSuite *suite)
Expand Down Expand Up @@ -2436,4 +2630,19 @@ test_cursor_install (TestSuite *suite)
suite, "/Cursor/error_document/command", test_error_document_command);
TestSuite_AddLive (
suite, "/Cursor/find_error/is_alive", test_find_error_is_alive);
TestSuite_AddLive (suite,
"/Cursor/batchsize_override_int32",
test_cursor_batchsize_override_int32);
TestSuite_AddLive (suite,
"/Cursor/batchsize_override_int64",
test_cursor_batchsize_override_int64);
TestSuite_AddLive (suite,
"/Cursor/batchsize_override_double",
test_cursor_batchsize_override_double);
TestSuite_AddLive (suite,
"/Cursor/batchsize_override_decimal128",
test_cursor_batchsize_override_decimal128);
TestSuite_AddLive (suite,
"/Cursor/batchsize_override_range_warning",
test_cursor_batchsize_override_range_warning);
}