Skip to content

CDRIVER-4617 Refactor libmongoc to use mcd-rpc #1307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/libmongoc/src/mongoc/mongoc-async-cmd-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@

#include <bson/bson.h>

#include "mcd-rpc.h"
#include "mongoc-client.h"
#include "mongoc-async-private.h"
#include "mongoc-array-private.h"
#include "mongoc-buffer-private.h"
#include "mongoc-cmd-private.h"
#include "mongoc-rpc-private.h"
#include "mongoc-stream.h"

BSON_BEGIN_DECLS
Expand Down Expand Up @@ -59,12 +59,11 @@ typedef struct _mongoc_async_cmd {
int64_t timeout_msec;
bson_t cmd;
mongoc_buffer_t buffer;
mongoc_array_t array;
mongoc_iovec_t *iovec;
size_t niovec;
size_t bytes_written;
size_t bytes_to_read;
mongoc_rpc_t rpc;
mcd_rpc_message *rpc;
bson_t reply;
bool reply_needs_cleanup;
char *ns;
Expand All @@ -85,7 +84,7 @@ mongoc_async_cmd_new (mongoc_async_t *async,
void *setup_ctx,
const char *dbname,
const bson_t *cmd,
const mongoc_opcode_t cmd_opcode,
const int32_t cmd_opcode,
mongoc_async_cmd_cb_t cb,
void *cb_data,
int64_t timeout_msec);
Expand Down
126 changes: 66 additions & 60 deletions src/libmongoc/src/mongoc/mongoc-async-cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,42 +134,47 @@ mongoc_async_cmd_run (mongoc_async_cmd_t *acmd)
return false;
}

