Skip to content

Commit 5d47589

Browse files
authored
CDRIVER-4130 Unconditionally use OP_MSG for server 5.1 and newer (#912)
* Warn and ignore exhaust cursor option given server 5.1 or newer * Unconditionally use OP_MSG for find command given server 5.1 or newer * Unconditionally use OP_MSG for getMore command given server 5.1 or newer * Avoid unnecessary exhaust cursor in /counters/op_msg test * Document exhaust option behavior for find command * Add /Client/exhaust_cursor/fallback test for server 5.1 and newer
1 parent 59aeacb commit 5d47589

File tree

7 files changed

+112
-32
lines changed

7 files changed

+112
-32
lines changed

src/libmongoc/doc/mongoc_collection_find_with_opts.rst

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ Option BSON type Option BSON type
132132
``allowDiskUse`` bool
133133
======================= ================== =================== ==================
134134

135-
All options are documented in the reference page for `the "find" command`_ in the MongoDB server manual, except for "maxAwaitTimeMS" and "sessionId".
135+
All options are documented in the reference page for `the "find" command`_ in the MongoDB server manual, except for "maxAwaitTimeMS", "sessionId", and "exhaust".
136136

137137
"maxAwaitTimeMS" is the maximum amount of time for the server to wait on new documents to satisfy a query, if "tailable" and "awaitData" are both true.
138138
If no new documents are found, the tailable cursor receives an empty batch. The "maxAwaitTimeMS" option is ignored for MongoDB older than 3.4.
@@ -141,6 +141,8 @@ To add a "sessionId", construct a :symbol:`mongoc_client_session_t` with :symbol
141141

142142
To add a "readConcern", construct a :symbol:`mongoc_read_concern_t` with :symbol:`mongoc_read_concern_new` and configure it with :symbol:`mongoc_read_concern_set_level`. Then use :symbol:`mongoc_read_concern_append` to add the read concern to ``opts``.
143143

144+
"exhaust" requests the construction of an exhaust cursor. For MongoDB servers before 5.1, this option converts the command into a legacy OP_QUERY message. For MongoDB servers 5.1 and newer, this option is ignored and a normal cursor is constructed instead.
145+
144146
For some options like "collation", the driver returns an error if the server version is too old to support the feature.
145147
Any fields in ``opts`` that are not listed here are passed to the server unmodified.
146148

@@ -153,7 +155,7 @@ The ``snapshot`` boolean option is removed in MongoDB 4.0. The ``maxScan`` optio
153155

154156
.. seealso::
155157

156-
| `The "find" command`_ in the MongoDB Manual. All options listed there are supported by the C Driver. For MongoDB servers before 3.2, or for exhaust queries, the driver transparently converts the query to a legacy OP_QUERY message.
158+
| `The "find" command`_ in the MongoDB Manual. All options listed there are supported by the C Driver. For MongoDB servers before 3.2, the driver transparently converts the query to a legacy OP_QUERY message.
157159
158160
.. _the "find" command: https://docs.mongodb.org/master/reference/command/find/
159161

src/libmongoc/src/mongoc/mongoc-cmd.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,8 @@ mongoc_cmd_parts_append_opts (mongoc_cmd_parts_t *parts,
181181
parts->assembled.session = cs;
182182
continue;
183183
} else if (BSON_ITER_IS_KEY (iter, "serverId") ||
184-
BSON_ITER_IS_KEY (iter, "maxAwaitTimeMS")) {
184+
BSON_ITER_IS_KEY (iter, "maxAwaitTimeMS") ||
185+
BSON_ITER_IS_KEY (iter, "exhaust")) {
185186
continue;
186187
}
187188

@@ -629,7 +630,8 @@ _mongoc_cmd_parts_assemble_mongod (mongoc_cmd_parts_t *parts,
629630
case MONGOC_TOPOLOGY_LOAD_BALANCED:
630631
case MONGOC_TOPOLOGY_DESCRIPTION_TYPES:
631632
default:
632-
/* must not call this function w/ sharded, load balanced, or unknown topology type */
633+
/* must not call this function w/ sharded, load balanced, or unknown
634+
* topology type */
633635
BSON_ASSERT (false);
634636
}
635637
} /* if (!parts->is_write_command) */
@@ -1024,7 +1026,8 @@ mongoc_cmd_parts_assemble (mongoc_cmd_parts_t *parts,
10241026
ret = true;
10251027
} else if (server_type == MONGOC_SERVER_MONGOS ||
10261028
server_stream->topology_type == MONGOC_TOPOLOGY_LOAD_BALANCED) {
1027-
/* TODO (CDRIVER-4117) remove the check of the topology description type. */
1029+
/* TODO (CDRIVER-4117) remove the check of the topology description type.
1030+
*/
10281031
_mongoc_cmd_parts_assemble_mongos (parts, server_stream);
10291032
ret = true;
10301033
} else {

src/libmongoc/src/mongoc/mongoc-cursor-cmd.c

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ static getmore_type_t
3737
_getmore_type (mongoc_cursor_t *cursor)
3838
{
3939
mongoc_server_stream_t *server_stream;
40-
bool use_cmd;
40+
int32_t wire_version;
4141
data_cmd_t *data = (data_cmd_t *) cursor->impl.data;
4242
if (data->getmore_type != UNKNOWN) {
4343
return data->getmore_type;
@@ -46,10 +46,21 @@ _getmore_type (mongoc_cursor_t *cursor)
4646
if (!server_stream) {
4747
return UNKNOWN;
4848
}
49-
use_cmd = server_stream->sd->max_wire_version >= WIRE_VERSION_FIND_CMD &&
50-
!_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST);
51-
data->getmore_type = use_cmd ? GETMORE_CMD : OP_GETMORE;
49+
wire_version = server_stream->sd->max_wire_version;
5250
mongoc_server_stream_cleanup (server_stream);
51+
52+
if (
53+
/* Server version 5.1 and newer do not support OP_GETMORE. */
54+
wire_version > WIRE_VERSION_5_0 ||
55+
/* Fallback to legacy OP_GETMORE wire protocol messages if exhaust cursor
56+
requested with server version 3.6 or newer . */
57+
(wire_version >= WIRE_VERSION_FIND_CMD &&
58+
!_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST))) {
59+
data->getmore_type = GETMORE_CMD;
60+
} else {
61+
data->getmore_type = OP_GETMORE;
62+
}
63+
5364
return data->getmore_type;
5465
}
5566

