Skip to content

Commit 4d48130

Browse files
authored
PYTHON-4667 Handle $clusterTime from error responses in client Bulk Write (#1822)
1 parent e27b428 commit 4d48130

File tree

5 files changed

+50
-7
lines changed

5 files changed

+50
-7
lines changed

pymongo/asynchronous/bulk.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ async def write_command(
281281
)
282282
if bwc.publish:
283283
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
284+
await client._process_response(reply, bwc.session) # type: ignore[arg-type]
284285
except Exception as exc:
285286
duration = datetime.datetime.now() - bwc.start_time
286287
if isinstance(exc, (NotPrimaryError, OperationFailure)):
@@ -308,6 +309,9 @@ async def write_command(
308309

309310
if bwc.publish:
310311
bwc._fail(request_id, failure, duration)
312+
# Process the response from the server.
313+
if isinstance(exc, (NotPrimaryError, OperationFailure)):
314+
await client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
311315
raise
312316
finally:
313317
bwc.start_time = datetime.datetime.now()
@@ -449,7 +453,6 @@ async def _execute_batch(
449453
else:
450454
request_id, msg, to_send = bwc.batch_command(cmd, ops)
451455
result = await self.write_command(bwc, cmd, request_id, msg, to_send, client) # type: ignore[arg-type]
452-
await client._process_response(result, bwc.session) # type: ignore[arg-type]
453456

454457
return result, to_send # type: ignore[return-value]
455458

pymongo/asynchronous/client_bulk.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ async def write_command(
283283
)
284284
if bwc.publish:
285285
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
286+
# Process the response from the server.
287+
await self.client._process_response(reply, bwc.session) # type: ignore[arg-type]
286288
except Exception as exc:
287289
duration = datetime.datetime.now() - bwc.start_time
288290
if isinstance(exc, (NotPrimaryError, OperationFailure)):
@@ -312,6 +314,11 @@ async def write_command(
312314
bwc._fail(request_id, failure, duration)
313315
# Top-level error will be embedded in ClientBulkWriteException.
314316
reply = {"error": exc}
317+
# Process the response from the server.
318+
if isinstance(exc, OperationFailure):
319+
await self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
320+
else:
321+
await self.client._process_response({}, bwc.session) # type: ignore[arg-type]
315322
finally:
316323
bwc.start_time = datetime.datetime.now()
317324
return reply # type: ignore[return-value]
@@ -431,7 +438,6 @@ async def _execute_batch(
431438
result = await self.write_command(
432439
bwc, cmd, request_id, msg, to_send_ops, to_send_ns, self.client
433440
) # type: ignore[arg-type]
434-
await self.client._process_response(result, bwc.session) # type: ignore[arg-type]
435441
return result, to_send_ops, to_send_ns # type: ignore[return-value]
436442

437443
async def _process_results_cursor(

pymongo/synchronous/bulk.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ def write_command(
281281
)
282282
if bwc.publish:
283283
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
284+
client._process_response(reply, bwc.session) # type: ignore[arg-type]
284285
except Exception as exc:
285286
duration = datetime.datetime.now() - bwc.start_time
286287
if isinstance(exc, (NotPrimaryError, OperationFailure)):
@@ -308,6 +309,9 @@ def write_command(
308309

309310
if bwc.publish:
310311
bwc._fail(request_id, failure, duration)
312+
# Process the response from the server.
313+
if isinstance(exc, (NotPrimaryError, OperationFailure)):
314+
client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
311315
raise
312316
finally:
313317
bwc.start_time = datetime.datetime.now()
@@ -449,7 +453,6 @@ def _execute_batch(
449453
else:
450454
request_id, msg, to_send = bwc.batch_command(cmd, ops)
451455
result = self.write_command(bwc, cmd, request_id, msg, to_send, client) # type: ignore[arg-type]
452-
client._process_response(result, bwc.session) # type: ignore[arg-type]
453456

454457
return result, to_send # type: ignore[return-value]
455458

pymongo/synchronous/client_bulk.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ def write_command(
283283
)
284284
if bwc.publish:
285285
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
286+
# Process the response from the server.
287+
self.client._process_response(reply, bwc.session) # type: ignore[arg-type]
286288
except Exception as exc:
287289
duration = datetime.datetime.now() - bwc.start_time
288290
if isinstance(exc, (NotPrimaryError, OperationFailure)):
@@ -312,6 +314,11 @@ def write_command(
312314
bwc._fail(request_id, failure, duration)
313315
# Top-level error will be embedded in ClientBulkWriteException.
314316
reply = {"error": exc}
317+
# Process the response from the server.
318+
if isinstance(exc, OperationFailure):
319+
self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
320+
else:
321+
self.client._process_response({}, bwc.session) # type: ignore[arg-type]
315322
finally:
316323
bwc.start_time = datetime.datetime.now()
317324
return reply # type: ignore[return-value]
@@ -429,7 +436,6 @@ def _execute_batch(
429436
"""Executes a batch of bulkWrite server commands (ack)."""
430437
request_id, msg, to_send_ops, to_send_ns = bwc.batch_command(cmd, ops, namespaces)
431438
result = self.write_command(bwc, cmd, request_id, msg, to_send_ops, to_send_ns, self.client) # type: ignore[arg-type]
432-
self.client._process_response(result, bwc.session) # type: ignore[arg-type]
433439
return result, to_send_ops, to_send_ns # type: ignore[return-value]
434440

435441
def _process_results_cursor(

test/mockupdb/test_cluster_time.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,22 @@
2929

3030
from bson import Timestamp
3131
from pymongo import DeleteMany, InsertOne, MongoClient, UpdateOne
32+
from pymongo.errors import OperationFailure
3233

3334
pytestmark = pytest.mark.mockupdb
3435

3536

3637
class TestClusterTime(unittest.TestCase):
37-
def cluster_time_conversation(self, callback, replies):
38+
def cluster_time_conversation(self, callback, replies, max_wire_version=6):
3839
cluster_time = Timestamp(0, 0)
3940
server = MockupDB()
4041

41-
# First test all commands include $clusterTime with wire version 6.
42+
# First test all commands include $clusterTime with max_wire_version.
4243
_ = server.autoresponds(
4344
"ismaster",
4445
{
4546
"minWireVersion": 0,
46-
"maxWireVersion": 6,
47+
"maxWireVersion": max_wire_version,
4748
"$clusterTime": {"clusterTime": cluster_time},
4849
},
4950
)
@@ -166,6 +167,30 @@ def test_monitor(self):
166167
request.reply(reply)
167168
client.close()
168169

170+
def test_collection_bulk_error(self):
171+
def callback(client: MongoClient[dict]) -> None:
172+
with self.assertRaises(OperationFailure):
173+
client.db.collection.bulk_write([InsertOne({}), InsertOne({})])
174+
175+
self.cluster_time_conversation(
176+
callback,
177+
[{"ok": 0, "errmsg": "mock error"}],
178+
)
179+
180+
def test_client_bulk_error(self):
181+
def callback(client: MongoClient[dict]) -> None:
182+
with self.assertRaises(OperationFailure):
183+
client.bulk_write(
184+
[
185+
InsertOne({}, namespace="db.collection"),
186+
InsertOne({}, namespace="db.collection"),
187+
]
188+
)
189+
190+
self.cluster_time_conversation(
191+
callback, [{"ok": 0, "errmsg": "mock error"}], max_wire_version=25
192+
)
193+
169194

170195
if __name__ == "__main__":
171196
unittest.main()

0 commit comments

Comments
 (0)