void
_mongoc_async_cmd_init_send (const mongoc_opcode_t cmd_opcode,
static void
_mongoc_async_cmd_init_send (const int32_t cmd_opcode,
mongoc_async_cmd_t *acmd,
const char *dbname)
{
acmd->rpc.header.msg_len = 0;
acmd->rpc.header.request_id = ++acmd->async->request_id;
acmd->rpc.header.response_to = 0;
BSON_ASSERT (cmd_opcode == MONGOC_OP_CODE_QUERY ||
cmd_opcode == MONGOC_OP_CODE_MSG);

if (MONGOC_OPCODE_QUERY == cmd_opcode) {
acmd->ns = bson_strdup_printf ("%s.$cmd", dbname);
acmd->rpc.header.opcode = MONGOC_OPCODE_QUERY;
acmd->rpc.query.flags = MONGOC_QUERY_SECONDARY_OK;
acmd->rpc.query.collection = acmd->ns;
acmd->rpc.query.skip = 0;
acmd->rpc.query.n_return = -1;
acmd->rpc.query.query = bson_get_data (&acmd->cmd);
acmd->rpc.query.fields = NULL;
}
int32_t message_length = 0;

if (MONGOC_OPCODE_MSG == cmd_opcode) {
acmd->rpc.header.opcode = MONGOC_OPCODE_MSG;
message_length += mcd_rpc_header_set_message_length (acmd->rpc, 0);
message_length +=
mcd_rpc_header_set_request_id (acmd->rpc, ++acmd->async->request_id);
message_length += mcd_rpc_header_set_response_to (acmd->rpc, 0);
message_length += mcd_rpc_header_set_op_code (acmd->rpc, cmd_opcode);

acmd->rpc.msg.msg_len = 0;
acmd->rpc.msg.flags = 0;
acmd->rpc.msg.n_sections = 1;
acmd->rpc.msg.sections[0].payload_type = 0;
acmd->rpc.msg.sections[0].payload.bson_document =
bson_get_data (&acmd->cmd);
if (cmd_opcode == MONGOC_OP_CODE_QUERY) {
acmd->ns = bson_strdup_printf ("%s.$cmd", dbname);
message_length += mcd_rpc_op_query_set_flags (
acmd->rpc, MONGOC_OP_QUERY_FLAG_SECONDARY_OK);
message_length +=
mcd_rpc_op_query_set_full_collection_name (acmd->rpc, acmd->ns);
message_length += mcd_rpc_op_query_set_number_to_skip (acmd->rpc, 0);
message_length += mcd_rpc_op_query_set_number_to_return (acmd->rpc, -1);
message_length +=
mcd_rpc_op_query_set_query (acmd->rpc, bson_get_data (&acmd->cmd));
} else {
mcd_rpc_op_msg_set_sections_count (acmd->rpc, 1u);
message_length +=
mcd_rpc_op_msg_set_flag_bits (acmd->rpc, MONGOC_OP_MSG_FLAG_NONE);
message_length += mcd_rpc_op_msg_section_set_kind (acmd->rpc, 0u, 0);
message_length += mcd_rpc_op_msg_section_set_body (
acmd->rpc, 0u, bson_get_data (&acmd->cmd));
}

mcd_rpc_message_set_length (acmd->rpc, message_length);

/* This will always be hello, which are not allowed to be compressed */
_mongoc_rpc_gather (&acmd->rpc, &acmd->array);
acmd->iovec = (mongoc_iovec_t *) acmd->array.data;
acmd->niovec = acmd->array.len;
_mongoc_rpc_swab_to_le (&acmd->rpc);
acmd->iovec = mcd_rpc_message_to_iovecs (acmd->rpc, &acmd->niovec);
BSON_ASSERT (acmd->iovec);

acmd->bytes_written = 0;
}

Expand Down Expand Up @@ -198,17 +203,15 @@ mongoc_async_cmd_new (mongoc_async_t *async,
void *setup_ctx,
const char *dbname,
const bson_t *cmd,
const mongoc_opcode_t cmd_opcode, /* OP_QUERY or OP_MSG */
const int32_t cmd_opcode, /* OP_QUERY or OP_MSG */
mongoc_async_cmd_cb_t cb,
void *cb_data,
int64_t timeout_msec)
{
mongoc_async_cmd_t *acmd;
BSON_ASSERT_PARAM (cmd);
BSON_ASSERT_PARAM (dbname);

BSON_ASSERT (cmd);
BSON_ASSERT (dbname);

acmd = BSON_ALIGNED_ALLOC0 (mongoc_async_cmd_t);
mongoc_async_cmd_t *const acmd = BSON_ALIGNED_ALLOC0 (mongoc_async_cmd_t);
acmd->async = async;
acmd->dns_result = dns_result;
acmd->timeout_msec = timeout_msec;
Expand All @@ -222,12 +225,13 @@ mongoc_async_cmd_new (mongoc_async_t *async,
acmd->connect_started = bson_get_monotonic_time ();
bson_copy_to (cmd, &acmd->cmd);

if (MONGOC_OPCODE_MSG == cmd_opcode) {
/* If we're sending an OPCODE_MSG, we need to add the "db" field: */
if (MONGOC_OP_CODE_MSG == cmd_opcode) {
/* If we're sending an OP_MSG, we need to add the "db" field: */
bson_append_utf8 (&acmd->cmd, "$db", 3, "admin", 5);
}

_mongoc_array_init (&acmd->array, sizeof (mongoc_iovec_t));
acmd->rpc = mcd_rpc_message_new ();
acmd->iovec = NULL;
_mongoc_buffer_init (&acmd->buffer, NULL, 0, NULL, NULL);

_mongoc_async_cmd_init_send (cmd_opcode, acmd, dbname);
Expand Down Expand Up @@ -255,8 +259,9 @@ mongoc_async_cmd_destroy (mongoc_async_cmd_t *acmd)
bson_destroy (&acmd->reply);
}

_mongoc_array_destroy (&acmd->array);
bson_free (acmd->iovec);
_mongoc_buffer_destroy (&acmd->buffer);
mcd_rpc_message_destroy (acmd->rpc);

bson_free (acmd->ns);
bson_free (acmd);
Expand Down Expand Up @@ -348,7 +353,7 @@ _mongoc_async_cmd_phase_send (mongoc_async_cmd_t *acmd)
used_temp_iovec = true;
}

_mongoc_rpc_op_egress_inc (&acmd->rpc);
mcd_rpc_message_egress (acmd->rpc);
bytes = mongoc_stream_writev (acmd->stream, iovec, niovec, 0);

if (used_temp_iovec) {
Expand Down Expand Up @@ -462,38 +467,39 @@ _mongoc_async_cmd_phase_recv_rpc (mongoc_async_cmd_t *acmd)
acmd->bytes_to_read = (size_t) (acmd->bytes_to_read - bytes);

if (!acmd->bytes_to_read) {
if (!_mongoc_rpc_scatter (
&acmd->rpc, acmd->buffer.data, acmd->buffer.len)) {
mcd_rpc_message_reset (acmd->rpc);
if (!mcd_rpc_message_from_data_in_place (
acmd->rpc, acmd->buffer.data, acmd->buffer.len, NULL)) {
bson_set_error (&acmd->error,
MONGOC_ERROR_PROTOCOL,
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
"Invalid reply from server.");
return MONGOC_ASYNC_CMD_ERROR;
}
if (BSON_UINT32_FROM_LE (acmd->rpc.header.opcode) ==
MONGOC_OPCODE_COMPRESSED) {
uint8_t *buf = NULL;
size_t len =
BSON_UINT32_FROM_LE (acmd->rpc.compressed.uncompressed_size) +
sizeof (mongoc_rpc_header_t);

buf = bson_malloc0 (len);
if (!_mongoc_rpc_decompress (&acmd->rpc, buf, len)) {
bson_free (buf);
bson_set_error (&acmd->error,
MONGOC_ERROR_PROTOCOL,
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
"Could not decompress server reply");
return MONGOC_ASYNC_CMD_ERROR;
}
mcd_rpc_message_ingress (acmd->rpc);

_mongoc_buffer_destroy (&acmd->buffer);
_mongoc_buffer_init (&acmd->buffer, buf, len, NULL, NULL);
void *decompressed_data;
size_t decompressed_data_len;

if (!mcd_rpc_message_decompress_if_necessary (
acmd->rpc, &decompressed_data, &decompressed_data_len)) {
bson_set_error (&acmd->error,
MONGOC_ERROR_PROTOCOL,
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
"Could not decompress server reply");
return MONGOC_ASYNC_CMD_ERROR;
}

_mongoc_rpc_swab_from_le (&acmd->rpc);
if (decompressed_data) {
_mongoc_buffer_destroy (&acmd->buffer);
_mongoc_buffer_init (&acmd->buffer,
decompressed_data,
decompressed_data_len,
NULL,
NULL);
}

if (!_mongoc_rpc_get_first_document (&acmd->rpc, &acmd->reply)) {
if (!mcd_rpc_message_get_body (acmd->rpc, &acmd->reply)) {
bson_set_error (&acmd->error,
MONGOC_ERROR_PROTOCOL,
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
Expand Down
2 changes: 1 addition & 1 deletion src/libmongoc/src/mongoc/mongoc-client-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ _mongoc_client_create_stream (mongoc_client_t *client,

bool
_mongoc_client_recv (mongoc_client_t *client,
mongoc_rpc_t *rpc,
mcd_rpc_message *rpc,
mongoc_buffer_t *buffer,
mongoc_server_stream_t *server_stream,
bson_error_t *error);
Expand Down
67 changes: 31 additions & 36 deletions src/libmongoc/src/mongoc/mongoc-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -951,25 +951,9 @@ _mongoc_client_create_stream (mongoc_client_t *client,
}


/*
*--------------------------------------------------------------------------
*
* _mongoc_client_recv --
*
* Receives a RPC from a remote MongoDB cluster node.
*
* Returns:
* true if successful; otherwise false and @error is set.
*
* Side effects:
* @error is set if return value is false.
*
*--------------------------------------------------------------------------
*/

bool
_mongoc_client_recv (mongoc_client_t *client,
mongoc_rpc_t *rpc,
mcd_rpc_message *rpc,
mongoc_buffer_t *buffer,
mongoc_server_stream_t *server_stream,
bson_error_t *error)
Expand All @@ -978,6 +962,7 @@ _mongoc_client_recv (mongoc_client_t *client,
BSON_ASSERT (rpc);
BSON_ASSERT (buffer);
BSON_ASSERT (server_stream);
BSON_ASSERT_PARAM (error);

return mongoc_cluster_try_recv (
&client->cluster, rpc, buffer, server_stream, error);
Expand Down Expand Up @@ -2478,36 +2463,44 @@ _mongoc_client_op_killcursors (mongoc_cluster_t *cluster,
const char *db,
const char *collection)
{
int64_t started;
mongoc_rpc_t rpc = {{0}};
bson_error_t error;
bool has_ns;
bool r;
BSON_ASSERT_PARAM (cluster);
BSON_ASSERT_PARAM (server_stream);
BSON_ASSERT (db || true);
BSON_ASSERT (collection || true);

const bool has_ns = db && collection;
const int64_t started = bson_get_monotonic_time ();

/* called by old mongoc_client_kill_cursor without db/collection? */
has_ns = (db && collection);
started = bson_get_monotonic_time ();
mcd_rpc_message *const rpc = mcd_rpc_message_new ();

++cluster->request_id;
{
int32_t message_length = 0;

rpc.header.msg_len = 0;
rpc.header.request_id = cluster->request_id;
rpc.header.response_to = 0;
rpc.header.opcode = MONGOC_OPCODE_KILL_CURSORS;
rpc.kill_cursors.zero = 0;
rpc.kill_cursors.cursors = &cursor_id;
rpc.kill_cursors.n_cursors = 1;
message_length += mcd_rpc_header_set_message_length (rpc, 0);
message_length +=
mcd_rpc_header_set_request_id (rpc, ++cluster->request_id);
message_length += mcd_rpc_header_set_response_to (rpc, 0);
message_length +=
mcd_rpc_header_set_op_code (rpc, MONGOC_OP_CODE_KILL_CURSORS);

message_length += sizeof (int32_t); // ZERO
message_length +=
mcd_rpc_op_kill_cursors_set_cursor_ids (rpc, &cursor_id, 1);

mcd_rpc_message_set_length (rpc, message_length);
}

if (has_ns) {
_mongoc_client_monitor_op_killcursors (
cluster, server_stream, cursor_id, operation_id, db, collection);
}

r = mongoc_cluster_legacy_rpc_sendv_to_server (
cluster, &rpc, server_stream, &error);
bson_error_t error;
const bool res = mongoc_cluster_legacy_rpc_sendv_to_server (
cluster, rpc, server_stream, &error);

if (has_ns) {
if (r) {
if (res) {
_mongoc_client_monitor_op_killcursors_succeeded (
cluster,
bson_get_monotonic_time () - started,
Expand All @@ -2523,6 +2516,8 @@ _mongoc_client_op_killcursors (mongoc_cluster_t *cluster,
operation_id);
}
}

mcd_rpc_message_destroy (rpc);
}


Expand Down
Loading