src/libmongoc/src/mongoc/mongoc-cursor-find.c

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ _mongoc_cursor_impl_find_opquery_init (mongoc_cursor_t *cursor, bson_t *filter);
3232
static mongoc_cursor_state_t
3333
_prime (mongoc_cursor_t *cursor)
3434
{
35-
bool use_find_command;
35+
int32_t wire_version;
3636
mongoc_server_stream_t *server_stream;
3737
data_find_t *data = (data_find_t *) cursor->impl.data;
3838

@@ -41,15 +41,17 @@ _prime (mongoc_cursor_t *cursor)
4141
if (!server_stream) {
4242
return DONE;
4343
}
44-
/* find_getmore_killcursors spec:
45-
* "The find command does not support the exhaust flag from OP_QUERY." */
46-
use_find_command =
47-
server_stream->sd->max_wire_version >= WIRE_VERSION_FIND_CMD &&
48-
!_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST);
44+
wire_version = server_stream->sd->max_wire_version;
4945
mongoc_server_stream_cleanup (server_stream);
5046

5147
/* set all mongoc_impl_t function pointers. */
52-
if (use_find_command) {
48+
if (
49+
/* Server version 5.1 and newer do not support OP_QUERY. */
50+
wire_version > WIRE_VERSION_5_0 ||
51+
/* Fallback to legacy OP_QUERY wire protocol messages if exhaust cursor
52+
requested with server version 3.6 or newer. */
53+
(wire_version >= WIRE_VERSION_FIND_CMD &&
54+
!_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST))) {
5355
_mongoc_cursor_impl_find_cmd_init (cursor, &data->filter /* stolen */);
5456
} else {
5557
_mongoc_cursor_impl_find_opquery_init (cursor,

src/libmongoc/src/mongoc/mongoc-cursor.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -963,6 +963,11 @@ _mongoc_cursor_run_command (mongoc_cursor_t *cursor,
963963
_mongoc_bson_init_if_set (reply);
964964
GOTO (done);
965965
}
966+
if (_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST)) {
967+
MONGOC_WARNING (
968+
"exhaust cursors not supported with OP_MSG, using normal "
969+
"cursor instead");
970+
}
966971
}
967972

