Skip to content

Commit eece2a7

Browse files
committed
Merge pull request #730 from mongodb/finalize-monitor
Stop monitor on server garbage collection.
2 parents 62c46ce + dba773b commit eece2a7

File tree

7 files changed

+115
-61
lines changed

7 files changed

+115
-61
lines changed

lib/mongo/address.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,20 @@ def ==(other)
6363
host == other.host && port == other.port
6464
end
6565

66+
# Check equality for hashing.
67+
#
68+
# @example Check hashing equality.
69+
# address.eql?(other)
70+
#
71+
# @param [ Object ] other The other object.
72+
#
73+
# @return [ true, false ] If the objects are equal.
74+
#
75+
# @since 2.2.0
76+
def eql?(other)
77+
self == other
78+
end
79+
6680
# Calculate the hash value for the address.
6781
#
6882
# @example Calculate the hash value.

lib/mongo/cluster.rb

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,34 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
106106
@options = options.freeze
107107
@topology = Topology.initial(seeds, options)
108108
@update_lock = Mutex.new
109+
@pool_lock = Mutex.new
109110

110111
subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
111112
subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
112113
subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))
113114

114115
seeds.each{ |seed| add(seed) }
116+
ObjectSpace.define_finalizer(self, self.class.finalize(pools))
117+
end
118+
119+
# Finalize the cluster for garbage collection. Disconnects all the scoped
120+
# connection pools.
121+
#
122+
# @example Finalize the cluster.
123+
# Cluster.finalize(pools)
124+
#
125+
# @param [ Hash<Address, Server::ConnectionPool> ] pools The connection
126+
# pools.
127+
#
128+
# @return [ Proc ] The Finalizer.
129+
#
130+
# @since 2.2.0
131+
def self.finalize(pools)
132+
proc do
133+
pools.values.each do |pool|
134+
pool.disconnect!
135+
end
136+
end
115137
end
116138

117139
# Get the nicer formatted string for use in inspection.
@@ -168,6 +190,22 @@ def max_read_retries
168190
options[:max_read_retries] || MAX_READ_RETRIES
169191
end
170192

193+
# Get the scoped connection pool for the server.
194+
#
195+
# @example Get the connection pool.
196+
# cluster.pool(server)
197+
#
198+
# @param [ Server ] server The server.
199+
#
200+
# @return [ Server::ConnectionPool ] The connection pool.
201+
#
202+
# @since 2.2.0
203+
def pool(server)
204+
@pool_lock.synchronize do
205+
pools[server.address] ||= Server::ConnectionPool.get(server)
206+
end
207+
end
208+
171209
# Get the interval, in seconds, in which a mongos read operation is
172210
# retried.
173211
#
@@ -339,6 +377,10 @@ def addition_allowed?(address)
339377
!@topology.single? || direct_connection?(address)
340378
end
341379

380+
def pools
381+
@pools ||= {}
382+
end
383+
342384
def servers_list
343385
@update_lock.synchronize do
344386
@servers.reduce([]) do |servers, server|

lib/mongo/server.rb

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,19 @@ def disconnect!
120120
monitor.stop! and true
121121
end
122122

123+
# When the server is flagged for garbage collection, stop the monitor
124+
# thread.
125+
#
126+
# @example Finalize the object.
127+
# Server.finalize(monitor)
128+
#
129+
# @param [ Server::Monitor ] monitor The server monitor.
130+
#
131+
# @since 2.2.0
132+
def self.finalize(monitor)
133+
proc { monitor.stop! }
134+
end
135+
123136
# Instantiate a new server object. Will start the background refresh and
124137
# subscribe to the appropriate events.
125138
#
@@ -145,6 +158,7 @@ def initialize(address, cluster, monitoring, event_listeners, options = {})
145158
@monitor = Monitor.new(address, event_listeners, options)
146159
monitor.scan!
147160
monitor.run!
161+
ObjectSpace.define_finalizer(self, self.class.finalize(monitor))
148162
end
149163

150164
# Get a pretty printed server inspection.
@@ -168,7 +182,7 @@ def inspect
168182
#
169183
# @since 2.0.0
170184
def pool
171-
@pool ||= ConnectionPool.get(self)
185+
@pool ||= cluster.pool(self)
172186
end
173187

174188
# Determine if the provided tags are a subset of the server's tags.

lib/mongo/server/connection_pool.rb

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@ class Server
2323
class ConnectionPool
2424
include Loggable
2525

26-
# Used for synchronization of pools access.
27-
MUTEX = Mutex.new
28-
2926
# @return [ Hash ] options The pool options.
3027
attr_reader :options
3128

