17
17
18
18
#include "mongoc.h"
19
19
#include "mongoc-apm-private.h"
20
+ #include "mongoc-array-private.h"
20
21
#include "mongoc-counters-private.h"
21
22
#include "mongoc-client-pool-private.h"
22
23
#include "mongoc-client-pool.h"
@@ -52,6 +53,8 @@ struct _mongoc_client_pool_t {
52
53
bool error_api_set ;
53
54
mongoc_server_api_t * api ;
54
55
bool client_initialized ;
56
+ // `last_known_serverids` is a sorted array of uint32_t.
57
+ mongoc_array_t last_known_serverids ;
55
58
};
56
59
57
60
@@ -146,6 +149,7 @@ mongoc_client_pool_new_with_error (const mongoc_uri_t *uri, bson_error_t *error)
146
149
}
147
150
148
151
pool = (mongoc_client_pool_t * ) bson_malloc0 (sizeof * pool );
152
+ _mongoc_array_init (& pool -> last_known_serverids , sizeof (uint32_t ));
149
153
bson_mutex_init (& pool -> mutex );
150
154
mongoc_cond_init (& pool -> cond );
151
155
_mongoc_queue_init (& pool -> queue );
@@ -229,6 +233,8 @@ mongoc_client_pool_destroy (mongoc_client_pool_t *pool)
229
233
_mongoc_ssl_opts_cleanup (& pool -> ssl_opts , true);
230
234
#endif
231
235
236
+ _mongoc_array_destroy (& pool -> last_known_serverids );
237
+
232
238
bson_free (pool );
233
239
234
240
mongoc_counter_client_pools_active_dec ();
@@ -357,6 +363,53 @@ mongoc_client_pool_try_pop (mongoc_client_pool_t *pool)
357
363
RETURN (client );
358
364
}
359
365
366
+ typedef struct {
367
+ mongoc_array_t * known_server_ids ;
368
+ mongoc_cluster_t * cluster ;
369
+ } prune_ctx ;
370
+
371
+ static int
372
+ server_id_cmp (const void * a_ , const void * b_ )
373
+ {
374
+ const uint32_t * const a = (const uint32_t * ) a_ ;
375
+ const uint32_t * const b = (const uint32_t * ) b_ ;
376
+
377
+ if (* a == * b ) {
378
+ return 0 ;
379
+ }
380
+
381
+ return * a < * b ? -1 : 1 ;
382
+ }
383
+
384
+ // `maybe_prune` removes a `mongoc_cluster_node_t` if the node refers to a removed server.
385
+ static bool
386
+ maybe_prune (void * item , void * ctx_ )
387
+ {
388
+ mongoc_cluster_node_t * cn = (mongoc_cluster_node_t * ) item ;
389
+ prune_ctx * ctx = (prune_ctx * ) ctx_ ;
390
+ // Get the server ID from the cluster node.
391
+ uint32_t server_id = cn -> handshake_sd -> id ;
392
+
393
+ // Check if the cluster node's server ID references a removed server.
394
+ if (!bsearch (
395
+ & server_id , ctx -> known_server_ids -> data , ctx -> known_server_ids -> len , sizeof (uint32_t ), server_id_cmp )) {
396
+ mongoc_cluster_disconnect_node (ctx -> cluster , server_id );
397
+ }
398
+ return true;
399
+ }
400
+
401
+ // `prune_client` closes connections from `client` to servers not contained in `known_server_ids`.
402
+ static void
403
+ prune_client (mongoc_client_t * client , mongoc_array_t * known_server_ids )
404
+ {
405
+ BSON_ASSERT_PARAM (client );
406
+ BSON_ASSERT_PARAM (known_server_ids );
407
+
408
+ mongoc_cluster_t * cluster = & client -> cluster ;
409
+ prune_ctx ctx = {.cluster = cluster , .known_server_ids = known_server_ids };
410
+ mongoc_set_for_each (cluster -> nodes , maybe_prune , & ctx );
411
+ }
412
+
360
413
361
414
void
362
415
mongoc_client_pool_push (mongoc_client_pool_t * pool , mongoc_client_t * client )
@@ -370,6 +423,48 @@ mongoc_client_pool_push (mongoc_client_pool_t *pool, mongoc_client_t *client)
370
423
mongoc_cluster_reset_sockettimeoutms (& client -> cluster );
371
424
372
425
bson_mutex_lock (& pool -> mutex );
426
+ // Check if `last_known_server_ids` needs update.
427
+ bool serverids_have_changed = false;
428
+ {
429
+ mongoc_array_t current_serverids ;
430
+ _mongoc_array_init (& current_serverids , sizeof (uint32_t ));
431
+
432
+ {
433
+ mc_shared_tpld td = mc_tpld_take_ref (pool -> topology );
434
+ const mongoc_set_t * servers = mc_tpld_servers_const (td .ptr );
435
+ for (size_t i = 0 ; i < servers -> items_len ; i ++ ) {
436
+ _mongoc_array_append_val (& current_serverids , servers -> items [i ].id );
437
+ }
438
+ mc_tpld_drop_ref (& td );
439
+ }
440
+
441
+ serverids_have_changed = (current_serverids .len != pool -> last_known_serverids .len ) ||
442
+ memcmp (current_serverids .data ,
443
+ pool -> last_known_serverids .data ,
444
+ current_serverids .len * current_serverids .element_size ) != 0 ;
445
+
446
+ if (serverids_have_changed ) {
447
+ _mongoc_array_destroy (& pool -> last_known_serverids );
448
+ pool -> last_known_serverids = current_serverids ; // Ownership transfer.
449
+ } else {
450
+ _mongoc_array_destroy (& current_serverids );
451
+ }
452
+ }
453
+
454
+ // Check if pooled clients need to be pruned.
455
+ if (serverids_have_changed ) {
456
+ // The set of last known server IDs has changed. Prune all clients in pool.
457
+ mongoc_queue_item_t * ptr = pool -> queue .head ;
458
+ while (ptr != NULL ) {
459
+ prune_client ((mongoc_client_t * ) ptr -> data , & pool -> last_known_serverids );
460
+ ptr = ptr -> next ;
461
+ }
462
+ }
463
+
464
+ // Always prune incoming client. The topology may have changed while client was checked out.
465
+ prune_client (client , & pool -> last_known_serverids );
466
+
467
+ // Push client back into pool.
373
468
_mongoc_queue_push_head (& pool -> queue , client );
374
469
375
470
if (pool -> min_pool_size && _mongoc_queue_get_length (& pool -> queue ) > pool -> min_pool_size ) {
0 commit comments