Skip to content

CDRIVER-3997 Use "hello" command for monitoring if supported #797

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct _mongoc_server_description_t {
int64_t last_update_time_usec;
bson_t last_hello_response;
bool has_hello_response;
bool hello_ok;
const char *connection_address;
/* SDAM dictates storing me/hosts/passives/arbiters after being "normalized
* to lower-case" Instead, they are stored in the casing they are received,
Expand Down
5 changes: 5 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-server-description.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ mongoc_server_description_reset (mongoc_server_description_t *sd)
sd->max_write_batch_size = MONGOC_DEFAULT_WRITE_BATCH_SIZE;
sd->session_timeout_minutes = MONGOC_NO_SESSIONS;
sd->last_write_date_ms = -1;
sd->hello_ok = false;

/* always leave last hello in an init-ed state until we destroy sd */
bson_destroy (&sd->last_hello_response);
Expand Down Expand Up @@ -592,6 +593,10 @@ mongoc_server_description_handle_hello (mongoc_server_description_t *sd,
if (!BSON_ITER_HOLDS_BOOL (&iter))
goto failure;
is_primary = bson_iter_bool (&iter);
} else if (strcmp ("helloOk", bson_iter_key (&iter)) == 0) {
if (!BSON_ITER_HOLDS_BOOL (&iter))
goto failure;
sd->hello_ok = bson_iter_bool (&iter);
} else if (strcmp ("me", bson_iter_key (&iter)) == 0) {
if (!BSON_ITER_HOLDS_UTF8 (&iter))
goto failure;
Expand Down
34 changes: 22 additions & 12 deletions src/libmongoc/src/mongoc/mongoc-server-monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,16 @@ _server_monitor_send_and_recv_opquery (mongoc_server_monitor_t *server_monitor,

static bool
_server_monitor_polling_hello (mongoc_server_monitor_t *server_monitor,
bool hello_ok,
bson_t *hello_response,
bson_error_t *error)
{
bson_t cmd;
const bson_t *hello;
bool ret;

hello = _mongoc_topology_scanner_get_hello_cmd (
server_monitor->topology->scanner);
hello = _mongoc_topology_scanner_get_monitoring_cmd (
server_monitor->topology->scanner, hello_ok);
bson_copy_to (hello, &cmd);

_server_monitor_append_cluster_time (server_monitor, &cmd);
Expand Down Expand Up @@ -600,7 +601,7 @@ _server_monitor_awaitable_hello_recv (mongoc_server_monitor_t *server_monitor,
*/
static bool
_server_monitor_awaitable_hello (mongoc_server_monitor_t *server_monitor,
const bson_t *topology_version,
const mongoc_server_description_t *description,
bson_t *hello_response,
bool *cancelled,
bson_error_t *error)
Expand All @@ -609,12 +610,12 @@ _server_monitor_awaitable_hello (mongoc_server_monitor_t *server_monitor,
const bson_t *hello;
bool ret = false;

hello = _mongoc_topology_scanner_get_hello_cmd (
server_monitor->topology->scanner);
hello = _mongoc_topology_scanner_get_monitoring_cmd (
server_monitor->topology->scanner, description->hello_ok);
bson_copy_to (hello, &cmd);

_server_monitor_append_cluster_time (server_monitor, &cmd);
bson_append_document (&cmd, "topologyVersion", 15, topology_version);
bson_append_document (&cmd, "topologyVersion", 15, &description->topology_version);
bson_append_int32 (
&cmd, "maxAwaitTimeMS", 14, server_monitor->heartbeat_frequency_ms);
bson_append_utf8 (&cmd, "$db", 3, "admin", 5);
Expand Down Expand Up @@ -861,7 +862,7 @@ mongoc_server_monitor_check_server (
MONITOR_LOG (server_monitor, "awaitable hello");
ret = _server_monitor_awaitable_hello (
server_monitor,
&previous_description->topology_version,
previous_description,
&hello_response,
cancelled,
&error);
Expand All @@ -872,7 +873,7 @@ mongoc_server_monitor_check_server (
awaited = false;
_server_monitor_heartbeat_started (server_monitor, awaited);
ret =
_server_monitor_polling_hello (server_monitor, &hello_response, &error);
_server_monitor_polling_hello (server_monitor, previous_description->hello_ok, &hello_response, &error);

exit:
duration_us = _now_us () - start_us;
Expand Down Expand Up @@ -1092,6 +1093,7 @@ static BSON_THREAD_FUN (_server_monitor_thread, server_monitor_void)

static bool
_server_monitor_ping_server (mongoc_server_monitor_t *server_monitor,
bool hello_ok,
int64_t *rtt_ms)
{
bool ret = false;
Expand All @@ -1111,7 +1113,7 @@ _server_monitor_ping_server (mongoc_server_monitor_t *server_monitor,
if (server_monitor->stream) {
MONITOR_LOG (server_monitor, "rtt polling hello");
ret = _server_monitor_polling_hello (
server_monitor, &hello_response, &error);
server_monitor, hello_ok, &hello_response, &error);
if (ret) {
*rtt_ms = (_now_us () - start_us) / 1000;
}
Expand All @@ -1127,12 +1129,14 @@ _server_monitor_ping_server (mongoc_server_monitor_t *server_monitor,
static BSON_THREAD_FUN (_server_monitor_rtt_thread, server_monitor_void)
{
mongoc_server_monitor_t *server_monitor;
mongoc_server_description_t *sd;

server_monitor = (mongoc_server_monitor_t *) server_monitor_void;

while (true) {
int64_t rtt_ms;
bson_error_t error;
bool hello_ok;

bson_mutex_lock (&server_monitor->shared.mutex);
if (server_monitor->shared.state != MONGOC_THREAD_RUNNING) {
Expand All @@ -1141,10 +1145,16 @@ static BSON_THREAD_FUN (_server_monitor_rtt_thread, server_monitor_void)
}
bson_mutex_unlock (&server_monitor->shared.mutex);

_server_monitor_ping_server (server_monitor, &rtt_ms);
if (rtt_ms != MONGOC_RTT_UNSET) {
mongoc_server_description_t *sd;
bson_mutex_lock (&server_monitor->topology->mutex);
sd = mongoc_topology_description_server_by_id (
&server_monitor->topology->description,
server_monitor->description->id,
&error);
hello_ok = sd ? sd->hello_ok : false;
bson_mutex_unlock (&server_monitor->topology->mutex);

_server_monitor_ping_server (server_monitor, hello_ok, &rtt_ms);
if (rtt_ms != MONGOC_RTT_UNSET) {
bson_mutex_lock (&server_monitor->topology->mutex);
sd = mongoc_topology_description_server_by_id (
&server_monitor->topology->description,
Expand Down
10 changes: 7 additions & 3 deletions src/libmongoc/src/mongoc/mongoc-topology-scanner-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct mongoc_topology_scanner_node {
int64_t last_used;
int64_t last_failed;
bool has_auth;
bool hello_ok;
mongoc_host_list_t host;
struct mongoc_topology_scanner *ts;

Expand Down Expand Up @@ -84,7 +85,8 @@ typedef struct mongoc_topology_scanner {
int64_t connect_timeout_msec;
mongoc_topology_scanner_node_t *nodes;
bson_t hello_cmd;
bson_t hello_cmd_with_handshake;
bson_t legacy_hello_cmd;
bson_t handshake_cmd;
bson_t cluster_time;
bool handshake_ok_to_send;
const char *appname;
Expand Down Expand Up @@ -130,7 +132,8 @@ mongoc_topology_scanner_valid (mongoc_topology_scanner_t *ts);
void
mongoc_topology_scanner_add (mongoc_topology_scanner_t *ts,
const mongoc_host_list_t *host,
uint32_t id);
uint32_t id,
bool hello_ok);

void
mongoc_topology_scanner_scan (mongoc_topology_scanner_t *ts, uint32_t id);
Expand Down Expand Up @@ -194,7 +197,8 @@ _mongoc_topology_scanner_get_speculative_auth_mechanism (
const mongoc_uri_t *uri);

const bson_t *
_mongoc_topology_scanner_get_hello_cmd (mongoc_topology_scanner_t *ts);
_mongoc_topology_scanner_get_monitoring_cmd (mongoc_topology_scanner_t *ts,
bool hello_ok);

const bson_t *
_mongoc_topology_scanner_get_handshake_cmd (mongoc_topology_scanner_t *ts);
Expand Down
51 changes: 30 additions & 21 deletions src/libmongoc/src/mongoc/mongoc-topology-scanner.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,25 @@ _jumpstart_other_acmds (mongoc_topology_scanner_node_t *node,
static void
_add_hello (mongoc_topology_scanner_t *ts)
{
bson_t *cmd = &ts->hello_cmd;
mongoc_server_api_t *api = ts->api;

BSON_APPEND_INT32 (&ts->hello_cmd, "hello", 1);
BSON_APPEND_BOOL (&ts->hello_cmd, "helloOk", true);

BSON_APPEND_INT32 (&ts->legacy_hello_cmd, HANDSHAKE_CMD_LEGACY_HELLO, 1);
BSON_APPEND_BOOL (&ts->legacy_hello_cmd, "helloOk", true);

if (api) {
BSON_APPEND_INT32 (cmd, "hello", 1);
_mongoc_cmd_append_server_api (cmd, api);
} else {
BSON_APPEND_INT32 (cmd, HANDSHAKE_CMD_LEGACY_HELLO, 1);
_mongoc_cmd_append_server_api (&ts->hello_cmd, api);
}
}

static void
_init_hello (mongoc_topology_scanner_t *ts)
{
bson_init (&ts->hello_cmd);
bson_init (&ts->hello_cmd_with_handshake);
bson_init (&ts->legacy_hello_cmd);
bson_init (&ts->handshake_cmd);
bson_init (&ts->cluster_time);

_add_hello (ts);
Expand All @@ -131,7 +134,8 @@ static void
_reset_hello (mongoc_topology_scanner_t *ts)
{
bson_reinit (&ts->hello_cmd);
bson_reinit (&ts->hello_cmd_with_handshake);
bson_reinit (&ts->legacy_hello_cmd);
bson_reinit (&ts->handshake_cmd);

_add_hello (ts);
}
Expand Down Expand Up @@ -241,9 +245,9 @@ _mongoc_topology_scanner_parse_speculative_authentication (
}

static bool
_build_hello_with_handshake (mongoc_topology_scanner_t *ts)
_build_handshake_cmd (mongoc_topology_scanner_t *ts)
{
bson_t *doc = &ts->hello_cmd_with_handshake;
bson_t *doc = &ts->handshake_cmd;
bson_t subdoc;
bson_iter_t iter;
const char *key;
Expand All @@ -254,7 +258,7 @@ _build_hello_with_handshake (mongoc_topology_scanner_t *ts)
char buf[16];

bson_destroy (doc);
bson_copy_to (&ts->hello_cmd, doc);
bson_copy_to (ts->api ? &ts->hello_cmd : &ts->legacy_hello_cmd, doc);

BSON_APPEND_DOCUMENT_BEGIN (doc, HANDSHAKE_FIELD, &subdoc);
res = _mongoc_handshake_build_doc_with_application (&subdoc, ts->appname);
Expand All @@ -279,12 +283,13 @@ _build_hello_with_handshake (mongoc_topology_scanner_t *ts)
}

const bson_t *
_mongoc_topology_scanner_get_hello_cmd (mongoc_topology_scanner_t *ts)
_mongoc_topology_scanner_get_monitoring_cmd (mongoc_topology_scanner_t *ts,
bool hello_ok)
{
return &ts->hello_cmd;
return hello_ok || ts->api ? &ts->hello_cmd : &ts->legacy_hello_cmd;
}

/* Caller must lock topology->mutex to protect hello_cmd_with_handshake. This
/* Caller must lock topology->mutex to protect handshake_cmd. This
* is called at the start of the scan in _mongoc_topology_run_background, when a
* node is added in _mongoc_topology_reconcile_add_nodes, or when running a
* hello directly on a node in _mongoc_stream_run_hello. */
Expand All @@ -293,19 +298,19 @@ _mongoc_topology_scanner_get_handshake_cmd (mongoc_topology_scanner_t *ts)
{
/* If this is the first time using the node or if it's the first time
* using it after a failure, build handshake doc */
if (bson_empty (&ts->hello_cmd_with_handshake)) {
ts->handshake_ok_to_send = _build_hello_with_handshake (ts);
if (bson_empty (&ts->handshake_cmd)) {
ts->handshake_ok_to_send = _build_handshake_cmd (ts);
if (!ts->handshake_ok_to_send) {
MONGOC_WARNING ("Handshake doc too big, not including in hello");
}
}

/* If the doc turned out to be too big */
if (!ts->handshake_ok_to_send) {
return &ts->hello_cmd;
return ts->api ? &ts->hello_cmd : &ts->legacy_hello_cmd;
}

return &ts->hello_cmd_with_handshake;
return &ts->handshake_cmd;
}

static void
Expand All @@ -320,7 +325,8 @@ _begin_hello_cmd (mongoc_topology_scanner_node_t *node,

if (node->last_used != -1 && node->last_failed == -1) {
/* The node's been used before and not failed recently */
bson_copy_to (&ts->hello_cmd, &cmd);
bson_copy_to (_mongoc_topology_scanner_get_monitoring_cmd (ts, node->hello_ok),
&cmd);
} else {
bson_copy_to (_mongoc_topology_scanner_get_handshake_cmd (ts), &cmd);
}
Expand Down Expand Up @@ -428,7 +434,8 @@ mongoc_topology_scanner_destroy (mongoc_topology_scanner_t *ts)

mongoc_async_destroy (ts->async);
bson_destroy (&ts->hello_cmd);
bson_destroy (&ts->hello_cmd_with_handshake);
bson_destroy (&ts->legacy_hello_cmd);
bson_destroy (&ts->handshake_cmd);
bson_destroy (&ts->cluster_time);
mongoc_server_api_destroy (ts->api);

Expand All @@ -449,7 +456,8 @@ mongoc_topology_scanner_valid (mongoc_topology_scanner_t *ts)
void
mongoc_topology_scanner_add (mongoc_topology_scanner_t *ts,
const mongoc_host_list_t *host,
uint32_t id)
uint32_t id,
bool hello_ok)
{
mongoc_topology_scanner_node_t *node;

Expand All @@ -461,6 +469,7 @@ mongoc_topology_scanner_add (mongoc_topology_scanner_t *ts,
node->ts = ts;
node->last_failed = -1;
node->last_used = -1;
node->hello_ok = hello_ok;
bson_init (&node->speculative_auth_response);

DL_APPEND (ts->nodes, node);
Expand Down Expand Up @@ -1357,7 +1366,7 @@ _jumpstart_other_acmds (mongoc_topology_scanner_node_t *node,
}
}

/* Caller must lock topology->mutex to protect hello_cmd_with_handshake. */
/* Caller must lock topology->mutex to protect handshake_cmd. */
void
_mongoc_topology_scanner_set_server_api (mongoc_topology_scanner_t *ts,
const mongoc_server_api_t *api)
Expand Down
21 changes: 12 additions & 9 deletions src/libmongoc/src/mongoc/mongoc-topology.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ _mongoc_topology_reconcile_add_nodes (mongoc_server_description_t *sd,
mongoc_topology_t *topology)
{
mongoc_topology_scanner_t *scanner = topology->scanner;
mongoc_topology_scanner_node_t *node;

/* quickly search by id, then check if a node for this host was retired in
* this scan. */
if (!mongoc_topology_scanner_get_node (scanner, sd->id) &&
!mongoc_topology_scanner_has_node_for_host (scanner, &sd->host)) {
mongoc_topology_scanner_add (scanner, &sd->host, sd->id);
/* Search by ID and update hello_ok */
node = mongoc_topology_scanner_get_node (scanner, sd->id);
if (node) {
node->hello_ok = sd->hello_ok;
} else if (!mongoc_topology_scanner_has_node_for_host (scanner, &sd->host)) {
/* A node for this host was retired in this scan. */
mongoc_topology_scanner_add (scanner, &sd->host, sd->id, sd->hello_ok);
mongoc_topology_scanner_scan (scanner, sd->id);
}

Expand Down Expand Up @@ -189,9 +192,9 @@ _mongoc_topology_scanner_cb (uint32_t id,
_mongoc_topology_update_no_lock (
id, hello_response, rtt_msec, topology, error);

/* The processing of the hello results above may have added/removed
* server descriptions. We need to reconcile that with our monitoring
* agents
/* The processing of the hello results above may have added, changed, or
* removed server descriptions. We need to reconcile that with our
* monitoring agents
*/
mongoc_topology_reconcile (topology);

Expand Down Expand Up @@ -442,7 +445,7 @@ mongoc_topology_new (const mongoc_uri_t *uri, bool single_threaded)
while (hl) {
mongoc_topology_description_add_server (
&topology->description, hl->host_and_port, &id);
mongoc_topology_scanner_add (topology->scanner, hl, id);
mongoc_topology_scanner_add (topology->scanner, hl, id, false);

hl = hl->next;
}
Expand Down
2 changes: 1 addition & 1 deletion src/libmongoc/tests/json-test.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "json-test-monitoring.h"
#include "json-test-operations.h"

#define MAX_NUM_TESTS 100
#define MAX_NUM_TESTS 150

typedef void (*test_hook) (bson_t *test);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"a:27017",
{
"ok": 1,
"ismaster": true,
"isWritablePrimary": true,
"hosts": [
"a:27017"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"a:27017",
{
"ok": 1,
"ismaster": true,
"isWritablePrimary": true,
"hosts": [
"a:27017"
],
Expand Down
Loading