@@ -131,22 +128,10 @@ class << self
131128
#
132129
# @since 2.0.0
133130
def get(server)
134-
MUTEX.synchronize do
135-
pools[server.address] ||= create_pool(server)
136-
end
137-
end
138-
139-
private
140-
141-
def create_pool(server)
142131
ConnectionPool.new(server.options) do
143132
Connection.new(server, server.options)
144133
end
145134
end
146-
147-
def pools
148-
@pools ||= {}
149-
end
150135
end
151136
end
152137
end

spec/mongo/server/connection_pool_spec.rb

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,22 @@
1818
Mongo::Event::Listeners.new
1919
end
2020

21+
let(:cluster) do
22+
double('cluster')
23+
end
24+
2125
describe '#checkin' do
2226

2327
let(:server) do
24-
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
28+
Mongo::Server.new(address, cluster, monitoring, listeners, options)
2529
end
2630

2731
let!(:pool) do
2832
described_class.get(server)
2933
end
3034

3135
after do
36+
expect(cluster).to receive(:pool).with(server).and_return(pool)
3237
server.disconnect!
3338
end
3439

@@ -55,7 +60,7 @@
5560
describe '#checkout' do
5661

5762
let(:server) do
58-
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
63+
Mongo::Server.new(address, cluster, monitoring, listeners, options)
5964
end
6065

6166
let!(:pool) do
@@ -103,53 +108,52 @@
103108
describe '#disconnect!' do
104109

105110
let(:server) do
106-
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
111+
Mongo::Server.new(address, cluster, monitoring, listeners, options)
107112
end
108113

109114
let!(:pool) do
110115
described_class.get(server)
111116
end
112117

113-
after do
114-
server.disconnect!
115-
end
116-
117118
it 'disconnects the queue' do
118-
expect(pool.send(:queue)).to receive(:disconnect!).twice.and_call_original
119-
pool.disconnect!
119+
expect(cluster).to receive(:pool).with(server).and_return(pool)
120+
expect(pool.send(:queue)).to receive(:disconnect!).once.and_call_original
121+
server.disconnect!
120122
end
121123
end
122124

123125
describe '.get' do
124126

125127
let(:server) do
126-
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
128+
Mongo::Server.new(address, cluster, monitoring, listeners, options)
127129
end
128130

129131
let!(:pool) do
130132
described_class.get(server)
131133
end
132134

133135
after do
136+
expect(cluster).to receive(:pool).with(server).and_return(pool)
134137
server.disconnect!
135138
end
136139

137140
it 'returns the pool for the server' do
138-
expect(pool).to eql(described_class.get(server))
141+
expect(pool).to_not be_nil
139142
end
140143
end
141144

142145
describe '#inspect' do
143146

144147
let(:server) do
145-
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
148+
Mongo::Server.new(address, cluster, monitoring, listeners, options)
146149
end
147150

148151
let!(:pool) do
149152
described_class.get(server)
150153
end
151154

152155
after do
156+
expect(cluster).to receive(:pool).with(server).and_return(pool)
153157
server.disconnect!
154158
end
155159

@@ -165,7 +169,7 @@
165169
describe '#with_connection' do
166170

167171
let(:server) do
168-
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
172+
Mongo::Server.new(address, cluster, monitoring, listeners, options)
169173
end
170174

171175
let!(:pool) do

spec/mongo/server/connection_spec.rb

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,35 +14,34 @@
1414
Mongo::Event::Listeners.new
1515
end
1616

17+
let(:cluster) do
18+
double('cluster')
19+
end
20+
1721
let(:server) do
18-
Mongo::Server.new(address, double('cluster'), monitoring, listeners, TEST_OPTIONS)
22+
Mongo::Server.new(address, cluster, monitoring, listeners, TEST_OPTIONS)
23+
end
24+
25+
let(:pool) do
26+
double('pool')
1927
end
2028

2129
after do
30+
expect(cluster).to receive(:pool).with(server).and_return(pool)
31+
expect(pool).to receive(:disconnect!).and_return(true)
2232
server.disconnect!
2333
end
2434

2535
describe '#connectable?' do
2636

27-
# context 'when the connection is connectable' do
28-
29-
# let(:connection) do
30-
# described_class.new(server)
31-
# end
32-
33-
# it 'returns true' do
34-
# expect(connection).to be_connectable
35-
# end
36-
# end
37-
3837
context 'when the connection is not connectable' do
3938

4039
let(:bad_address) do
4140
Mongo::Address.new('127.0.0.1:666')
4241
end
4342

4443
let(:bad_server) do
45-
Mongo::Server.new(bad_address, double('cluster'), monitoring, listeners, TEST_OPTIONS)
44+
Mongo::Server.new(bad_address, cluster, monitoring, listeners, TEST_OPTIONS)
4645
end
4746

4847
let(:connection) do

0 commit comments

Comments
 (0)