Skip to content

Commit 0fb7eef

Browse files
committed
WIP: fixing connection logic
1 parent 1619456 commit 0fb7eef

File tree

5 files changed

+130
-3
lines changed

5 files changed

+130
-3
lines changed

src/libmongoc/src/mongoc/mongoc-cluster.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2256,6 +2256,7 @@ _mongoc_cluster_stream_for_server (mongoc_cluster_t *cluster,
22562256
* into account.
22572257
*/
22582258

2259+
// LBTODO: do not invalidate the server!
22592260
mongoc_topology_invalidate_server (topology, server_id, err_ptr);
22602261
mongoc_cluster_disconnect_node (cluster, server_id);
22612262
bson_mutex_lock (&topology->mutex);
@@ -2382,13 +2383,15 @@ mongoc_cluster_fetch_stream_single (mongoc_cluster_t *cluster,
23822383
}
23832384
} else {
23842385
if (!reconnect_ok) {
2386+
MONGOC_DEBUG ("mongoc_cluster_fetch_stream_single reconnect_ok==false, erroring");
23852387
stream_not_found (
23862388
topology, server_id, scanner_node->host.host_and_port, error);
23872389
return NULL;
23882390
}
23892391

23902392
/* save the scanner node address in case it is removed during the scan. */
23912393
address = bson_strdup (scanner_node->host.host_and_port);
2394+
MONGOC_DEBUG ("mongoc_cluster_fetch_stream_single is doing a blocking scan");
23922395
_mongoc_topology_do_blocking_scan (topology, error);
23932396
if (error->code) {
23942397
bson_free (address);
@@ -3040,6 +3043,7 @@ mongoc_cluster_check_interval (mongoc_cluster_t *cluster, uint32_t server_id)
30403043
MONGOC_ERROR_STREAM_SOCKET,
30413044
"connection closed");
30423045
mongoc_cluster_disconnect_node (cluster, server_id);
3046+
// LBTODO (error handling)
30433047
mongoc_topology_invalidate_server (topology, server_id, &error);
30443048
return false;
30453049
}
@@ -3066,6 +3070,7 @@ mongoc_cluster_check_interval (mongoc_cluster_t *cluster, uint32_t server_id)
30663070

30673071
if (!r) {
30683072
mongoc_cluster_disconnect_node (cluster, server_id);
3073+
// LBTODO (error handling): do not invalidate the server!
30693074
mongoc_topology_invalidate_server (topology, server_id, &error);
30703075
}
30713076
}

src/libmongoc/src/mongoc/mongoc-topology-private.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@ _mongoc_topology_get_handshake_cmd (mongoc_topology_t *topology);
226226
void
227227
_mongoc_topology_request_scan (mongoc_topology_t *topology);
228228

229+
// LBTODO: call this when in load balancer mode.
230+
// If a network error occurs on a server, there is no reason to assume
231+
// we should wait for a cooldown period before attempting a subsequent connection.
229232
void
230233
_mongoc_topology_bypass_cooldown (mongoc_topology_t *topology);
231234

src/libmongoc/src/mongoc/mongoc-topology-scanner.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,7 @@ _async_success (mongoc_async_cmd_t *acmd,
654654
hello_response, &node->speculative_auth_response);
655655
}
656656

