Skip to content

Commit 9eba451

Browse files
CDRIVER-3893: Conditionally support $merge and $out execution on secondaries (#904)
* Allow readPreference:secondary with $merge and $out. [Finish CDRIVER-3893, CDRIVER-4224, and CDRIVER-4195] * Pass around the effectively chosen read mode used in server selection back to the command builder * Update doc func name * Fix duplicate doc comments * Consolidate duplicate state in suitable_servers() * Less magic numbers * Clarify wire version checks * Force primary read preference if talking to a single server older than 5.0
1 parent ee64af1 commit 9eba451

38 files changed

+1291
-233
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ set (CMAKE_MODULE_PATH
7979
include (InstallRequiredSystemLibraries)
8080
include (GNUInstallDirs)
8181

82+
include(MongoC-Warnings)
83+
8284
# Enable CCache, if possible
8385
include (CCache)
8486
# Link with LLD, if possible

src/libmongoc/src/mongoc/mongoc-aggregate.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,12 +276,9 @@ _mongoc_aggregate (mongoc_client_t *client,
276276
has_write_key = _has_write_key (&iter);
277277
}
278278

279-
if (has_write_key && cursor->read_prefs->mode != MONGOC_READ_PRIMARY) {
280-
mongoc_read_prefs_destroy (cursor->read_prefs);
281-
cursor->read_prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY);
282-
MONGOC_WARNING ("$out or $merge stage specified. Overriding read "
283-
"preference to primary.");
284-
}
279+
/* This has an important effect on server selection when
280+
* readPreferences=secondary. Keep track of this fact for later use. */
281+
cursor->is_aggr_with_write_stage = has_write_key;
285282

286283
/* server id isn't enough. ensure we're connected & know wire version */
287284
server_stream = _mongoc_cursor_fetch_stream (cursor);

src/libmongoc/src/mongoc/mongoc-change-stream.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,13 @@ _make_cursor (mongoc_change_stream_t *stream)
279279
goto cleanup;
280280
}
281281

282-
server_stream = mongoc_cluster_stream_for_reads (
283-
&stream->client->cluster, stream->read_prefs, cs, &reply, &stream->err);
282+
server_stream =
283+
mongoc_cluster_stream_for_reads (&stream->client->cluster,
284+
stream->read_prefs,
285+
cs,
286+
&reply,
287+
/* Not aggregate-with-write */ false,
288+
&stream->err);
284289
if (!server_stream) {
285290
bson_destroy (&stream->err_doc);
286291
bson_copy_to (&reply, &stream->err_doc);

src/libmongoc/src/mongoc/mongoc-client.c

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1791,6 +1791,7 @@ _mongoc_client_retryable_read_command_with_stream (
17911791
parts->read_prefs,
17921792
parts->assembled.session,
17931793
NULL,
1794+
/* Not aggregate-with-write */ false,
17941795
&ignored_error);
17951796

17961797
if (retry_server_stream && retry_server_stream->sd->max_wire_version >=
@@ -1879,7 +1880,12 @@ mongoc_client_command_simple (mongoc_client_t *client,
18791880
* preference argument."
18801881
*/
18811882
server_stream =
1882-
mongoc_cluster_stream_for_reads (cluster, read_prefs, NULL, reply, error);
1883+
mongoc_cluster_stream_for_reads (cluster,
1884+
read_prefs,
1885+
NULL,
1886+
reply,
1887+
/* Not aggregate-with-write */ false,
1888+
error);
18831889

18841890
if (server_stream) {
18851891
ret = _mongoc_client_command_with_stream (
@@ -2039,7 +2045,12 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
20392045
mongoc_cluster_stream_for_writes (cluster, cs, reply_ptr, error);
20402046
} else {
20412047
server_stream =
2042-
mongoc_cluster_stream_for_reads (cluster, prefs, cs, reply_ptr, error);
2048+
mongoc_cluster_stream_for_reads (cluster,
2049+
prefs,
2050+
cs,
2051+
reply_ptr,
2052+
/* Not aggregate-with-write */ false,
2053+
error);
20432054
}
20442055

20452056
if (!server_stream) {
@@ -2586,8 +2597,12 @@ mongoc_client_kill_cursor (mongoc_client_t *client, int64_t cursor_id)
25862597
}
25872598

25882599
/* see if there's a known writable server - do no I/O or retries */
2589-
selected_server = mongoc_topology_description_select (
2590-
td.ptr, MONGOC_SS_WRITE, read_prefs, topology->local_threshold_msec);
2600+
selected_server =
2601+
mongoc_topology_description_select (td.ptr,
2602+
MONGOC_SS_WRITE,
2603+
read_prefs,
2604+
NULL /* chosen read mode */,
2605+
topology->local_threshold_msec);
25912606

25922607
if (selected_server) {
25932608
server_id = selected_server->id;
@@ -2850,7 +2865,8 @@ mongoc_client_select_server (mongoc_client_t *client,
28502865
return NULL;
28512866
}
28522867

2853-
sd = mongoc_topology_select (client->topology, optype, prefs, error);
2868+
sd = mongoc_topology_select (
2869+
client->topology, optype, prefs, NULL /* chosen read mode */, error);
28542870
if (!sd) {
28552871
return NULL;
28562872
}
@@ -2862,7 +2878,8 @@ mongoc_client_select_server (mongoc_client_t *client,
28622878

28632879
/* check failed, retry once */
28642880
mongoc_server_description_destroy (sd);
2865-
sd = mongoc_topology_select (client->topology, optype, prefs, error);
2881+
sd = mongoc_topology_select (
2882+
client->topology, optype, prefs, NULL /* chosen read mode */, error);
28662883
if (sd) {
28672884
return sd;
28682885
}
@@ -2997,8 +3014,8 @@ _mongoc_client_end_sessions (mongoc_client_t *client)
29973014

29983015
while (!mongoc_server_session_pool_is_empty (t->session_pool)) {
29993016
prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY_PREFERRED);
3000-
server_id =
3001-
mongoc_topology_select_server_id (t, MONGOC_SS_READ, prefs, &error);
3017+
server_id = mongoc_topology_select_server_id (
3018+
t, MONGOC_SS_READ, prefs, NULL /* chosen read mode */, &error);
30023019

30033020
mongoc_read_prefs_destroy (prefs);
30043021
if (!server_id) {

src/libmongoc/src/mongoc/mongoc-cluster-private.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ BSON_BEGIN_DECLS
4444
typedef struct _mongoc_cluster_node_t {
4545
mongoc_stream_t *stream;
4646
char *connection_address;
47-
/* handshake_sd is a server description created from the handshake on the stream. */
47+
/* handshake_sd is a server description created from the handshake on the
48+
* stream. */
4849
mongoc_server_description_t *handshake_sd;
4950
} mongoc_cluster_node_t;
5051

@@ -122,6 +123,7 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
122123
const mongoc_read_prefs_t *read_prefs,
123124
mongoc_client_session_t *cs,
124125
bson_t *reply,
126+
bool is_aggr_with_write,
125127
bson_error_t *error);
126128

127129
/**

src/libmongoc/src/mongoc/mongoc-cluster.c

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2707,6 +2707,7 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
27072707
mongoc_topology_t *topology,
27082708
mongoc_ss_optype_t optype,
27092709
const mongoc_read_prefs_t *read_prefs,
2710+
bool *must_use_primary,
27102711
bson_error_t *error)
27112712
{
27122713
uint32_t server_id;
@@ -2715,14 +2716,14 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
27152716
server_id = cs->server_id;
27162717
if (!server_id) {
27172718
server_id = mongoc_topology_select_server_id (
2718-
topology, optype, read_prefs, error);
2719+
topology, optype, read_prefs, must_use_primary, error);
27192720
if (server_id) {
27202721
_mongoc_client_session_pin (cs, server_id);
27212722
}
27222723
}
27232724
} else {
2724-
server_id =
2725-
mongoc_topology_select_server_id (topology, optype, read_prefs, error);
2725+
server_id = mongoc_topology_select_server_id (
2726+
topology, optype, read_prefs, must_use_primary, error);
27262727
/* Transactions Spec: Additionally, any non-transaction operation using a
27272728
* pinned ClientSession MUST unpin the session and the operation MUST
27282729
* perform normal server selection. */
@@ -2763,13 +2764,14 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
27632764
mongoc_server_stream_t *server_stream;
27642765
uint32_t server_id;
27652766
mongoc_topology_t *topology = cluster->client->topology;
2767+
bool must_use_primary = false;
27662768

27672769
ENTRY;
27682770

27692771
BSON_ASSERT (cluster);
27702772

27712773
server_id = _mongoc_cluster_select_server_id (
2772-
cs, topology, optype, read_prefs, error);
2774+
cs, topology, optype, read_prefs, &must_use_primary, error);
27732775

27742776
if (!server_id) {
27752777
_mongoc_bson_init_with_transient_txn_error (cs, reply);
@@ -2779,7 +2781,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
27792781
if (!mongoc_cluster_check_interval (cluster, server_id)) {
27802782
/* Server Selection Spec: try once more */
27812783
server_id = _mongoc_cluster_select_server_id (
2782-
cs, topology, optype, read_prefs, error);
2784+
cs, topology, optype, read_prefs, &must_use_primary, error);
27832785

27842786
if (!server_id) {
27852787
_mongoc_bson_init_with_transient_txn_error (cs, reply);
@@ -2790,6 +2792,9 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
27902792
/* connect or reconnect to server if necessary */
27912793
server_stream = _mongoc_cluster_stream_for_server (
27922794
cluster, server_id, true /* reconnect_ok */, cs, reply, error);
2795+
if (server_stream) {
2796+
server_stream->must_use_primary = must_use_primary;
2797+
}
27932798

27942799
RETURN (server_stream);
27952800
}
@@ -2799,6 +2804,7 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
27992804
const mongoc_read_prefs_t *read_prefs,
28002805
mongoc_client_session_t *cs,
28012806
bson_t *reply,
2807+
bool has_write_stage,
28022808
bson_error_t *error)
28032809
{
28042810
const mongoc_read_prefs_t *prefs_override = read_prefs;
@@ -2808,7 +2814,13 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
28082814
}
28092815

28102816
return _mongoc_cluster_stream_for_optype (
2811-
cluster, MONGOC_SS_READ, prefs_override, cs, reply, error);
2817+
cluster,
2818+
/* Narrow down the optype if this is an aggregate op with a write stage */
2819+
has_write_stage ? MONGOC_SS_AGGREGATE_WITH_WRITE : MONGOC_SS_READ,
2820+
prefs_override,
2821+
cs,
2822+
reply,
2823+
error);
28122824
}
28132825

28142826
mongoc_server_stream_t *

src/libmongoc/src/mongoc/mongoc-cmd.c

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,13 @@ _mongoc_cmd_parts_assemble_mongos (mongoc_cmd_parts_t *parts,
474474
hedge = mongoc_read_prefs_get_hedge (parts->read_prefs);
475475
}
476476

477+
if (server_stream->must_use_primary) {
478+
/* Server selection has overridden the read mode used to generate this
479+
* server stream. This has effects on the body of the message that we send
480+
* to the server */
481+
mode = MONGOC_READ_PRIMARY;
482+
}
483+
477484
/* Server Selection Spec says:
478485
*
479486
* For mode 'primary', drivers MUST NOT set the secondaryOk wire protocol
@@ -820,6 +827,7 @@ mongoc_cmd_parts_assemble (mongoc_cmd_parts_t *parts,
820827
const char *cmd_name;
821828
bool is_get_more;
822829
const mongoc_read_prefs_t *prefs_ptr;
830+
mongoc_read_mode_t mode = mongoc_read_prefs_get_mode (parts->read_prefs);
823831
bool ret = false;
824832

825833
ENTRY;
@@ -878,6 +886,14 @@ mongoc_cmd_parts_assemble (mongoc_cmd_parts_t *parts,
878886
prefs_ptr = parts->read_prefs;
879887
}
880888

889+
mode = mongoc_read_prefs_get_mode (prefs_ptr);
890+
if (server_stream->must_use_primary) {
891+
/* Server selection may have overridden the read mode used to generate this
892+
* server stream. This has effects on the body of the message that we send
893+
* to the server */
894+
mode = MONGOC_READ_PRIMARY;
895+
}
896+
881897
if (server_stream->sd->max_wire_version >= WIRE_VERSION_OP_MSG) {
882898
if (!bson_has_field (parts->body, "$db")) {
883899
BSON_APPEND_UTF8 (&parts->extra, "$db", parts->assembled.db_name);
@@ -892,7 +908,7 @@ mongoc_cmd_parts_assemble (mongoc_cmd_parts_t *parts,
892908
"Read preference in a transaction must be primary");
893909
GOTO (done);
894910
}
895-
} else if (!IS_PREF_PRIMARY (prefs_ptr) &&
911+
} else if (mode != MONGOC_READ_PRIMARY &&
896912
server_type != MONGOC_SERVER_STANDALONE) {
897913
/* "Type Standalone: clients MUST NOT send the read preference to the
898914
* server" */

src/libmongoc/src/mongoc/mongoc-collection.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -861,8 +861,13 @@ mongoc_collection_estimated_document_count (
861861

862862
BSON_ASSERT_PARAM (coll);
863863

864-
server_stream = mongoc_cluster_stream_for_reads (
865-
&coll->client->cluster, read_prefs, NULL, reply, error);
864+
server_stream =
865+
mongoc_cluster_stream_for_reads (&coll->client->cluster,
866+
read_prefs,
867+
NULL,
868+
reply,
869+
/* Not aggregate-with-write */ false,
870+
error);
866871

867872
if (opts && bson_has_field (opts, "sessionId")) {
868873
bson_set_error (error,

src/libmongoc/src/mongoc/mongoc-cursor-private.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "mongoc-buffer-private.h"
2626
#include "mongoc-rpc-private.h"
2727
#include "mongoc-server-stream-private.h"
28+
#include "mongoc-cluster-private.h"
2829

2930

3031
BSON_BEGIN_DECLS
@@ -131,6 +132,16 @@ struct _mongoc_cursor_t {
131132
mongoc_read_prefs_t *read_prefs;
132133
mongoc_write_concern_t *write_concern;
133134

135+
/** Server selection may override the read mode of the user's read
136+
* preferences and force the cursor to use a read preference mode of
137+
* 'primary' instead. Whether that override occurred is stored here so it
138+
* can be re-applied when a server stream is re-created for this cursor: */
139+
bool must_use_primary;
140+
141+
/** Whether this cursor corresponds to an aggregate command that contains a
142+
* writing-stage */
143+
bool is_aggr_with_write_stage;
144+
134145
bool explicit_session;
135146
mongoc_client_session_t *client_session;
136147

src/libmongoc/src/mongoc/mongoc-cursor.c

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ _mongoc_cursor_new_with_opts (mongoc_client_t *client,
251251
cursor->client = client;
252252
cursor->state = UNPRIMED;
253253
cursor->client_generation = client->generation;
254+
cursor->is_aggr_with_write_stage = false;
254255

255256
bson_init (&cursor->opts);
256257
bson_init (&cursor->error_doc);
@@ -654,22 +655,33 @@ _mongoc_cursor_fetch_stream (mongoc_cursor_t *cursor)
654655
ENTRY;
655656

656657
if (cursor->server_id) {
658+
/* We already did server selection once before. Reuse the prior
659+
* selection to create a new stream on the same server. */
657660
server_stream =
658661
mongoc_cluster_stream_for_server (&cursor->client->cluster,
659662
cursor->server_id,
660663
true /* reconnect_ok */,
661664
cursor->client_session,
662665
&reply,
663666
&cursor->error);
667+
/* Also restore whether primary read preference was forced by server
668+
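* selection. NOTE(review): server_stream is dereferenced just below without a NULL check, unlike the other branch -- confirm stream creation cannot fail here. */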
669+
server_stream->must_use_primary = cursor->must_use_primary;
664670
} else {
665-
server_stream = mongoc_cluster_stream_for_reads (&cursor->client->cluster,
666-
cursor->read_prefs,
667-
cursor->client_session,
668-
&reply,
669-
&cursor->error);
671+
server_stream =
672+
mongoc_cluster_stream_for_reads (&cursor->client->cluster,
673+
cursor->read_prefs,
674+
cursor->client_session,
675+
&reply,
676+
cursor->is_aggr_with_write_stage,
677+
&cursor->error);
670678

671679
if (server_stream) {
680+
/* Remember the selected server_id and whether primary read mode was
681+
* forced so that we can re-create an equivalent server_stream at a
682+
* later time */
672683
cursor->server_id = server_stream->sd->id;
684+
cursor->must_use_primary = server_stream->must_use_primary;
673685
}
674686
}
675687

@@ -1083,11 +1095,16 @@ _mongoc_cursor_run_command (mongoc_cursor_t *cursor,
10831095

10841096
mongoc_server_stream_cleanup (server_stream);
10851097

1086-
server_stream = mongoc_cluster_stream_for_reads (&cursor->client->cluster,
1087-
cursor->read_prefs,
1088-
cursor->client_session,
1089-
reply,
1090-
&cursor->error);
1098+
BSON_ASSERT (!cursor->is_aggr_with_write_stage &&
1099+
"Cannot attempt a retry on an aggregate operation that "
1100+
"contains write stages");
1101+
server_stream =
1102+
mongoc_cluster_stream_for_reads (&cursor->client->cluster,
1103+
cursor->read_prefs,
1104+
cursor->client_session,
1105+
reply,
1106+
/* Not aggregate-with-write */ false,
1107+
&cursor->error);
10911108

10921109
if (server_stream &&
10931110
server_stream->sd->max_wire_version >= WIRE_VERSION_RETRY_READS) {

src/libmongoc/src/mongoc/mongoc-read-prefs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ mongoc_read_prefs_is_valid (const mongoc_read_prefs_t *read_prefs)
164164
return false;
165165
}
166166

167+
167168
return true;
168169
}
169170

0 commit comments

Comments
 (0)