Skip to content

Stop monitor on server garbage collection. #730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/mongo/address.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ def ==(other)
host == other.host && port == other.port
end

# Check equality for hashing.
#
# @example Check hashing equality.
# address.eql?(other)
#
# @param [ Object ] other The other object.
#
# @return [ true, false ] If the objects are equal.
#
# @since 2.2.0
def eql?(other)
self == other
end

# Calculate the hash value for the address.
#
# @example Calculate the hash value.
Expand Down
42 changes: 42 additions & 0 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,34 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
@options = options.freeze
@topology = Topology.initial(seeds, options)
@update_lock = Mutex.new
@pool_lock = Mutex.new

subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))

seeds.each{ |seed| add(seed) }
ObjectSpace.define_finalizer(self, self.class.finalize(pools))
end

# Finalize the cluster for garbage collection. Disconnects all the scoped
# connection pools.
#
# @example Finalize the cluster.
# Cluster.finalize(pools)
#
# @param [ Hash<Address, Server::ConnectionPool> ] pools The connection
# pools.
#
# @return [ Proc ] The Finalizer.
#
# @since 2.2.0
def self.finalize(pools)
proc do
pools.values.each do |pool|
pool.disconnect!
end
end
end

# Get the nicer formatted string for use in inspection.
Expand Down Expand Up @@ -168,6 +190,22 @@ def max_read_retries
options[:max_read_retries] || MAX_READ_RETRIES
end

# Get the scoped connection pool for the server.
#
# @example Get the connection pool.
# cluster.pool(server)
#
# @param [ Server ] server The server.
#
# @return [ Server::ConnectionPool ] The connection pool.
#
# @since 2.2.0
def pool(server)
@pool_lock.synchronize do
pools[server.address] ||= Server::ConnectionPool.get(server)
end
end

# Get the interval, in seconds, in which a mongos read operation is
# retried.
#
Expand Down Expand Up @@ -339,6 +377,10 @@ def addition_allowed?(address)
[email protected]? || direct_connection?(address)
end

def pools
@pools ||= {}
end

def servers_list
@update_lock.synchronize do
@servers.reduce([]) do |servers, server|
Expand Down
16 changes: 15 additions & 1 deletion lib/mongo/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ def disconnect!
monitor.stop! and true
end

# When the server is flagged for garbage collection, stop the monitor
# thread.
#
# @example Finalize the object.
# Server.finalize(monitor)
#
# @param [ Server::Monitor ] monitor The server monitor.
#
# @since 2.2.0
def self.finalize(monitor)
proc { monitor.stop! }
end

# Instantiate a new server object. Will start the background refresh and
# subscribe to the appropriate events.
#
Expand All @@ -145,6 +158,7 @@ def initialize(address, cluster, monitoring, event_listeners, options = {})
@monitor = Monitor.new(address, event_listeners, options)
monitor.scan!
monitor.run!
ObjectSpace.define_finalizer(self, self.class.finalize(monitor))
end

# Get a pretty printed server inspection.
Expand All @@ -168,7 +182,7 @@ def inspect
#
# @since 2.0.0
def pool
@pool ||= ConnectionPool.get(self)
@pool ||= cluster.pool(self)
end

# Determine if the provided tags are a subset of the server's tags.
Expand Down
15 changes: 0 additions & 15 deletions lib/mongo/server/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ class Server
class ConnectionPool
include Loggable

# Used for synchronization of pools access.
MUTEX = Mutex.new

# @return [ Hash ] options The pool options.
attr_reader :options

Expand Down Expand Up @@ -131,22 +128,10 @@ class << self
#
# @since 2.0.0
def get(server)
MUTEX.synchronize do
pools[server.address] ||= create_pool(server)
end
end

private

def create_pool(server)
ConnectionPool.new(server.options) do
Connection.new(server, server.options)
end
end

def pools
@pools ||= {}
end
end
end
end
Expand Down
30 changes: 17 additions & 13 deletions spec/mongo/server/connection_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@
Mongo::Event::Listeners.new
end

let(:cluster) do
double('cluster')
end

describe '#checkin' do

let(:server) do
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
Mongo::Server.new(address, cluster, monitoring, listeners, options)
end

let!(:pool) do
described_class.get(server)
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
server.disconnect!
end

Expand All @@ -55,7 +60,7 @@
describe '#checkout' do

let(:server) do
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
Mongo::Server.new(address, cluster, monitoring, listeners, options)
end

let!(:pool) do
Expand Down Expand Up @@ -103,53 +108,52 @@
describe '#disconnect!' do

let(:server) do
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
Mongo::Server.new(address, cluster, monitoring, listeners, options)
end

let!(:pool) do
described_class.get(server)
end

after do
server.disconnect!
end

it 'disconnects the queue' do
expect(pool.send(:queue)).to receive(:disconnect!).twice.and_call_original
pool.disconnect!
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(pool.send(:queue)).to receive(:disconnect!).once.and_call_original
server.disconnect!
end
end

describe '.get' do

let(:server) do
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
Mongo::Server.new(address, cluster, monitoring, listeners, options)
end

let!(:pool) do
described_class.get(server)
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
server.disconnect!
end

it 'returns the pool for the server' do
expect(pool).to eql(described_class.get(server))
expect(pool).to_not be_nil
end
end

describe '#inspect' do

let(:server) do
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
Mongo::Server.new(address, cluster, monitoring, listeners, options)
end

let!(:pool) do
described_class.get(server)
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
server.disconnect!
end

Expand All @@ -165,7 +169,7 @@
describe '#with_connection' do

let(:server) do
Mongo::Server.new(address, double('cluster'), monitoring, listeners, options)
Mongo::Server.new(address, cluster, monitoring, listeners, options)
end

let!(:pool) do
Expand Down
25 changes: 12 additions & 13 deletions spec/mongo/server/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,34 @@
Mongo::Event::Listeners.new
end

let(:cluster) do
double('cluster')
end

let(:server) do
Mongo::Server.new(address, double('cluster'), monitoring, listeners, TEST_OPTIONS)
Mongo::Server.new(address, cluster, monitoring, listeners, TEST_OPTIONS)
end

let(:pool) do
double('pool')
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(pool).to receive(:disconnect!).and_return(true)
server.disconnect!
end

describe '#connectable?' do

# context 'when the connection is connectable' do

# let(:connection) do
# described_class.new(server)
# end

# it 'returns true' do
# expect(connection).to be_connectable
# end
# end

context 'when the connection is not connectable' do

let(:bad_address) do
Mongo::Address.new('127.0.0.1:666')
end

let(:bad_server) do
Mongo::Server.new(bad_address, double('cluster'), monitoring, listeners, TEST_OPTIONS)
Mongo::Server.new(bad_address, cluster, monitoring, listeners, TEST_OPTIONS)
end

let(:connection) do
Expand Down
Loading