657+
// LBTODO: skip this if load balanced?
657658
/* mongoc_topology_scanner_cb_t takes rtt_msec, not usec */
658659
ts->cb (node->id,
659660
hello_response,
@@ -715,6 +716,7 @@ _async_error_or_timeout (mongoc_async_cmd_t *acmd,
715716

716717
/* call the topology scanner callback. cannot connect to this node.
717718
* callback takes rtt_msec, not usec. */
719+
// LBTODO: skip this if load balanced?
718720
ts->cb (node->id, NULL, duration_usec / 1000, ts->cb_data, error);
719721
} else {
720722
/* there are still more commands left for this node or it succeeded
@@ -978,6 +980,8 @@ mongoc_topology_scanner_node_setup (mongoc_topology_scanner_node_t *node,
978980
_mongoc_topology_scanner_monitor_heartbeat_started (node->ts, &node->host);
979981
start = bson_get_monotonic_time ();
980982

983+
// LBTODO assert that a node already has a stream.
984+
// A load balanced cluster only uses the topology scanner for connection establishment, not for monitoring.
981985
/* if there is already a working stream, push it back to be re-scanned. */
982986
if (node->stream) {
983987
_begin_hello_cmd (node, node->stream, true /* is_setup_done */, NULL, 0);
@@ -1117,6 +1121,7 @@ mongoc_topology_scanner_start (mongoc_topology_scanner_t *ts,
11171121

11181122
DL_FOREACH_SAFE (ts->nodes, node, tmp)
11191123
{
1124+
// LBTODO: Should we skip the cooldown period?
11201125
skip =
11211126
obey_cooldown && mongoc_topology_scanner_node_in_cooldown (node, now);
11221127

src/libmongoc/src/mongoc/mongoc-topology.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,10 @@ mongoc_topology_select_server_id (mongoc_topology_t *topology,
10101010
_mongoc_topology_description_monitor_opening (&topology->description);
10111011

10121012
if (topology->description.type == MONGOC_TOPOLOGY_LOADBALANCED) {
1013+
MONGOC_DEBUG ("mongoc_topology_select_server_id is returning single load balancer, not proceeding with monitoring");
10131014
/* Bypass server selection loop. Always select the only server. */
1015+
// LBTODO: by skipping topology scanning, no connection is created
1016+
// to the load balancer here. It will be created in mongoc_cluster_fetch_stream_single.
10141017
selected_server = mongoc_topology_description_select (
10151018
&topology->description, optype, read_prefs, local_threshold_ms);
10161019

@@ -1021,6 +1024,7 @@ mongoc_topology_select_server_id (mongoc_topology_t *topology,
10211024
error);
10221025
return 0;
10231026
}
1027+
return selected_server->id;
10241028
}
10251029

10261030
tried_once = false;
@@ -1130,6 +1134,8 @@ mongoc_topology_select_server_id (mongoc_topology_t *topology,
11301134
error);
11311135
return 0;
11321136
}
1137+
bson_mutex_unlock (&topology->mutex);
1138+
return selected_server->id;
11331139
}
11341140

11351141
if (!selected_server) {
@@ -1329,6 +1335,12 @@ _mongoc_topology_update_from_handshake (mongoc_topology_t *topology,
13291335

13301336
bson_mutex_lock (&topology->mutex);
13311337

1338+
if (topology->description.type == MONGOC_TOPOLOGY_LOADBALANCED) {
1339+
/* Do not update the topology from a connection handshake. */
1340+
bson_mutex_unlock (&topology->mutex);
1341+
return true;
1342+
}
1343+
13321344
/* return false if server was removed from topology */
13331345
has_server = _mongoc_topology_update_no_lock (sd->id,
13341346
&sd->last_hello_response,

src/libmongoc/tests/test-mongoc-loadbalanced.c

Lines changed: 105 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include "TestSuite.h"
2222

2323
static char *
24-
loadbalanced_uri (void)
24+
loadbalanced_single_uri (void)
2525
{
2626
/* TODO (CDRIVER-4062): This will need to add TLS and auth to the URI when
2727
* run in evergreen. */
@@ -33,7 +33,7 @@ test_loadbalanced_sessions_supported (void *unused)
3333
{
3434
mongoc_client_t *client;
3535
mongoc_client_session_t *session;
36-
char *uristr = loadbalanced_uri ();
36+
char *uristr = loadbalanced_single_uri ();
3737
bson_error_t error;
3838

3939
client = mongoc_client_new (uristr);
@@ -50,7 +50,7 @@ test_loadbalanced_sessions_do_not_expire (void *unused)
5050
{
5151
mongoc_client_t *client;
5252
mongoc_client_session_t *session;
53-
char *uristr = loadbalanced_uri ();
53+
char *uristr = loadbalanced_single_uri ();
5454
bson_error_t error;
5555
bson_t *lsid1;
5656
bson_t *lsid2;
@@ -79,6 +79,102 @@ test_loadbalanced_sessions_do_not_expire (void *unused)
7979
mongoc_client_destroy (client);
8080
}
8181

82+
typedef struct {
83+
int topology_changed;
84+
int server_changed;
85+
int heartbeat_started;
86+
int heartbeat_succeeded;
87+
int heartbeat_failed;
88+
} sdam_event_counters_t;
89+
90+
static void server_changed(mongoc_apm_server_changed_t *event) {
91+
sdam_event_counters_t *counters = (sdam_event_counters_t*) mongoc_apm_server_changed_get_context (event);
92+
counters->server_changed++;
93+
}
94+
static void topology_changed(mongoc_apm_topology_changed_t *event) {
95+
sdam_event_counters_t *counters = (sdam_event_counters_t*) mongoc_apm_topology_changed_get_context (event);
96+
counters->topology_changed++;
97+
}
98+
static void heartbeat_started(mongoc_apm_server_heartbeat_started_t *event) {
99+
sdam_event_counters_t *counters = (sdam_event_counters_t*) mongoc_apm_server_heartbeat_started_get_context (event);
100+
counters->heartbeat_started++;
101+
}
102+
static void heartbeat_failed(mongoc_apm_server_heartbeat_failed_t *event) {
103+
sdam_event_counters_t *counters = (sdam_event_counters_t*) mongoc_apm_server_heartbeat_failed_get_context (event);
104+
counters->heartbeat_failed++;
105+
}
106+
static void heartbeat_succeeded(mongoc_apm_server_heartbeat_succeeded_t *event) {
107+
sdam_event_counters_t *counters = (sdam_event_counters_t*) mongoc_apm_server_heartbeat_succeeded_get_context (event);
108+
counters->heartbeat_succeeded++;
109+
}
110+
111+
static mongoc_apm_callbacks_t * make_callbacks (void) {
112+
mongoc_apm_callbacks_t *callbacks;
113+
114+
callbacks = mongoc_apm_callbacks_new ();
115+
mongoc_apm_set_server_changed_cb (callbacks, server_changed);
116+
mongoc_apm_set_topology_changed_cb (callbacks, topology_changed);
117+
mongoc_apm_set_server_heartbeat_started_cb (callbacks, heartbeat_started);
118+
mongoc_apm_set_server_heartbeat_failed_cb (callbacks, heartbeat_failed);
119+
mongoc_apm_set_server_heartbeat_succeeded_cb (callbacks, heartbeat_succeeded);
120+
return callbacks;
121+
}
122+
123+
/* Test that connection to a load balanced cluster is possible, and the topology does not change after a connection is created. */
124+
static void
125+
test_loadbalanced_ping_single (void *unused) {
126+
mongoc_client_t *client;
127+
mongoc_client_session_t *session;
128+
char *uristr = loadbalanced_single_uri ();
129+
bson_error_t error;
130+
mongoc_server_description_t *sd;
131+
mongoc_apm_callbacks_t *callbacks;
132+
sdam_event_counters_t counters = {0};
133+
bool ret;
134+
135+
client = mongoc_client_new (uristr);
136+
callbacks = make_callbacks();
137+
mongoc_client_set_apm_callbacks (client, callbacks, &counters);
138+
139+
sd = mongoc_client_select_server (client, true, NULL /* read prefs */, &error);
140+
ASSERT_OR_PRINT (sd != NULL, error);
141+
142+
/* There are two topology changed events:
143+
* - The initial changed event
144+
* - The server transition of Unknown => LoadBalancer */
145+
ASSERT_CMPINT (counters.topology_changed, ==, 2);
146+
ASSERT_CMPINT (counters.server_changed, ==, 1);
147+
/* Selecting a server does NOT open a new connection in load balanced mode. */
148+
ASSERT_CMPINT (counters.heartbeat_started, ==, 0);
149+
150+
// LBTODO: this is where I left off.
151+
// The server description is updated by monitoring and includes the wire version.
152+
// If we skip monitoring and force the server description, how do we update the wire version from the handshake?
153+
/* The first operation requiring I/O opens a connection. */
154+
ret = mongoc_client_command_simple (client, "admin", tmp_bson ("{'ping': 1}"), NULL /* read prefs */, NULL /* reply */, &error);
155+
ASSERT_OR_PRINT (ret, error);
156+
157+
/* No topology changes should have occurred */
158+
ASSERT_CMPINT (counters.topology_changed, ==, 2);
159+
ASSERT_CMPINT (counters.server_changed, ==, 1);
160+
/* The new connection should have produced a heartbeat event. */
161+
ASSERT_CMPINT (counters.heartbeat_started, ==, 1);
162+
ASSERT_CMPINT (counters.heartbeat_succeeded, ==, 1);
163+
164+
bson_free (uristr);
165+
mongoc_server_description_destroy (sd);
166+
mongoc_apm_callbacks_destroy (callbacks);
167+
mongoc_client_destroy (client);
168+
}
169+
170+
/* Test that the handshake reply does not update the topology in load balanced mode. */
171+
static void
172+
test_loadbalanced_handshake_does_not_update (void *unused);
173+
174+
/* Test that re-establishing a connection after a network error does not wait for the cooldown period in single threaded mode. */
175+
static void
176+
test_loadbalanced_no_cooldown_on_reconnect (void *unused);
177+
82178
static int
83179
skip_if_not_loadbalanced (void)
84180
{
@@ -104,4 +200,10 @@ test_loadbalanced_install (TestSuite *suite)
104200
NULL /* ctx */,
105201
NULL /* dtor */,
106202
skip_if_not_loadbalanced);
203+
TestSuite_AddFull (suite,
204+
"/loadbalanced/ping/single",
205+
test_loadbalanced_ping_single,
206+
NULL /* ctx */,
207+
NULL /* dtor */,
208+
skip_if_not_loadbalanced);
107209
}

0 commit comments

Comments
 (0)