Skip to content

CDRIVER-4617 Remove legacy mongoc-rpc entities #1312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/libmongoc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,6 @@ set (test-libmongoc-sources
${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-write-concern.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryable-reads.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryable-writes.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-rpc.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-sample-commands.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-scram.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-sdam-monitoring.c
Expand Down
11 changes: 0 additions & 11 deletions src/libmongoc/src/mongoc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,4 @@
set (src_libmongoc_src_mongoc_DIST_defs
op-delete.def
op-get-more.def
op-header.def
op-insert.def
op-kill-cursors.def
op-msg.def
op-query.def
op-reply.def
op-reply-header.def
op-update.def
op-compressed.def
mongoc-counters.defs
)

Expand Down
13 changes: 0 additions & 13 deletions src/libmongoc/src/mongoc/mongoc-cluster-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,6 @@ _mongoc_cluster_get_auth_cmd_scram (mongoc_crypto_hash_algorithm_t algo,
bson_error_t *error /* OUT */);
#endif /* MONGOC_ENABLE_CRYPTO */

char *
_mongoc_rpc_compress (struct _mongoc_cluster_t *cluster,
int32_t compressor_id,
mongoc_rpc_t *rpc_le,
bson_error_t *error);
bool
_mongoc_rpc_decompress (mongoc_rpc_t *rpc_le, uint8_t *buf, size_t buflen);

bool
mcd_rpc_message_compress (mcd_rpc_message *rpc,
int32_t compressor_id,
Expand All @@ -279,11 +271,6 @@ mcd_rpc_message_decompress (mcd_rpc_message *rpc,
void **data,
size_t *data_len);

bool
_mongoc_rpc_decompress_if_necessary (mongoc_rpc_t *rpc,
mongoc_buffer_t *buffer /* IN/OUT */,
bson_error_t *error /* OUT */);

bool
mcd_rpc_message_decompress_if_necessary (mcd_rpc_message *rpc,
void **data,
Expand Down
176 changes: 0 additions & 176 deletions src/libmongoc/src/mongoc/mongoc-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -3790,142 +3790,6 @@ mongoc_cluster_run_opmsg (mongoc_cluster_t *cluster,
}


/*
*--------------------------------------------------------------------------
*
* _mongoc_rpc_compress --
*
* Takes a (little endian) rpc struct and creates a OP_COMPRESSED
* compressed opcode based on the provided compressor_id.
* The in-place updated rpc struct remains little endian.
*
* Side effects:
* Overwrites the RPC, and clears and overwrites the cluster buffer
* with the compressed results.
*
*--------------------------------------------------------------------------
*/

char *
_mongoc_rpc_compress (struct _mongoc_cluster_t *cluster,
int32_t compressor_id,
mongoc_rpc_t *rpc_le,
bson_error_t *error)
{
const size_t allocate = BSON_UINT32_FROM_LE (rpc_le->header.msg_len) - 16u;
BSON_ASSERT (allocate > 0u);

char *const data = bson_malloc0 (allocate);
const size_t size = _mongoc_cluster_buffer_iovec (
cluster->iov.data, cluster->iov.len, 16, data);
size_t output_length =
mongoc_compressor_max_compressed_length (compressor_id, size);

if (!output_length) {
bson_set_error (error,
MONGOC_ERROR_COMMAND,
MONGOC_ERROR_COMMAND_INVALID_ARG,
"Could not determine compression bounds for %s",
mongoc_compressor_id_to_name (compressor_id));
bson_free (data);
return NULL;
}

int32_t compression_level = -1;

if (compressor_id == MONGOC_COMPRESSOR_ZLIB_ID) {
compression_level = mongoc_uri_get_option_as_int32 (
cluster->uri, MONGOC_URI_ZLIBCOMPRESSIONLEVEL, -1);
}

BSON_ASSERT (size > 0u);

char *const output = (char *) bson_malloc0 (output_length);
if (mongoc_compress (compressor_id,
compression_level,
data,
size,
output,
&output_length)) {
rpc_le->header.msg_len = 0;
rpc_le->compressed.original_opcode =
BSON_UINT32_FROM_LE (rpc_le->header.opcode);
rpc_le->header.opcode = MONGOC_OPCODE_COMPRESSED;
rpc_le->header.request_id =
BSON_UINT32_FROM_LE (rpc_le->header.request_id);
rpc_le->header.response_to =
BSON_UINT32_FROM_LE (rpc_le->header.response_to);

BSON_ASSERT (bson_in_range_unsigned (int32_t, size));
BSON_ASSERT (bson_in_range_unsigned (int32_t, output_length));

rpc_le->compressed.uncompressed_size = (int32_t) size;
rpc_le->compressed.compressor_id = compressor_id;
rpc_le->compressed.compressed_message = (const uint8_t *) output;
rpc_le->compressed.compressed_message_len = (int32_t) output_length;
bson_free (data);


_mongoc_array_destroy (&cluster->iov);
_mongoc_array_init (&cluster->iov, sizeof (mongoc_iovec_t));
_mongoc_rpc_gather (rpc_le, &cluster->iov);
_mongoc_rpc_swab_to_le (rpc_le);
return output;
} else {
MONGOC_WARNING ("Could not compress data with %s",
mongoc_compressor_id_to_name (compressor_id));
}
bson_free (data);
bson_free (output);
return NULL;
}

/*
*--------------------------------------------------------------------------
*
* _mongoc_rpc_decompress --
*
* Takes a (little endian) rpc struct assumed to be OP_COMPRESSED
* and decompresses the opcode into its original opcode.
* The in-place updated rpc struct remains little endian.
*
* Side effects:
* Overwrites the RPC, along with the provided buf with the
* compressed results.
*
*--------------------------------------------------------------------------
*/

bool
_mongoc_rpc_decompress (mongoc_rpc_t *rpc_le, uint8_t *buf, size_t buflen)
{
size_t uncompressed_size =
BSON_UINT32_FROM_LE (rpc_le->compressed.uncompressed_size);
bool ok;
size_t msg_len = BSON_UINT32_TO_LE (buflen);
const size_t original_uncompressed_size = uncompressed_size;

BSON_ASSERT (uncompressed_size <= buflen);
memcpy (buf, (void *) (&msg_len), 4);
memcpy (buf + 4, (void *) (&rpc_le->header.request_id), 4);
memcpy (buf + 8, (void *) (&rpc_le->header.response_to), 4);
memcpy (buf + 12, (void *) (&rpc_le->compressed.original_opcode), 4);

ok = mongoc_uncompress (rpc_le->compressed.compressor_id,
rpc_le->compressed.compressed_message,
rpc_le->compressed.compressed_message_len,
buf + 16,
&uncompressed_size);

BSON_ASSERT (original_uncompressed_size == uncompressed_size);

if (ok) {
return _mongoc_rpc_scatter (rpc_le, buf, buflen);
}

return false;
}

bool
mcd_rpc_message_compress (mcd_rpc_message *rpc,
int32_t compressor_id,
Expand Down Expand Up @@ -4107,46 +3971,6 @@ mcd_rpc_message_decompress (mcd_rpc_message *rpc, void **data, size_t *data_len)
return mcd_rpc_message_from_data_in_place (rpc, *data, *data_len, NULL);
}


/* If rpc is OP_COMPRESSED, decompress it into buffer.
*
* Assumes rpc is still in network little-endian representation (i.e.
* _mongoc_rpc_swab_to_le has not been called).
* Returns true if rpc is not OP_COMPRESSED (and is a no-op) or if decompression
* succeeds.
* Return false and sets error otherwise.
*/
bool
_mongoc_rpc_decompress_if_necessary (mongoc_rpc_t *rpc,
mongoc_buffer_t *buffer /* IN/OUT */,
bson_error_t *error /* OUT */)
{
uint8_t *buf = NULL;
size_t len;

if (BSON_UINT32_FROM_LE (rpc->header.opcode) != MONGOC_OPCODE_COMPRESSED) {
return true;
}

len = BSON_UINT32_FROM_LE (rpc->compressed.uncompressed_size) +
sizeof (mongoc_rpc_header_t);

buf = bson_malloc0 (len);
if (!_mongoc_rpc_decompress (rpc, buf, len)) {
bson_free (buf);
bson_set_error (error,
MONGOC_ERROR_PROTOCOL,
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
"Could not decompress server reply");
return false;
}

_mongoc_buffer_destroy (buffer);
_mongoc_buffer_init (buffer, buf, len, NULL, NULL);

return true;
}

bool
mcd_rpc_message_decompress_if_necessary (mcd_rpc_message *rpc,
void **data,
Expand Down
134 changes: 0 additions & 134 deletions src/libmongoc/src/mongoc/mongoc-rpc-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,152 +19,18 @@

#include "mongoc-prelude.h"

#include "mongoc-array-private.h"
#include "mongoc-iovec.h"

#include "mcd-rpc.h"

#include <bson/bson.h>

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

BSON_BEGIN_DECLS

typedef struct _mongoc_rpc_section_t {
uint8_t payload_type;
union {
/* payload_type == 0 */
const uint8_t *bson_document;
/* payload_type == 1 */
struct {
int32_t size;
uint32_t size_le;
const char *identifier;
const uint8_t *bson_documents;
} sequence;
} payload;
} mongoc_rpc_section_t;

#define RPC(_name, _code) \
typedef struct { \
_code \
} mongoc_rpc_##_name##_t;
#define ENUM_FIELD(_name) uint32_t _name;
#define INT32_FIELD(_name) int32_t _name;
#define UINT8_FIELD(_name) uint8_t _name;
#define INT64_FIELD(_name) int64_t _name;
#define INT64_ARRAY_FIELD(_len, _name) \
int32_t _len; \
int64_t *_name;
#define CSTRING_FIELD(_name) const char *_name;
#define BSON_FIELD(_name) const uint8_t *_name;
#define BSON_ARRAY_FIELD(_name) \
const uint8_t *_name; \
int32_t _name##_len;
#define IOVEC_ARRAY_FIELD(_name) \
const mongoc_iovec_t *_name; \
int32_t n_##_name; \
mongoc_iovec_t _name##_recv;
#define SECTION_ARRAY_FIELD(_name) \
mongoc_rpc_section_t _name[2]; \
int32_t n_##_name;
#define RAW_BUFFER_FIELD(_name) \
const uint8_t *_name; \
int32_t _name##_len;
#define BSON_OPTIONAL(_check, _code) _code
#define CHECKSUM_FIELD(_name) uint32_t _name;


#pragma pack(1)
#include "op-delete.def"
#include "op-get-more.def"
#include "op-header.def"
#include "op-insert.def"
#include "op-kill-cursors.def"
#include "op-query.def"
#include "op-reply.def"
#include "op-reply-header.def"
#include "op-update.def"
#include "op-compressed.def"
/* restore default packing */
#pragma pack()

#include "op-msg.def"

typedef union {
mongoc_rpc_delete_t delete_;
mongoc_rpc_get_more_t get_more;
mongoc_rpc_header_t header;
mongoc_rpc_insert_t insert;
mongoc_rpc_kill_cursors_t kill_cursors;
mongoc_rpc_msg_t msg;
mongoc_rpc_query_t query;
mongoc_rpc_reply_t reply;
mongoc_rpc_reply_header_t reply_header;
mongoc_rpc_update_t update;
mongoc_rpc_compressed_t compressed;
int32_t _init; // Used only for initialization.
} mongoc_rpc_t;


BSON_STATIC_ASSERT2 (sizeof_rpc_header, sizeof (mongoc_rpc_header_t) == 16);
BSON_STATIC_ASSERT2 (offsetof_rpc_header,
offsetof (mongoc_rpc_header_t, opcode) ==
offsetof (mongoc_rpc_reply_t, opcode));
BSON_STATIC_ASSERT2 (sizeof_reply_header,
sizeof (mongoc_rpc_reply_header_t) == 36);


#undef RPC
#undef ENUM_FIELD
#undef UINT8_FIELD
#undef INT32_FIELD
#undef INT64_FIELD
#undef INT64_ARRAY_FIELD
#undef CSTRING_FIELD
#undef BSON_FIELD
#undef BSON_ARRAY_FIELD
#undef IOVEC_ARRAY_FIELD
#undef SECTION_ARRAY_FIELD
#undef BSON_OPTIONAL
#undef RAW_BUFFER_FIELD
#undef CHECKSUM_FIELD


void
_mongoc_rpc_gather (mongoc_rpc_t *rpc, mongoc_array_t *array);
void
_mongoc_rpc_op_egress_inc (const mongoc_rpc_t *rpc);
void
_mongoc_rpc_swab_to_le (mongoc_rpc_t *rpc);
void
_mongoc_rpc_swab_from_le (mongoc_rpc_t *rpc);
void
_mongoc_rpc_printf (mongoc_rpc_t *rpc);
bool
_mongoc_rpc_scatter (mongoc_rpc_t *rpc, const uint8_t *buf, size_t buflen);
bool
_mongoc_rpc_scatter_reply_header_only (mongoc_rpc_t *rpc,
const uint8_t *buf,
size_t buflen);

bool
mcd_rpc_message_get_body (const mcd_rpc_message *rpc, bson_t *reply);

bool
_mongoc_rpc_get_first_document (mongoc_rpc_t *rpc, bson_t *reply);
bool
_mongoc_rpc_reply_get_first (mongoc_rpc_reply_t *reply, bson_t *bson);
bool
_mongoc_rpc_reply_get_first_msg (mongoc_rpc_msg_t *reply, bson_t *bson);
bool
_mongoc_rpc_check_ok (mongoc_rpc_t *rpc,
int32_t error_api_version,
bson_error_t *error /* OUT */,
bson_t *error_doc /* OUT */);

bool
mcd_rpc_message_check_ok (mcd_rpc_message *rpc,
int32_t error_api_version,
Expand Down
Loading