Skip to content

CDRIVER-4034 Allow snapshot reads through sessions #807

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/libmongoc/doc/mongoc_session_opt_t.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ See the example code for :symbol:`mongoc_session_opts_set_causal_consistency`.
mongoc_session_opts_set_causal_consistency
mongoc_session_opts_get_default_transaction_opts
mongoc_session_opts_set_default_transaction_opts
mongoc_session_opts_get_snapshot
mongoc_session_opts_set_snapshot
mongoc_session_opts_get_transaction_opts
mongoc_session_opts_clone
mongoc_session_opts_destroy
6 changes: 6 additions & 0 deletions src/libmongoc/doc/mongoc_session_opts_get_snapshot.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
:man_page: mongoc_session_opts_get_snapshot

mongoc_session_opts_get_snapshot()
==================================

TODO CDRIVER-4036: Document this function and give example code.
6 changes: 6 additions & 0 deletions src/libmongoc/doc/mongoc_session_opts_set_snapshot.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
:man_page: mongoc_session_opts_set_snapshot

mongoc_session_opts_set_snapshot()
==================================

TODO CDRIVER-4036: Document this function and give example code.
13 changes: 13 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-client-session-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct _mongoc_transaction_opt_t {
typedef enum {
MONGOC_SESSION_NO_OPTS = 0,
MONGOC_SESSION_CAUSAL_CONSISTENCY = (1 << 0),
MONGOC_SESSION_SNAPSHOT = (2 << 0),
} mongoc_session_flag_t;

struct _mongoc_session_opt_t {
Expand Down Expand Up @@ -83,6 +84,9 @@ struct _mongoc_client_session_t {
uint32_t client_generation;
uint32_t server_id;
bson_t *recovery_token;
uint32_t snapshot_time_timestamp;
uint32_t snapshot_time_increment;
bool snapshot_time_set;

/* For testing only */
int64_t with_txn_timeout_ms;
Expand All @@ -100,6 +104,7 @@ _mongoc_cluster_time_greater (const bson_t *new, const bson_t *old);
void
_mongoc_client_session_handle_reply (mongoc_client_session_t *session,
bool is_acknowledged,
const char *cmd_name,
const bson_t *reply);

mongoc_server_session_t *
Expand Down Expand Up @@ -143,6 +148,7 @@ void
_mongoc_client_session_append_read_concern (const mongoc_client_session_t *cs,
const bson_t *user_read_concern,
bool is_read_command,
const char *cmd_name,
bson_t *cmd);

void
Expand All @@ -152,5 +158,12 @@ void
_mongoc_client_session_pin (mongoc_client_session_t *session,
uint32_t server_id);

void
_mongoc_client_session_set_snapshot_time (mongoc_client_session_t *session,
uint32_t t,
uint32_t i);

void
_mongoc_client_session_clear_snapshot_time (mongoc_client_session_t *session);

#endif /* MONGOC_CLIENT_SESSION_PRIVATE_H */
158 changes: 141 additions & 17 deletions src/libmongoc/src/mongoc/mongoc-client-session.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,18 +395,25 @@ mongoc_transaction_opts_get_read_prefs (const mongoc_transaction_opt_t *opts)
return opts->read_prefs;
}


mongoc_session_opt_t *
mongoc_session_opts_new (void)
bool
mongoc_session_opts_get_causal_consistency (const mongoc_session_opt_t *opts)
{
mongoc_session_opt_t *opts = bson_malloc0 (sizeof (mongoc_session_opt_t));
ENTRY;

/* Driver Sessions Spec: causal consistency is true by default */
mongoc_session_opts_set_causal_consistency (opts, true);
BSON_ASSERT (opts);

return opts;
RETURN (!!(opts->flags & MONGOC_SESSION_CAUSAL_CONSISTENCY));
}

bool
mongoc_session_opts_get_snapshot (const mongoc_session_opt_t *opts)
{
ENTRY;

BSON_ASSERT (opts);

RETURN (!!(opts->flags & MONGOC_SESSION_SNAPSHOT));
}

void
mongoc_session_opts_set_causal_consistency (mongoc_session_opt_t *opts,
Expand All @@ -425,16 +432,35 @@ mongoc_session_opts_set_causal_consistency (mongoc_session_opt_t *opts,
EXIT;
}

bool
mongoc_session_opts_get_causal_consistency (const mongoc_session_opt_t *opts)
void
mongoc_session_opts_set_snapshot (mongoc_session_opt_t *opts, bool snapshot)
{
ENTRY;

BSON_ASSERT (opts);

RETURN (!!(opts->flags & MONGOC_SESSION_CAUSAL_CONSISTENCY));
if (snapshot) {
opts->flags |= MONGOC_SESSION_SNAPSHOT;
} else {
opts->flags &= ~MONGOC_SESSION_SNAPSHOT;
}

EXIT;
}

mongoc_session_opt_t *
mongoc_session_opts_new (void)
{
mongoc_session_opt_t *opts = bson_malloc0 (sizeof (mongoc_session_opt_t));

/* Driver Sessions Spec: causal consistency is true by default */
mongoc_session_opts_set_causal_consistency (opts, true);

/* Snapshot Reads Spec: snapshot is false by default */
mongoc_session_opts_set_snapshot (opts, false);

return opts;
}

void
mongoc_session_opts_set_default_transaction_opts (
Expand Down Expand Up @@ -606,21 +632,30 @@ _mongoc_cluster_time_greater (const bson_t *new, const bson_t *old)
void
_mongoc_client_session_handle_reply (mongoc_client_session_t *session,
bool is_acknowledged,
const char *cmd_name,
const bson_t *reply)
{
bson_iter_t iter;
bson_iter_t cursor_iter;
uint32_t len;
const uint8_t *data;
bson_t cluster_time;
uint32_t t;
uint32_t i;
uint32_t operation_t;
uint32_t operation_i;
uint32_t snapshot_t;
uint32_t snapshot_i;
bool is_find_aggregate_distinct;

BSON_ASSERT (session);

if (!reply || !bson_iter_init (&iter, reply)) {
return;
}

is_find_aggregate_distinct =
(!strcmp (cmd_name, "find") || !strcmp (cmd_name, "aggregate") ||
!strcmp (cmd_name, "distinct"));

if (mongoc_error_has_label (reply, "TransientTransactionError")) {
/* Transaction Spec: "Drivers MUST unpin a ClientSession when a command
* within a transaction, including commitTransaction and abortTransaction,
Expand All @@ -639,8 +674,39 @@ _mongoc_client_session_handle_reply (mongoc_client_session_t *session,
mongoc_client_session_advance_cluster_time (session, &cluster_time);
} else if (!strcmp (bson_iter_key (&iter), "operationTime") &&
BSON_ITER_HOLDS_TIMESTAMP (&iter) && is_acknowledged) {
bson_iter_timestamp (&iter, &t, &i);
mongoc_client_session_advance_operation_time (session, t, i);
bson_iter_timestamp (&iter, &operation_t, &operation_i);
mongoc_client_session_advance_operation_time (
session, operation_t, operation_i);
} else if (is_find_aggregate_distinct &&
!strcmp (bson_iter_key (&iter), "atClusterTime") &&
mongoc_session_opts_get_snapshot (&session->opts) &&
!session->snapshot_time_set) {
/* If command is "find", "aggregate" or "distinct", atClusterTime is on
* top level of reply, snapshot is enabled for the session, and
* snapshot_time has not already been set, set it. */
bson_iter_timestamp (&iter, &snapshot_t, &snapshot_i);
_mongoc_client_session_set_snapshot_time (
session, snapshot_t, snapshot_i);
} else if (is_find_aggregate_distinct &&
!strcmp (bson_iter_key (&iter), "cursor") &&
mongoc_session_opts_get_snapshot (&session->opts) &&
!session->snapshot_time_set) {
/* If command is "find", "aggregate" or "distinct", cursor is present,
* snapshot is enabled for the session, and snapshot_time has not
* already been set, try to find atClusterTime in cursor field to set
* snapshot_time. */
bson_iter_recurse (&iter, &cursor_iter);

while (bson_iter_next (&cursor_iter)) {
/* If atClusterTime is in cursor and is a valid timestamp, use it to
* set snapshot_time. */
if (!strcmp (bson_iter_key (&cursor_iter), "atClusterTime") &&
BSON_ITER_HOLDS_TIMESTAMP (&cursor_iter)) {
bson_iter_timestamp (&cursor_iter, &snapshot_t, &snapshot_i);
_mongoc_client_session_set_snapshot_time (
session, snapshot_t, snapshot_i);
}
}
}
}
}
Expand Down Expand Up @@ -751,6 +817,9 @@ _mongoc_client_session_new (mongoc_client_t *client,
session->opts.flags = MONGOC_SESSION_CAUSAL_CONSISTENCY;
}

/* snapshot_time_set is false by default */
_mongoc_client_session_clear_snapshot_time (session);

/* these values are used for testing only. */
session->with_txn_timeout_ms = 0;
session->fail_commit_label = NULL;
Expand Down Expand Up @@ -1055,6 +1124,15 @@ mongoc_client_session_start_transaction (mongoc_client_session_t *session,
GOTO (done);
}

if (mongoc_session_opts_get_snapshot (&session->opts)) {
bson_set_error (error,
MONGOC_ERROR_TRANSACTION,
MONGOC_ERROR_TRANSACTION_INVALID_STATE,
"Transactions are not supported in snapshot sessions");
ret = false;
GOTO (done);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new error is an alteration to the spec accompanied by a new spec test "StartTransaction fails in snapshot session".

}

if (sd->max_wire_version < 7 ||
(sd->max_wire_version < 8 && sd->type == MONGOC_SERVER_MONGOS)) {
bson_set_error (error,
Expand Down Expand Up @@ -1491,14 +1569,17 @@ void
_mongoc_client_session_append_read_concern (const mongoc_client_session_t *cs,
const bson_t *rc,
bool is_read_command,
const char *cmd_name,
bson_t *cmd)
{
const mongoc_read_concern_t *txn_rc;
mongoc_internal_transaction_state_t txn_state;
bool user_rc_has_level;
bool txn_has_level;
bool has_timestamp;
bool is_snapshot;
bool has_level;
bool is_find_aggregate_distinct;
bson_t child;

ENTRY;
Expand All @@ -1512,16 +1593,24 @@ _mongoc_client_session_append_read_concern (const mongoc_client_session_t *cs,
return;
}

is_find_aggregate_distinct =
(!strcmp (cmd_name, "find") || !strcmp (cmd_name, "aggregate") ||
!strcmp (cmd_name, "distinct"));

has_timestamp =
(txn_state == MONGOC_INTERNAL_TRANSACTION_STARTING || is_read_command) &&
mongoc_session_opts_get_causal_consistency (&cs->opts) &&
cs->operation_timestamp;
is_snapshot = is_find_aggregate_distinct &&
mongoc_session_opts_get_snapshot (&cs->opts);
user_rc_has_level = rc && bson_has_field (rc, "level");
txn_has_level = txn_state == MONGOC_INTERNAL_TRANSACTION_STARTING &&
!mongoc_read_concern_is_default (txn_rc);
has_level = user_rc_has_level || txn_has_level;

if (!has_timestamp && !has_level) {
/* do not append read concern if no causal consistency, snapshot disabled and
* no read concern is provided. */
if (!has_timestamp && !is_snapshot && !has_level) {
return;
}

Expand All @@ -1531,18 +1620,32 @@ _mongoc_client_session_append_read_concern (const mongoc_client_session_t *cs,
}

if (txn_state == MONGOC_INTERNAL_TRANSACTION_STARTING) {
/* add transaction's read concern level unless user overrides */
if (txn_has_level && !user_rc_has_level) {
/* add transaction's read concern level unless user overrides or snapshot
* is enabled. */
if (txn_has_level && !user_rc_has_level && !is_snapshot) {
bson_append_utf8 (&child, "level", 5, txn_rc->level, -1);
}
}
if (is_snapshot) {
bson_append_utf8 (
&child, "level", 5, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT, -1);
}

/* append afterClusterTime if causal consistency and operation_time is set.
* otherwise append atClusterTime if snapshot enabled and snapshot_time is
* set. */
if (has_timestamp) {
bson_append_timestamp (&child,
"afterClusterTime",
16,
cs->operation_timestamp,
cs->operation_increment);
} else if (is_snapshot && cs->snapshot_time_set) {
bson_append_timestamp (&child,
"atClusterTime",
13,
cs->snapshot_time_timestamp,
cs->snapshot_time_increment);
}

bson_append_document_end (cmd, &child);
Expand Down Expand Up @@ -1621,6 +1724,27 @@ _mongoc_client_session_pin (mongoc_client_session_t *session,
session->server_id = server_id;
}

void
_mongoc_client_session_set_snapshot_time (mongoc_client_session_t *session,
uint32_t t,
uint32_t i)
{
BSON_ASSERT (session);
BSON_ASSERT (!session->snapshot_time_set);

session->snapshot_time_set = true;
session->snapshot_time_timestamp = t;
session->snapshot_time_increment = i;
}

void
_mongoc_client_session_clear_snapshot_time (mongoc_client_session_t *session)
{
BSON_ASSERT (session);

session->snapshot_time_set = false;
}

bool
mongoc_client_session_get_dirty (mongoc_client_session_t *session)
{
Expand Down
6 changes: 6 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-client-session.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ mongoc_session_opts_set_causal_consistency (mongoc_session_opt_t *opts,
MONGOC_EXPORT (bool)
mongoc_session_opts_get_causal_consistency (const mongoc_session_opt_t *opts);

MONGOC_EXPORT (void)
mongoc_session_opts_set_snapshot (mongoc_session_opt_t *opts, bool snapshot);

MONGOC_EXPORT (bool)
mongoc_session_opts_get_snapshot (const mongoc_session_opt_t *opts);

MONGOC_EXPORT (void)
mongoc_session_opts_set_default_transaction_opts (
mongoc_session_opt_t *opts, const mongoc_transaction_opt_t *txn_opts);
Expand Down
Loading