Skip to content

Commit eff456e

Browse files
committed
check for stream validity before sending command
Only select a new stream if another batch is needed.
1 parent 00ceb67 commit eff456e

File tree

1 file changed

+17
-17
lines changed

1 file changed

+17
-17
lines changed

src/libmongoc/src/mongoc/mongoc-bulkwrite.c

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1502,6 +1502,23 @@ mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self, const mongoc_bulkwriteopts_t
15021502
payload->size = (int32_t) ops_byte_len;
15031503
}
15041504

1505+
// Check if stream is valid. `mongoc_cluster_run_retryable_write` may have invalidated stream (e.g. due to
1506+
// processing an error). If invalid, select a new stream before processing more batches.
1507+
if (!mongoc_cluster_stream_valid (&self->client->cluster, parts.assembled.server_stream)) {
1508+
bson_t reply;
1509+
// Select a server and create a stream again.
1510+
mongoc_server_stream_cleanup (ss);
1511+
ss = mongoc_cluster_stream_for_writes (
1512+
&self->client->cluster, NULL /* session */, NULL /* deprioritized servers */, &reply, &error);
1513+
1514+
if (ss) {
1515+
parts.assembled.server_stream = ss;
1516+
} else {
1517+
_bulkwriteexception_set_error (ret.exc, &error);
1518+
goto batch_fail;
1519+
}
1520+
}
1521+
15051522
// Send command.
15061523
{
15071524
mongoc_server_stream_t *new_ss = NULL;
@@ -1763,23 +1780,6 @@ mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self, const mongoc_bulkwriteopts_t
17631780
}
17641781
}
17651782
}
1766-
1767-
// Check if stream is valid. `mongoc_cluster_run_retryable_write` may have invalidated stream (e.g. due to
1768-
// processing an error). If invalid, select a new stream before processing more batches.
1769-
if (!mongoc_cluster_stream_valid (&self->client->cluster, parts.assembled.server_stream)) {
1770-
bson_t reply;
1771-
// Select a server and create a stream again.
1772-
mongoc_server_stream_cleanup (ss);
1773-
ss = mongoc_cluster_stream_for_writes (
1774-
&self->client->cluster, NULL /* session */, NULL /* deprioritized servers */, &reply, &error);
1775-
1776-
if (ss) {
1777-
parts.assembled.server_stream = ss;
1778-
} else {
1779-
_bulkwriteexception_set_error (ret.exc, &error);
1780-
goto batch_fail;
1781-
}
1782-
}
17831783
}
17841784

17851785
ops_doc_offset += ops_doc_len;

0 commit comments

Comments
 (0)