Skip to content

RUBY-932 Update Server Discovery and Monitoring code and tests #637

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jun 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 69 additions & 13 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ class Cluster
include Event::Subscriber
include Loggable

# @return [ Array<String> ] The provided seed addresses.
attr_reader :addresses

# @return [ Hash ] The options hash.
attr_reader :options

Expand Down Expand Up @@ -69,9 +66,9 @@ def add(host)
if !addresses.include?(address)
if addition_allowed?(address)
log_debug([ "Adding #{address.to_s} to the cluster." ])
addresses.push(address)
@update_lock.synchronize { @addresses.push(address) }
server = Server.new(address, self, event_listeners, options)
@servers.push(server)
@update_lock.synchronize { @servers.push(server) }
server
end
end
Expand All @@ -92,9 +89,9 @@ def initialize(seeds, options = {})
@event_listeners = Event::Listeners.new
@options = options.freeze
@topology = Topology.initial(seeds, options)
@update_lock = Mutex.new

subscribe_to(Event::SERVER_ADDED, Event::ServerAdded.new(self))
subscribe_to(Event::SERVER_REMOVED, Event::ServerRemoved.new(self))
subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))

seeds.each{ |seed| add(seed) }
Expand Down Expand Up @@ -136,10 +133,10 @@ def next_primary
#
# @since 2.0.0
def elect_primary!(description)
@topology = topology.elect_primary(description, @servers)
@topology = topology.elect_primary(description, servers_list)
end

# Removed the server from the cluster for the provided address, if it
# Remove the server from the cluster for the provided address, if it
# exists.
#
# @example Remove the server from the cluster.
Expand All @@ -151,9 +148,10 @@ def elect_primary!(description)
def remove(host)
log_debug([ "#{host} being removed from the cluster." ])
address = Address.new(host)
removed_servers = @servers.reject!{ |server| server.address == address }
removed_servers = @servers.select { |s| s.address == address }
@update_lock.synchronize { @servers = @servers - removed_servers }
removed_servers.each{ |server| server.disconnect! } if removed_servers
addresses.reject!{ |addr| addr == address }
@update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end

# Force a scan of all known servers in the cluster.
Expand All @@ -168,7 +166,7 @@ def remove(host)
#
# @since 2.0.0
def scan!
@servers.each{ |server| server.scan! } and true
servers_list.each{ |server| server.scan! } and true
end

# Get a list of server candidates from the cluster that can have operations
Expand All @@ -181,7 +179,37 @@ def scan!
#
# @since 2.0.0
def servers
topology.servers(@servers.compact).compact
topology.servers(servers_list.compact).compact
end

# Add hosts in a description to the cluster.
#
# @example Add hosts in a description to the cluster.
# cluster.add_hosts(description)
#
# @param [ Mongo::Server::Description ] description The description.
#
# @since 2.0.6
def add_hosts(description)
if topology.add_hosts?(description, servers_list)
description.servers.each { |s| add(s) }
end
end

# Remove hosts in a description from the cluster.
#
# @example Remove hosts in a description from the cluster.
# cluster.remove_hosts(description)
#
# @param [ Mongo::Server::Description ] description The description.
#
# @since 2.0.6
def remove_hosts(description)
if topology.remove_hosts?(description)
servers_list.each do |s|
remove(s.address.to_s) if topology.remove_server?(description, s)
end
end
end

# Create a cluster for the provided client, for use when we don't want the
Expand All @@ -202,6 +230,18 @@ def self.create(client)
client.instance_variable_set(:@cluster, cluster)
end

# The addresses in the cluster.
#
# @example Get the addresses in the cluster.
# cluster.addresses
#
# @return [ Array<Mongo::Address> ] The addresses.
#
# @since 2.0.6
def addresses
addresses_list
end

private

def direct_connection?(address)
Expand All @@ -211,5 +251,21 @@ def direct_connection?(address)
def addition_allowed?(address)
[email protected]? || direct_connection?(address)
end

def servers_list
@update_lock.synchronize do
@servers.reduce([]) do |servers, server|
servers << server
end
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this return a copy of the list? Doesn't it need to return a copy in order to avoid overlapping with a thread that's updating the servers while holding the servers_update lock? E.g. it seems like if one thread is in scan! and slowly iterating the list while another thread is in remove_hosts, removing servers while holding the lock, you can still get a runtime err.

end

def addresses_list
@update_lock.synchronize do
@addresses.reduce([]) do |addresses, address|
addresses << address
end
end
end
end
end
65 changes: 65 additions & 0 deletions lib/mongo/cluster/topology/replica_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,54 @@ def servers(servers)
end
end

# Whether a server description's hosts may be added to the cluster.
#
# @example Check if a description's hosts may be added to the cluster.
# topology.add_hosts?(description, servers)
#
# @param [ Mongo::Server::Description ] description The description.
# @param [ Array<Mongo::Server> ] servers The cluster servers.
#
# @return [ true, false ] Whether a description's hosts may be added.
#
# @since 2.0.6
def add_hosts?(description, servers)
!!(member_of_this_set?(description) && !has_primary?(servers))
end

# Whether a description can be used to remove hosts from the cluster.
#
# @example Check if a description can be used to remove hosts from the cluster.
# topology.remove_hosts?(description)
#
# @param [ Mongo::Server::Description ] description The description.
#
# @return [ true, false ] Whether hosts may be removed from the cluster.
#
# @since 2.0.6
def remove_hosts?(description)
!description.config.empty? &&
(description.primary? ||
description.hosts.empty? ||
!member_of_this_set?(description))
end

