@@ -395,18 +395,25 @@ mongoc_transaction_opts_get_read_prefs (const mongoc_transaction_opt_t *opts)
395
395
return opts -> read_prefs ;
396
396
}
397
397
398
-
399
- mongoc_session_opt_t *
400
- mongoc_session_opts_new (void )
398
+ bool
399
+ mongoc_session_opts_get_causal_consistency (const mongoc_session_opt_t * opts )
401
400
{
402
- mongoc_session_opt_t * opts = bson_malloc0 ( sizeof ( mongoc_session_opt_t )) ;
401
+ ENTRY ;
403
402
404
- /* Driver Sessions Spec: causal consistency is true by default */
405
- mongoc_session_opts_set_causal_consistency (opts , true);
403
+ BSON_ASSERT (opts );
406
404
407
- return opts ;
405
+ RETURN (!!( opts -> flags & MONGOC_SESSION_CAUSAL_CONSISTENCY )) ;
408
406
}
409
407
408
+ bool
409
+ mongoc_session_opts_get_snapshot (const mongoc_session_opt_t * opts )
410
+ {
411
+ ENTRY ;
412
+
413
+ BSON_ASSERT (opts );
414
+
415
+ RETURN (!!(opts -> flags & MONGOC_SESSION_SNAPSHOT ));
416
+ }
410
417
411
418
void
412
419
mongoc_session_opts_set_causal_consistency (mongoc_session_opt_t * opts ,
@@ -425,16 +432,35 @@ mongoc_session_opts_set_causal_consistency (mongoc_session_opt_t *opts,
425
432
EXIT ;
426
433
}
427
434
428
- bool
429
- mongoc_session_opts_get_causal_consistency ( const mongoc_session_opt_t * opts )
435
+ void
436
+ mongoc_session_opts_set_snapshot ( mongoc_session_opt_t * opts , bool snapshot )
430
437
{
431
438
ENTRY ;
432
439
433
440
BSON_ASSERT (opts );
434
441
435
- RETURN (!!(opts -> flags & MONGOC_SESSION_CAUSAL_CONSISTENCY ));
442
+ if (snapshot ) {
443
+ opts -> flags |= MONGOC_SESSION_SNAPSHOT ;
444
+ } else {
445
+ opts -> flags &= ~MONGOC_SESSION_SNAPSHOT ;
446
+ }
447
+
448
+ EXIT ;
436
449
}
437
450
451
+ mongoc_session_opt_t *
452
+ mongoc_session_opts_new (void )
453
+ {
454
+ mongoc_session_opt_t * opts = bson_malloc0 (sizeof (mongoc_session_opt_t ));
455
+
456
+ /* Driver Sessions Spec: causal consistency is true by default */
457
+ mongoc_session_opts_set_causal_consistency (opts , true);
458
+
459
+ /* Snapshot Reads Spec: snapshot is false by default */
460
+ mongoc_session_opts_set_snapshot (opts , false);
461
+
462
+ return opts ;
463
+ }
438
464
439
465
void
440
466
mongoc_session_opts_set_default_transaction_opts (
@@ -606,21 +632,30 @@ _mongoc_cluster_time_greater (const bson_t *new, const bson_t *old)
606
632
void
607
633
_mongoc_client_session_handle_reply (mongoc_client_session_t * session ,
608
634
bool is_acknowledged ,
635
+ const char * cmd_name ,
609
636
const bson_t * reply )
610
637
{
611
638
bson_iter_t iter ;
639
+ bson_iter_t cursor_iter ;
612
640
uint32_t len ;
613
641
const uint8_t * data ;
614
642
bson_t cluster_time ;
615
- uint32_t t ;
616
- uint32_t i ;
643
+ uint32_t operation_t ;
644
+ uint32_t operation_i ;
645
+ uint32_t snapshot_t ;
646
+ uint32_t snapshot_i ;
647
+ bool is_find_aggregate_distinct ;
617
648
618
649
BSON_ASSERT (session );
619
650
620
651
if (!reply || !bson_iter_init (& iter , reply )) {
621
652
return ;
622
653
}
623
654
655
+ is_find_aggregate_distinct =
656
+ (!strcmp (cmd_name , "find" ) || !strcmp (cmd_name , "aggregate" ) ||
657
+ !strcmp (cmd_name , "distinct" ));
658
+
624
659
if (mongoc_error_has_label (reply , "TransientTransactionError" )) {
625
660
/* Transaction Spec: "Drivers MUST unpin a ClientSession when a command
626
661
* within a transaction, including commitTransaction and abortTransaction,
@@ -639,8 +674,39 @@ _mongoc_client_session_handle_reply (mongoc_client_session_t *session,
639
674
mongoc_client_session_advance_cluster_time (session , & cluster_time );
640
675
} else if (!strcmp (bson_iter_key (& iter ), "operationTime" ) &&
641
676
BSON_ITER_HOLDS_TIMESTAMP (& iter ) && is_acknowledged ) {
642
- bson_iter_timestamp (& iter , & t , & i );
643
- mongoc_client_session_advance_operation_time (session , t , i );
677
+ bson_iter_timestamp (& iter , & operation_t , & operation_i );
678
+ mongoc_client_session_advance_operation_time (
679
+ session , operation_t , operation_i );
680
+ } else if (is_find_aggregate_distinct &&
681
+ !strcmp (bson_iter_key (& iter ), "atClusterTime" ) &&
682
+ mongoc_session_opts_get_snapshot (& session -> opts ) &&
683
+ !session -> snapshot_time_set ) {
684
+ /* If command is "find", "aggregate" or "distinct", atClusterTime is on
685
+ * top level of reply, snapshot is enabled for the session, and
686
+ * snapshot_time has not already been set, set it. */
687
+ bson_iter_timestamp (& iter , & snapshot_t , & snapshot_i );
688
+ _mongoc_client_session_set_snapshot_time (
689
+ session , snapshot_t , snapshot_i );
690
+ } else if (is_find_aggregate_distinct &&
691
+ !strcmp (bson_iter_key (& iter ), "cursor" ) &&
692
+ mongoc_session_opts_get_snapshot (& session -> opts ) &&
693
+ !session -> snapshot_time_set ) {
694
+ /* If command is "find", "aggregate" or "distinct", cursor is present,
695
+ * snapshot is enabled for the session, and snapshot_time has not
696
+ * already been set, try to find atClusterTime in cursor field to set
697
+ * snapshot_time. */
698
+ bson_iter_recurse (& iter , & cursor_iter );
699
+
700
+ while (bson_iter_next (& cursor_iter )) {
701
+ /* If atClusterTime is in cursor and is a valid timestamp, use it to
702
+ * set snapshot_time. */
703
+ if (!strcmp (bson_iter_key (& cursor_iter ), "atClusterTime" ) &&
704
+ BSON_ITER_HOLDS_TIMESTAMP (& cursor_iter )) {
705
+ bson_iter_timestamp (& cursor_iter , & snapshot_t , & snapshot_i );
706
+ _mongoc_client_session_set_snapshot_time (
707
+ session , snapshot_t , snapshot_i );
708
+ }
709
+ }
644
710
}
645
711
}
646
712
}
@@ -751,6 +817,9 @@ _mongoc_client_session_new (mongoc_client_t *client,
751
817
session -> opts .flags = MONGOC_SESSION_CAUSAL_CONSISTENCY ;
752
818
}
753
819
820
+ /* snapshot_time_set is false by default */
821
+ _mongoc_client_session_clear_snapshot_time (session );
822
+
754
823
/* these values are used for testing only. */
755
824
session -> with_txn_timeout_ms = 0 ;
756
825
session -> fail_commit_label = NULL ;
@@ -1055,6 +1124,15 @@ mongoc_client_session_start_transaction (mongoc_client_session_t *session,
1055
1124
GOTO (done );
1056
1125
}
1057
1126
1127
+ if (mongoc_session_opts_get_snapshot (& session -> opts )) {
1128
+ bson_set_error (error ,
1129
+ MONGOC_ERROR_TRANSACTION ,
1130
+ MONGOC_ERROR_TRANSACTION_INVALID_STATE ,
1131
+ "Transactions are not supported in snapshot sessions" );
1132
+ ret = false;
1133
+ GOTO (done );
1134
+ }
1135
+
1058
1136
if (sd -> max_wire_version < 7 ||
1059
1137
(sd -> max_wire_version < 8 && sd -> type == MONGOC_SERVER_MONGOS )) {
1060
1138
bson_set_error (error ,
@@ -1491,14 +1569,17 @@ void
1491
1569
_mongoc_client_session_append_read_concern (const mongoc_client_session_t * cs ,
1492
1570
const bson_t * rc ,
1493
1571
bool is_read_command ,
1572
+ const char * cmd_name ,
1494
1573
bson_t * cmd )
1495
1574
{
1496
1575
const mongoc_read_concern_t * txn_rc ;
1497
1576
mongoc_internal_transaction_state_t txn_state ;
1498
1577
bool user_rc_has_level ;
1499
1578
bool txn_has_level ;
1500
1579
bool has_timestamp ;
1580
+ bool is_snapshot ;
1501
1581
bool has_level ;
1582
+ bool is_find_aggregate_distinct ;
1502
1583
bson_t child ;
1503
1584
1504
1585
ENTRY ;
@@ -1512,16 +1593,24 @@ _mongoc_client_session_append_read_concern (const mongoc_client_session_t *cs,
1512
1593
return ;
1513
1594
}
1514
1595
1596
+ is_find_aggregate_distinct =
1597
+ (!strcmp (cmd_name , "find" ) || !strcmp (cmd_name , "aggregate" ) ||
1598
+ !strcmp (cmd_name , "distinct" ));
1599
+
1515
1600
has_timestamp =
1516
1601
(txn_state == MONGOC_INTERNAL_TRANSACTION_STARTING || is_read_command ) &&
1517
1602
mongoc_session_opts_get_causal_consistency (& cs -> opts ) &&
1518
1603
cs -> operation_timestamp ;
1604
+ is_snapshot = is_find_aggregate_distinct &&
1605
+ mongoc_session_opts_get_snapshot (& cs -> opts );
1519
1606
user_rc_has_level = rc && bson_has_field (rc , "level" );
1520
1607
txn_has_level = txn_state == MONGOC_INTERNAL_TRANSACTION_STARTING &&
1521
1608
!mongoc_read_concern_is_default (txn_rc );
1522
1609
has_level = user_rc_has_level || txn_has_level ;
1523
1610
1524
- if (!has_timestamp && !has_level ) {
1611
+ /* do not append read concern if no causal consistency, snapshot disabled and
1612
+ * no read concern is provided. */
1613
+ if (!has_timestamp && !is_snapshot && !has_level ) {
1525
1614
return ;
1526
1615
}
1527
1616
@@ -1531,18 +1620,32 @@ _mongoc_client_session_append_read_concern (const mongoc_client_session_t *cs,
1531
1620
}
1532
1621
1533
1622
if (txn_state == MONGOC_INTERNAL_TRANSACTION_STARTING ) {
1534
- /* add transaction's read concern level unless user overrides */
1535
- if (txn_has_level && !user_rc_has_level ) {
1623
+ /* add transaction's read concern level unless user overrides or snapshot
1624
+ * is enabled. */
1625
+ if (txn_has_level && !user_rc_has_level && !is_snapshot ) {
1536
1626
bson_append_utf8 (& child , "level" , 5 , txn_rc -> level , -1 );
1537
1627
}
1538
1628
}
1629
+ if (is_snapshot ) {
1630
+ bson_append_utf8 (
1631
+ & child , "level" , 5 , MONGOC_READ_CONCERN_LEVEL_SNAPSHOT , -1 );
1632
+ }
1539
1633
1634
+ /* append afterClusterTime if causal consistency and operation_time is set.
1635
+ * otherwise append atClusterTime if snapshot enabled and snapshot_time is
1636
+ * set. */
1540
1637
if (has_timestamp ) {
1541
1638
bson_append_timestamp (& child ,
1542
1639
"afterClusterTime" ,
1543
1640
16 ,
1544
1641
cs -> operation_timestamp ,
1545
1642
cs -> operation_increment );
1643
+ } else if (is_snapshot && cs -> snapshot_time_set ) {
1644
+ bson_append_timestamp (& child ,
1645
+ "atClusterTime" ,
1646
+ 13 ,
1647
+ cs -> snapshot_time_timestamp ,
1648
+ cs -> snapshot_time_increment );
1546
1649
}
1547
1650
1548
1651
bson_append_document_end (cmd , & child );
@@ -1621,6 +1724,27 @@ _mongoc_client_session_pin (mongoc_client_session_t *session,
1621
1724
session -> server_id = server_id ;
1622
1725
}
1623
1726
1727
+ void
1728
+ _mongoc_client_session_set_snapshot_time (mongoc_client_session_t * session ,
1729
+ uint32_t t ,
1730
+ uint32_t i )
1731
+ {
1732
+ BSON_ASSERT (session );
1733
+ BSON_ASSERT (!session -> snapshot_time_set );
1734
+
1735
+ session -> snapshot_time_set = true;
1736
+ session -> snapshot_time_timestamp = t ;
1737
+ session -> snapshot_time_increment = i ;
1738
+ }
1739
+
1740
+ void
1741
+ _mongoc_client_session_clear_snapshot_time (mongoc_client_session_t * session )
1742
+ {
1743
+ BSON_ASSERT (session );
1744
+
1745
+ session -> snapshot_time_set = false;
1746
+ }
1747
+
1624
1748
bool
1625
1749
mongoc_client_session_get_dirty (mongoc_client_session_t * session )
1626
1750
{
0 commit comments