CDRIVER-4193 Ensure OP_MSG for handshakes and fix RPC op_egress counters #1256

Merged
merged 27 commits into from
May 3, 2023
Commits
c747ab8
Move SHM count function into mongoc-counters-private.h
eramongodb Apr 18, 2023
c19b049
Declare test_framework_has_compressors in test-libmongoc.h
eramongodb May 1, 2023
d519876
Allow /counters/op_msg and /counters/op_compressed with auth
eramongodb Apr 28, 2023
d79ee1f
Add _mongoc_rpc_gather_no_inc
eramongodb Apr 25, 2023
25fdd14
Add _mongoc_rpc_op_egress_inc
eramongodb Apr 28, 2023
0257425
CDRIVER-4193 RTT thread should also use hello if serverApi is given
eramongodb Apr 26, 2023
3a10892
Do not increment RPC egress counters in mock server
eramongodb Apr 25, 2023
891e2b8
Add /counters/rpc/egress/cluster/single
eramongodb Apr 28, 2023
aab46d2
Add /counters/rpc/egress/cluster/legacy
eramongodb May 1, 2023
5c6a33d
Add /counters/rpc/egress/cluster/pooled
eramongodb Apr 25, 2023
65a6043
Add /counters/rpc/egress/awaitable_hello
eramongodb Apr 26, 2023
450d410
Add /counters/rpc/egress/mock_server
eramongodb Apr 25, 2023
88c57c0
CDRIVER-4121 Add /counters/rpc/egress/auth
eramongodb May 1, 2023
6da29f5
CDRIVER-4622 RPC egress increment: mongoc_async_cmd_new -> mongoc_asy…
eramongodb Apr 28, 2023
2204e92
RPC egress increment: mongoc_cluster_run_command_opquery and mongoc_c…
eramongodb May 1, 2023
08b5440
RPC egress increment: mongoc_cluster_legacy_rpc_sendv_to_server
eramongodb May 1, 2023
c7204b5
RPC egress increment: _server_monitor_send_and_recv
eramongodb May 1, 2023
18c356a
RPC egress increment: _server_monitor_awaitable_hello
eramongodb May 1, 2023
afa1bc5
Remove references to _mongoc_rpc_gather in test suite
eramongodb May 2, 2023
a540b96
Remove _mongoc_rpc_gather
eramongodb May 2, 2023
47482d0
Rename _mongoc_rpc_gather_no_inc -> _mongc_rpc_gather
eramongodb May 2, 2023
e269430
Add mongoc_<object>_uses_loadbalanced
eramongodb May 1, 2023
b8c26a1
CDRIVER-4265 Use OP_MSG for handshakes when loadBalanced=true
eramongodb May 1, 2023
6552752
Clarify meaning of RPC op_egress counters in documentation
eramongodb May 2, 2023
257d550
Rename rpc_egress -> rpc_op_egress in tests for naming consistency
eramongodb May 2, 2023
a1dde66
Use existing test framework user and password
eramongodb May 3, 2023
190d013
Assert OP_MSG vs. OP_QUERY when mock server receives messages
eramongodb May 3, 2023
20 changes: 12 additions & 8 deletions src/libmongoc/doc/basic-troubleshooting.rst
@@ -17,16 +17,16 @@ The following is a short list of things to check when you have a problem.
Performance Counters
--------------------

The MongoDB C driver comes with an optional unique feature to help developers and sysadmins troubleshoot problems in production.
Performance counters are available for each process using the driver.
The MongoDB C Driver comes with an optional and unique feature to help developers and sysadmins troubleshoot problems in production.
Performance counters are available for each process using the C Driver.
If available, the counters can be accessed outside of the application process via a shared memory segment.
This means that you can graph statistics about your application process easily from tools like Munin or Nagios.
Your author often uses ``watch --interval=0.5 -d mongoc-stat $PID`` to monitor an application.
The counters may be used to graph statistics about your application process easily from tools like Munin or Nagios.
For example, the command ``watch --interval=0.5 -d mongoc-stat $PID`` may be used to monitor an application.

Performance counters are only available on Linux platforms and macOS arm64 platforms supporting shared memory segments.
On supported platforms they are enabled by default.
Applications can be built without the counters by specifying the cmake option ``-DENABLE_SHM_COUNTERS=OFF``. Additionally, if
performance counters are already compiled, they can be disabled at runtime by specifying the environment variable ``MONGOC_DISABLE_SHM``.
Performance counters are only available on Linux platforms and macOS arm64 platforms that support shared memory segments.
On supported platforms, they are enabled by default.
Applications can be built without the counters by specifying the cmake option ``-DENABLE_SHM_COUNTERS=OFF``.
Additionally, if performance counters are already compiled, they can be disabled at runtime by specifying the environment variable ``MONGOC_DISABLE_SHM``.

Performance counters keep track of the following:

@@ -37,6 +37,10 @@ Performance counters keep track of the following:
* Authentication successes and failures.
* Number of wire protocol errors.

.. note::
An operation is considered "sent" when one or more bytes of the corresponding message are written to the stream, regardless of whether the entire message is successfully written or whether the operation ultimately succeeds or fails.
This does not include bytes that may be written during the stream connection process, such as TLS handshake messages.

To access counters for a given process, simply provide the process id to the ``mongoc-stat`` program installed with the MongoDB C Driver.

.. code-block:: none
1 change: 1 addition & 0 deletions src/libmongoc/src/mongoc/mongoc-async-cmd.c
@@ -347,6 +347,7 @@ _mongoc_async_cmd_phase_send (mongoc_async_cmd_t *acmd)
used_temp_iovec = true;
}

_mongoc_rpc_op_egress_inc (&acmd->rpc);
bytes = mongoc_stream_writev (acmd->stream, iovec, niovec, 0);

if (used_temp_iovec) {
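
The hunk above places `_mongoc_rpc_op_egress_inc` immediately before the call to `mongoc_stream_writev`, so the counter advances whether or not the write completes. A minimal sketch of that ordering follows, assuming a hypothetical counter and write callback; `demo_op_egress_total`, `demo_write_fn`, `demo_send`, and the two callbacks are illustrative names, not libmongoc API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for an egress counter; not a libmongoc symbol. */
static int64_t demo_op_egress_total = 0;

/* Hypothetical write callback: returns the number of bytes written, or -1 on
 * error. */
typedef ptrdiff_t (*demo_write_fn) (const uint8_t *data, size_t len);

/* Increment the counter first, then attempt the write. Returns true only if
 * the whole message was written. */
static bool
demo_send (demo_write_fn write_fn, const uint8_t *data, size_t len)
{
   assert (write_fn != NULL && data != NULL);
   demo_op_egress_total++;
   return write_fn (data, len) == (ptrdiff_t) len;
}

/* Callback that writes the whole message. */
static ptrdiff_t
demo_write_ok (const uint8_t *data, size_t len)
{
   (void) data;
   return (ptrdiff_t) len;
}

/* Callback that writes a single byte and then stops, as a stalled stream
 * might. */
static ptrdiff_t
demo_write_partial (const uint8_t *data, size_t len)
{
   (void) data;
   return len > 0 ? 1 : 0;
}
```

With a four-byte message, `demo_send (demo_write_partial, ...)` reports failure, yet the counter is still one higher than before the call.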
10 changes: 8 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-client-private.h
@@ -226,11 +226,17 @@ mongoc_client_connect (bool buffered,
bson_error_t *error);


/* Returns true if a versioned server API has been selected,
* otherwise returns false. */
/* Returns true if a versioned server API has been selected, otherwise returns
* false. */
bool
mongoc_client_uses_server_api (const mongoc_client_t *client);


/* Returns true if load balancing mode has been selected, otherwise returns
* false. */
bool
mongoc_client_uses_loadbalanced (const mongoc_client_t *client);

BSON_END_DECLS

#endif /* MONGOC_CLIENT_PRIVATE_H */
6 changes: 6 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-client.c
@@ -3184,3 +3184,9 @@ mongoc_client_uses_server_api (const mongoc_client_t *client)
{
return mongoc_topology_uses_server_api (client->topology);
}

bool
mongoc_client_uses_loadbalanced (const mongoc_client_t *client)
{
return mongoc_topology_uses_loadbalanced (client->topology);
}
9 changes: 7 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-cluster-private.h
@@ -234,11 +234,16 @@ _mongoc_cluster_get_auth_cmd_x509 (const mongoc_uri_t *uri,
bson_t *cmd /* OUT */,
bson_error_t *error /* OUT */);

/* Returns true if a versioned server API has been selected,
* otherwise returns false. */
/* Returns true if a versioned server API has been selected, otherwise returns
* false. */
bool
mongoc_cluster_uses_server_api (const mongoc_cluster_t *cluster);

/* Returns true if load balancing mode has been selected, otherwise returns
* false. */
bool
mongoc_cluster_uses_loadbalanced (const mongoc_cluster_t *cluster);

#ifdef MONGOC_ENABLE_CRYPTO
void
_mongoc_cluster_init_scram (const mongoc_cluster_t *cluster,
39 changes: 29 additions & 10 deletions src/libmongoc/src/mongoc/mongoc-cluster.c
@@ -312,6 +312,7 @@ mongoc_cluster_run_command_opquery (mongoc_cluster_t *cluster,
/*
* send and receive
*/
_mongoc_rpc_op_egress_inc (&rpc);
if (!_mongoc_stream_writev_full (stream,
cluster->iov.data,
cluster->iov.len,
@@ -635,6 +636,14 @@ mongoc_cluster_run_command_monitored (mongoc_cluster_t *cluster,
}


static bool
_should_use_op_msg (const mongoc_cluster_t *cluster)
{
return mongoc_cluster_uses_server_api (cluster) ||
mongoc_cluster_uses_loadbalanced (cluster);
}


/*
*--------------------------------------------------------------------------
*
@@ -676,7 +685,7 @@ mongoc_cluster_run_command_private (mongoc_cluster_t *cluster,

server_stream = cmd->server_stream;

if (mongoc_cluster_uses_server_api (cluster) ||
if (_should_use_op_msg (cluster) ||
server_stream->sd->max_wire_version >= WIRE_VERSION_MIN) {
retval = mongoc_cluster_run_opmsg (cluster, cmd, reply, error);
} else {
@@ -818,24 +827,22 @@ _stream_run_hello (mongoc_cluster_t *cluster,
to either an op_msg or op_query: */
memset (&hello_cmd, 0, sizeof (hello_cmd));


hello_cmd.db_name = "admin";
hello_cmd.command = &handshake_command;
hello_cmd.command_name = _mongoc_get_command_name (&handshake_command);
hello_cmd.server_stream = server_stream;

hello_cmd.is_acknowledged = true;

/* Use OP_QUERY for the handshake, unless the user has specified an
* API version; the correct hello_cmd has already been selected: */
if (!mongoc_cluster_uses_server_api (cluster)) {
if (!_should_use_op_msg (cluster)) {
/* Complete OPCODE_QUERY setup: */
hello_cmd.query_flags = MONGOC_QUERY_SECONDARY_OK;
} else {
/* We're using OP_MSG, and require some additional doctoring: */
bson_append_utf8 (&handshake_command, "$db", 3, "admin", 5);
}

hello_cmd.db_name = "admin";
hello_cmd.command = &handshake_command;
hello_cmd.command_name = _mongoc_get_command_name (&handshake_command);
hello_cmd.server_stream = server_stream;
hello_cmd.is_acknowledged = true;

if (!mongoc_cluster_run_command_private (
cluster, &hello_cmd, &reply, error)) {
if (negotiate_sasl_supported_mechs) {
@@ -1345,9 +1352,17 @@ _mongoc_cluster_auth_node_x509 (mongoc_cluster_t *cluster,
bool
mongoc_cluster_uses_server_api (const mongoc_cluster_t *cluster)
{
BSON_ASSERT_PARAM (cluster);
return mongoc_client_uses_server_api (cluster->client);
}

bool
mongoc_cluster_uses_loadbalanced (const mongoc_cluster_t *cluster)
{
BSON_ASSERT_PARAM (cluster);
return mongoc_client_uses_loadbalanced (cluster->client);
}

#ifdef MONGOC_ENABLE_CRYPTO
void
_mongoc_cluster_init_scram (const mongoc_cluster_t *cluster,
@@ -3287,6 +3302,7 @@ mongoc_cluster_legacy_rpc_sendv_to_server (
GOTO (done);
}

_mongoc_rpc_op_egress_inc (rpc);
if (!_mongoc_stream_writev_full (server_stream->stream,
cluster->iov.data,
cluster->iov.len,
@@ -3561,11 +3577,14 @@ mongoc_cluster_run_opmsg (mongoc_cluster_t *cluster,
}
}
}

_mongoc_rpc_op_egress_inc (&rpc);
ok = _mongoc_stream_writev_full (server_stream->stream,
(mongoc_iovec_t *) cluster->iov.data,
cluster->iov.len,
cluster->sockettimeoutms,
error);

if (!ok) {
/* add info about the command to writev_full's error message */
RUN_CMD_ERR_DECORATE;
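
The opcode choice made by `_should_use_op_msg` and `mongoc_cluster_run_command_private` in the hunks above can be sketched in isolation. The types, the `demo_` names, and the value of `DEMO_WIRE_VERSION_MIN` are assumptions made for illustration; the real threshold is whatever `WIRE_VERSION_MIN` is defined as in the driver.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified model of the inputs; not libmongoc types. */
typedef struct {
   bool uses_server_api;   /* a versioned server API was requested */
   bool uses_loadbalanced; /* load balancing mode was selected */
} demo_cluster_t;

typedef enum { DEMO_OP_QUERY, DEMO_OP_MSG } demo_opcode_t;

/* Assumed threshold, chosen only so the sketch is self-contained. */
#define DEMO_WIRE_VERSION_MIN 6

/* Mirrors the helper in the diff: either option forces OP_MSG. */
static bool
demo_should_use_op_msg (const demo_cluster_t *cluster)
{
   assert (cluster != NULL);
   return cluster->uses_server_api || cluster->uses_loadbalanced;
}

/* OP_MSG if forced by configuration or if the server is known to be new
 * enough; otherwise fall back to OP_QUERY. */
static demo_opcode_t
demo_select_opcode (const demo_cluster_t *cluster, int32_t max_wire_version)
{
   if (demo_should_use_op_msg (cluster) ||
       max_wire_version >= DEMO_WIRE_VERSION_MIN) {
      return DEMO_OP_MSG;
   }
   return DEMO_OP_QUERY;
}
```

A wire version of 0 stands in here for a connection whose server version is not yet known, which this sketch assumes to be the situation during a handshake. In that case only the two configuration flags can select OP_MSG.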
1 change: 1 addition & 0 deletions src/libmongoc/src/mongoc/mongoc-cmd.c
@@ -837,6 +837,7 @@ mongoc_cmd_parts_assemble (mongoc_cmd_parts_t *parts,
}

if (mongoc_client_uses_server_api (parts->client) ||
mongoc_client_uses_loadbalanced (parts->client) ||
server_stream->sd->max_wire_version >= WIRE_VERSION_MIN) {
if (!bson_has_field (parts->body, "$db")) {
BSON_APPEND_UTF8 (&parts->extra, "$db", parts->assembled.db_name);
17 changes: 17 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-counters-private.h
@@ -179,9 +179,23 @@ enum {
bson_atomic_int64_exchange (counter, 0, bson_memory_order_seq_cst); \
} \
bson_atomic_thread_fence (); \
} \
static BSON_INLINE int32_t mongoc_counter_##ident##_count (void) \
{ \
int32_t _sum = 0; \
uint32_t _i; \
for (_i = 0; _i < _mongoc_get_cpu_count (); _i++) { \
const int64_t *counter = \
&BSON_CONCAT (__mongoc_counter_, ident) \
.cpus[_i] \
.slots[BSON_CONCAT (COUNTER_, ident) % SLOTS_PER_CACHELINE]; \
_sum += bson_atomic_int64_fetch (counter, bson_memory_order_seq_cst); \
} \
return _sum; \
}
#include "mongoc-counters.defs"
#undef COUNTER

#else
/* when counters are disabled, these functions are no-ops */
#define COUNTER(ident, Category, Name, Description) \
@@ -196,6 +210,9 @@ enum {
} \
static BSON_INLINE void mongoc_counter_##ident##_reset (void) \
{ \
} \
static BSON_INLINE int32_t mongoc_counter_##ident##_count (void) \
{ \
return 0; \
}
#include "mongoc-counters.defs"
#undef COUNTER
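
The `mongoc_counter_##ident##_count` function added above sums one slot per CPU to produce a process-wide value. A simplified, non-atomic sketch of that layout follows. The fixed CPU count and the `demo_` names are assumptions; the macro in the diff reads each slot with `bson_atomic_int64_fetch` and returns `int32_t`, whereas this sketch returns `int64_t` to match the slot width.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed fixed CPU count; the diff obtains it from _mongoc_get_cpu_count. */
#define DEMO_CPU_COUNT 4

/* One slot per CPU, so each CPU increments its own slot. */
typedef struct {
   int64_t slots[DEMO_CPU_COUNT];
} demo_counter_t;

/* Increment the slot that belongs to the given CPU. */
static void
demo_counter_inc (demo_counter_t *counter, size_t cpu)
{
   assert (counter != NULL && cpu < DEMO_CPU_COUNT);
   counter->slots[cpu]++;
}

/* Sum every per-CPU slot to obtain the process-wide count. */
static int64_t
demo_counter_count (const demo_counter_t *counter)
{
   int64_t sum = 0;
   size_t i;

   assert (counter != NULL);
   for (i = 0; i < DEMO_CPU_COUNT; i++) {
      sum += counter->slots[i];
   }
   return sum;
}
```

Because this sketch uses plain loads and stores, it is only suitable for single-threaded illustration.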
2 changes: 2 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-rpc-private.h
@@ -136,6 +136,8 @@ BSON_STATIC_ASSERT2 (sizeof_reply_header,
void
_mongoc_rpc_gather (mongoc_rpc_t *rpc, mongoc_array_t *array);
void
_mongoc_rpc_op_egress_inc (const mongoc_rpc_t *rpc);
void
_mongoc_rpc_swab_to_le (mongoc_rpc_t *rpc);
void
_mongoc_rpc_swab_from_le (mongoc_rpc_t *rpc);
84 changes: 61 additions & 23 deletions src/libmongoc/src/mongoc/mongoc-rpc.c
@@ -577,67 +577,44 @@
#undef CHECKSUM_FIELD


/*
*--------------------------------------------------------------------------
*
* _mongoc_rpc_gather --
*
* Takes a (native endian) rpc struct and gathers the buffer.
* Caller should swab to little endian after calling gather.
*
* Gather, swab, compress write.
* Read, scatter, uncompress, swab
*
*--------------------------------------------------------------------------
*/

void
_mongoc_rpc_gather (mongoc_rpc_t *rpc, mongoc_array_t *array)
{
mongoc_counter_op_egress_total_inc ();
switch ((mongoc_opcode_t) rpc->header.opcode) {
case MONGOC_OPCODE_REPLY:
_mongoc_rpc_gather_reply (&rpc->reply, &rpc->header, array);
return;

case MONGOC_OPCODE_MSG:
_mongoc_rpc_gather_msg (&rpc->msg, &rpc->header, array);
mongoc_counter_op_egress_msg_inc ();
return;

case MONGOC_OPCODE_UPDATE:
_mongoc_rpc_gather_update (&rpc->update, &rpc->header, array);
mongoc_counter_op_egress_update_inc ();
return;

case MONGOC_OPCODE_INSERT:
_mongoc_rpc_gather_insert (&rpc->insert, &rpc->header, array);
mongoc_counter_op_egress_insert_inc ();
return;

case MONGOC_OPCODE_QUERY:
_mongoc_rpc_gather_query (&rpc->query, &rpc->header, array);
mongoc_counter_op_egress_query_inc ();
return;

case MONGOC_OPCODE_GET_MORE:
_mongoc_rpc_gather_get_more (&rpc->get_more, &rpc->header, array);
mongoc_counter_op_egress_getmore_inc ();
return;

case MONGOC_OPCODE_DELETE:
_mongoc_rpc_gather_delete (&rpc->delete_, &rpc->header, array);
mongoc_counter_op_egress_delete_inc ();
return;

case MONGOC_OPCODE_KILL_CURSORS:
_mongoc_rpc_gather_kill_cursors (&rpc->kill_cursors, &rpc->header, array);
mongoc_counter_op_egress_killcursors_inc ();
return;

case MONGOC_OPCODE_COMPRESSED:
_mongoc_rpc_gather_compressed (&rpc->compressed, &rpc->header, array);
mongoc_counter_op_egress_compressed_inc ();
return;

default:
@@ -648,6 +625,67 @@ _mongoc_rpc_gather (mongoc_rpc_t *rpc, mongoc_array_t *array)
}


void
_mongoc_rpc_op_egress_inc (const mongoc_rpc_t *rpc)
{
mongoc_opcode_t opcode =
(mongoc_opcode_t) BSON_UINT32_FROM_LE (rpc->header.opcode);

if (opcode == MONGOC_OPCODE_COMPRESSED) {
mongoc_counter_op_egress_compressed_inc ();
mongoc_counter_op_egress_total_inc ();

opcode = (mongoc_opcode_t) BSON_UINT32_FROM_LE (
rpc->compressed.original_opcode);
}

mongoc_counter_op_egress_total_inc ();

switch (opcode) {
case MONGOC_OPCODE_REPLY:
return;

case MONGOC_OPCODE_MSG:
mongoc_counter_op_egress_msg_inc ();
return;

case MONGOC_OPCODE_UPDATE:
mongoc_counter_op_egress_update_inc ();
return;

case MONGOC_OPCODE_INSERT:
mongoc_counter_op_egress_insert_inc ();
return;

case MONGOC_OPCODE_QUERY:
mongoc_counter_op_egress_query_inc ();
return;

case MONGOC_OPCODE_GET_MORE:
mongoc_counter_op_egress_getmore_inc ();
return;

case MONGOC_OPCODE_DELETE:
mongoc_counter_op_egress_delete_inc ();
return;

case MONGOC_OPCODE_KILL_CURSORS:
mongoc_counter_op_egress_killcursors_inc ();
return;

case MONGOC_OPCODE_COMPRESSED:
MONGOC_WARNING ("Compressed an OP_COMPRESSED message!?");
BSON_ASSERT (false);
return;

default:
MONGOC_WARNING ("Unknown rpc type: 0x%08x", opcode);
BSON_ASSERT (false);
break;
}
}


void
_mongoc_rpc_swab_to_le (mongoc_rpc_t *rpc)
{
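
The counting rules in `_mongoc_rpc_op_egress_inc` above can be exercised with a reduced model. In the diff, an OP_COMPRESSED message increments the compressed counter and the total, then is counted again under its original opcode. This sketch assumes host-endian fields (it omits the `BSON_UINT32_FROM_LE` conversion), tracks only three opcodes, and uses hypothetical `demo_` types; the opcode numbers are those of the MongoDB wire protocol.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Wire protocol opcode numbers; only the three used by this sketch. */
enum {
   DEMO_OPCODE_QUERY = 2004,
   DEMO_OPCODE_COMPRESSED = 2012,
   DEMO_OPCODE_MSG = 2013
};

/* Hypothetical plain counters standing in for the shared-memory ones. */
typedef struct {
   int64_t total;
   int64_t compressed;
   int64_t msg;
   int64_t query;
} demo_counters_t;

/* Hypothetical message header: outer opcode plus, for compressed messages,
 * the opcode of the wrapped message. */
typedef struct {
   int32_t opcode;
   int32_t original_opcode;
} demo_rpc_t;

/* Returns false for opcodes this sketch does not model. */
static bool
demo_op_egress_inc (demo_counters_t *counters, const demo_rpc_t *rpc)
{
   int32_t opcode;

   assert (counters != NULL && rpc != NULL);
   opcode = rpc->opcode;

   if (opcode == DEMO_OPCODE_COMPRESSED) {
      /* Count the wrapper, then continue with the wrapped opcode. */
      counters->compressed++;
      counters->total++;
      opcode = rpc->original_opcode;
   }

   counters->total++;

   switch (opcode) {
   case DEMO_OPCODE_MSG:
      counters->msg++;
      return true;
   case DEMO_OPCODE_QUERY:
      counters->query++;
      return true;
   default:
      return false;
   }
}
```

Under these rules a compressed OP_MSG raises the total by two, while an uncompressed OP_QUERY raises it by one.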