# Whether a specific server in the cluster can be removed, given a description.
#
# @example Check if a specific server can be removed from the cluster.
# topology.remove_server?(description, server)
#
# @param [ Mongo::Server::Description ] description The description.
# @param [ Mongo::Serve ] server The server in question.
#
# @return [ true, false ] Whether the server can be removed from the cluster.
#
# @since 2.0.6
def remove_server?(description, server)
remove_self?(description, server) ||
(member_of_this_set?(description) && !description.lists_server?(server))
end

# A replica set topology is not sharded.
#
# @example Is the topology sharded?
Expand Down Expand Up @@ -154,6 +202,23 @@ def single?; false; end
#
# @since 2.0.0
def unknown?; false; end

private

def has_primary?(servers)
servers.find { |s| s.primary? }
end

def member_of_this_set?(description)
description.replica_set_member? &&
description.replica_set_name == replica_set_name
end

def remove_self?(description, server)
!member_of_this_set?(description) &&
description.is_server?(server) &&
!description.ghost?
end
end
end
end
Expand Down
50 changes: 50 additions & 0 deletions lib/mongo/cluster/topology/sharded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,50 @@ def servers(servers)
servers.select{ |server| server.mongos? }
end

# Whether a server description's hosts may be added to the cluster.
#
# @example Check if a description's hosts may be added to the cluster.
# topology.add_hosts?(description, servers)
#
# @param [ Mongo::Server::Description ] description The description.
# @param [ Array<Mongo::Server> ] servers The cluster servers.
#
# @return [ false ] A description's hosts are never added to a
# sharded cluster.
#
# @since 2.0.6
def add_hosts?(description, servers); false; end

# Whether a description can be used to remove hosts from the cluster.
#
# @example Check if a description can be used to remove hosts from
# the cluster.
# topology.remove_hosts?(description)
#
# @param [ Mongo::Server::Description ] description The description.
#
# @return [ true ] A description can always be used to remove hosts
# from a sharded cluster.
#
# @since 2.0.6
def remove_hosts?(description); true; end

# Whether a specific server in the cluster can be removed, given a description.
#
# @example Check if a specific server can be removed from the cluster.
# topology.remove_server?(description, server)
#
# @param [ Mongo::Server::Description ] description The description.
# @param [ Mongo::Serve ] server The server in question.
#
# @return [ true, false ] Whether the server can be removed from the cluster.
#
# @since 2.0.6
def remove_server?(description, server)
remove_self?(description, server) ||
!(server.mongos? || server.unknown?)
end

# A sharded topology is sharded.
#
# @example Is the topology sharded?
Expand Down Expand Up @@ -126,6 +170,12 @@ def single?; false; end
#
# @since 2.0.0
def unknown?; false; end

private

def remove_self?(description, server)
description.is_server?(server) && !description.mongos?
end
end
end
end
Expand Down
41 changes: 41 additions & 0 deletions lib/mongo/cluster/topology/single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,47 @@ def servers(servers, name = nil)
[ servers.detect { |server| !server.unknown? } ]
end

# Whether a server description's hosts may be added to the cluster.
#
# @example Check if a description's hosts may be added to the cluster.
# topology.add_hosts?(description, servers)
#
# @param [ Mongo::Server::Description ] description The description.
# @param [ Array<Mongo::Server> ] servers The cluster servers.
#
# @return [ false ] A description's hosts are never added to a
# cluster of Single topology.
#
# @since 2.0.6
def add_hosts?(description, servers); false; end

# Whether a description can be used to remove hosts from the cluster.
#
# @example Check if a description can be used to remove hosts from
# the cluster.
# topology.remove_hosts?(description)
#
# @param [ Mongo::Server::Description ] description The description.
#
# @return [ true ] A description can never be used to remove hosts
# from a cluster of Single topology.
#
# @since 2.0.6
def remove_hosts?(description); false; end

# Whether a specific server in the cluster can be removed, given a description.
#
# @example Check if a specific server can be removed from the cluster.
# topology.remove_server?(description, server)
#
# @param [ Mongo::Server::Description ] description The description.
# @param [ Mongo::Serve ] server The server in question.
#
# @return [ false ] A server is never removed from a cluster of Single topology.
#
# @since 2.0.6
def remove_server?(description, server); false; end

# A single topology is not sharded.
#
# @example Is the topology sharded?
Expand Down
44 changes: 44 additions & 0 deletions lib/mongo/cluster/topology/unknown.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,50 @@ def single?; false; end
# @since 2.0.0
def unknown?; true; end

# Whether a server description's hosts may be added to the cluster.
#
# @example Check if a description's hosts may be added to the cluster.
# topology.add_hosts?(description, servers)
#
# @param [ Mongo::Server::Description ] description The description.
# @param [ Array<Mongo::Server> ] servers The cluster servers.
#
# @return [ true, false ] Whether a description's hosts may be added.
#
# @since 2.0.6
def add_hosts?(description, servers)
!(description.unknown? || description.ghost?)
end

# Whether a description can be used to remove hosts from the cluster.
#
# @example Check if a description can be used to remove hosts from the cluster.
# topology.remove_hosts?(description)
#
# @param [ Mongo::Server::Description ] description The description.
#
# @return [ true, false ] Whether hosts may be removed from the cluster.
#
# @since 2.0.6
def remove_hosts?(description)
description.standalone?
end

# Whether a specific server in the cluster can be removed, given a description.
#
# @example Check if a specific server can be removed from the cluster.
# topology.remove_server?(description, server)
#
# @param [ Mongo::Server::Description ] description The description.
# @param [ Mongo::Serve ] server The server in question.
#
# @return [ true, false ] Whether the server can be removed from the cluster.
#
# @since 2.0.6
def remove_server?(description, server)
description.standalone? && description.is_server?(server)
end

private

def initialize_replica_set(description, servers)
Expand Down
Loading