968973
if (parts.assembled.session) {
@@ -1005,6 +1010,14 @@ _mongoc_cursor_run_command (mongoc_cursor_t *cursor,
10051010
GOTO (done);
10061011
}
10071012

1013+
/* Exhaust cursors with OP_MSG not yet supported; fallback to normal cursor.
1014+
* user_query_flags is unused in OP_MSG, so this technically has no effect,
1015+
* but is done anyways to ensure the query flags match handling of options.
1016+
*/
1017+
if (parts.user_query_flags & MONGOC_QUERY_EXHAUST) {
1018+
parts.user_query_flags ^= MONGOC_QUERY_EXHAUST;
1019+
}
1020+
10081021
/* we might use mongoc_cursor_set_hint to target a secondary but have no
10091022
* read preference, so the secondary rejects the read. same if we have a
10101023
* direct connection to a secondary (topology type "single"). with

src/libmongoc/tests/test-mongoc-counters.c

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ static void
129129
test_counters_op_msg (void *ctx)
130130
{
131131
mongoc_collection_t *coll;
132-
mongoc_cursor_t *exhaust_cursor;
132+
mongoc_cursor_t *cursor;
133133
const bson_t *bson;
134134
mongoc_client_t *client;
135135

@@ -144,22 +144,15 @@ test_counters_op_msg (void *ctx)
144144
DIFF_AND_RESET (op_egress_total, ==, 4);
145145
DIFF_AND_RESET (op_ingress_msg, ==, 4);
146146
DIFF_AND_RESET (op_ingress_total, ==, 4);
147-
/* an exhaust cursor still must use an OP_QUERY find. */
148-
exhaust_cursor = mongoc_collection_find (coll,
149-
MONGOC_QUERY_EXHAUST,
150-
0 /* skip */,
151-
0 /* limit */,
152-
1 /* batch size */,
153-
tmp_bson ("{}"),
154-
NULL /* fields */,
155-
NULL /* read prefs */);
156-
while (mongoc_cursor_next (exhaust_cursor, &bson))
147+
cursor =
148+
mongoc_collection_find_with_opts (coll, tmp_bson ("{}"), NULL, NULL);
149+
while (mongoc_cursor_next (cursor, &bson))
157150
;
158-
mongoc_cursor_destroy (exhaust_cursor);
159-
DIFF_AND_RESET (op_egress_msg, ==, 0);
160-
DIFF_AND_RESET (op_ingress_msg, ==, 0);
161-
DIFF_AND_RESET (op_egress_query, ==, 1);
162-
DIFF_AND_RESET (op_ingress_reply, >, 0);
151+
mongoc_cursor_destroy (cursor);
152+
DIFF_AND_RESET (op_egress_msg, >, 0);
153+
DIFF_AND_RESET (op_ingress_msg, >, 0);
154+
DIFF_AND_RESET (op_egress_query, ==, 0);
155+
DIFF_AND_RESET (op_ingress_reply, ==, 0);
163156
DIFF_AND_RESET (op_egress_total, >, 0);
164157
DIFF_AND_RESET (op_ingress_total, >, 0);
165158
mongoc_collection_destroy (coll);

src/libmongoc/tests/test-mongoc-exhaust.c

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,55 @@ test_exhaust_cursor_pool (void *context)
300300
test_exhaust_cursor (true);
301301
}
302302

303+
static void
304+
test_exhaust_cursor_fallback (void *unused)
305+
{
306+
mongoc_client_t *client;
307+
mongoc_collection_t *collection;
308+
bson_error_t error;
309+
mongoc_cursor_t *cursor;
310+
const bson_t *doc;
311+
312+
client = test_framework_new_default_client ();
313+
ASSERT (client);
314+
315+
collection = get_test_collection (client, "test_exhaust_cursor_fallback");
316+
ASSERT (collection);
317+
318+
(void) mongoc_collection_drop (collection, &error);
319+
ASSERT_OR_PRINT (mongoc_collection_insert_one (
320+
collection, tmp_bson ("{'a': 1}"), NULL, NULL, &error),
321+
error);
322+
323+
324+
cursor = mongoc_collection_find_with_opts (
325+
collection, tmp_bson ("{}"), tmp_bson ("{'exhaust': true}"), NULL);
326+
ASSERT (cursor);
327+
328+
/* Cursor should be a normal cursor despite exhaust option. */
329+
ASSERT (!cursor->in_exhaust);
330+
ASSERT (!cursor->client->in_exhaust);
331+
332+
/* Warning message is generated on call to mongoc_cursor_next() during which
333+
* server wire version is discovered, not on call to
334+
* mongoc_collection_find_with_opts(). */
335+
capture_logs (true);
336+
ASSERT_OR_PRINT (mongoc_cursor_next (cursor, &doc),
337+
(mongoc_cursor_error (cursor, &error), error));
338+
ASSERT_CAPTURED_LOG (
339+
"cursor",
340+
MONGOC_LOG_LEVEL_WARNING,
341+
"exhaust cursors not supported with OP_MSG, using normal cursor instead");
342+
capture_logs (false);
343+
344+
ASSERT_MATCH (doc, "{'a': 1}");
345+
ASSERT (!mongoc_cursor_next (cursor, &doc));
346+
347+
mongoc_cursor_destroy (cursor);
348+
mongoc_collection_destroy (collection);
349+
mongoc_client_destroy (client);
350+
}
351+
303352
static void
304353
test_exhaust_cursor_multi_batch (void *context)
305354
{
@@ -682,6 +731,13 @@ test_exhaust_install (TestSuite *suite)
682731
NULL,
683732
skip_if_mongos,
684733
test_framework_skip_if_no_legacy_opcodes);
734+
TestSuite_AddFull (suite,
735+
"/Client/exhaust_cursor/fallback",
736+
test_exhaust_cursor_fallback,
737+
NULL,
738+
NULL,
739+
skip_if_mongos,
740+
test_framework_skip_if_max_wire_version_less_than_14);
685741
TestSuite_AddLive (suite,
686742
"/Client/set_max_await_time_ms",
687743
test_cursor_set_max_await_time_ms);

0 commit comments

Comments
 (0)