Skip to content

RUBY-2869 Do not use mutexes in finalizers #2417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions lib/mongo/cluster/reapers/cursor_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,18 @@ def initialize(cluster)
@to_kill = {}
@active_cursor_ids = Set.new
@mutex = Mutex.new
@kill_spec_queue = Queue.new
end

attr_reader :cluster

# Schedule a kill cursors operation to be eventually executed.
#
# @param [ Cursor::KillSpec ] kill_spec The kill specification.
# @param [ Mongo::Server ] server The server to send the kill cursors
# operation to.
#
# @api private
def schedule_kill_cursor(kill_spec, server)
@mutex.synchronize do
if @active_cursor_ids.include?(kill_spec.cursor_id)
@to_kill[server.address.seed] ||= Set.new
@to_kill[server.address.seed] << kill_spec
end
end
def schedule_kill_cursor(kill_spec)
@kill_spec_queue << kill_spec
end

# Register a cursor id as active.
Expand Down Expand Up @@ -110,6 +104,24 @@ def unregister_cursor(id)
end
end

# Read and decode scheduled kill cursors operations.
#
# This method mutates instance variables without locking, so is is not
# thread safe. Generally, it should not be called itself, this is a helper
# for `kill_cursor` method.
#
# @api private
def read_scheduled_kill_specs
while kill_spec = @kill_spec_queue.pop(true)
if @active_cursor_ids.include?(kill_spec.cursor_id)
@to_kill[kill_spec.server_address] ||= Set.new
@to_kill[kill_spec.server_address] << kill_spec
end
end
rescue ThreadError
# Empty queue, nothing to do.
end

# Execute all pending kill cursors operations.
#
# @example Execute pending kill cursors operations.
Expand All @@ -122,14 +134,14 @@ def kill_cursors
# TODO optimize this to batch kill cursor operations for the same
# server/database/collection instead of killing each cursor
# individually.

loop do
server_address_str = nil
server_address = nil

kill_spec = @mutex.synchronize do
read_scheduled_kill_specs
# Find a server that has any cursors scheduled for destruction.
server_address_str, specs =
@to_kill.detect { |server_address_str, specs| specs.any? }
server_address, specs =
@to_kill.detect { |_, specs| specs.any? }

if specs.nil?
# All servers have empty specs, nothing to do.
Expand Down Expand Up @@ -168,7 +180,7 @@ def kill_cursors
op = Operation::KillCursors.new(spec)

server = cluster.servers.detect do |server|
server.address.seed == server_address_str
server.address == server_address
end

unless server
Expand Down
10 changes: 5 additions & 5 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ def initialize(view, result, server, options = {})
@session = @options[:session]
unless closed?
register
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec,
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec(server),
cluster,
server,
@session))
end
end
Expand All @@ -107,12 +106,12 @@ def initialize(view, result, server, options = {})
# @return [ Proc ] The Finalizer.
#
# @api private
def self.finalize(kill_spec, cluster, server, session)
def self.finalize(kill_spec, cluster, session)
unless KillSpec === kill_spec
raise ArgumentError, "First argument must be a KillSpec: #{kill_spec.inspect}"
end
proc do
cluster.schedule_kill_cursor(kill_spec, server)
cluster.schedule_kill_cursor(kill_spec)
session.end_session if session && session.implicit?
end
end
Expand Down Expand Up @@ -367,12 +366,13 @@ def get_more
end

# @api private
def kill_spec
def kill_spec(server)
KillSpec.new(
cursor_id: id,
coll_name: collection_name,
db_name: database.name,
service_id: initial_result.connection_description.service_id,
server_address: server.address,
)
end

Expand Down
21 changes: 19 additions & 2 deletions lib/mongo/cursor/kill_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,31 @@ class Cursor
# @api private
class KillSpec

def initialize(cursor_id:, coll_name:, db_name:, service_id:)
def initialize(cursor_id:, coll_name:, db_name:, service_id:, server_address:)
@cursor_id = cursor_id
@coll_name = coll_name
@db_name = db_name
@service_id = service_id
@server_address = server_address
end

attr_reader :cursor_id, :coll_name, :db_name, :service_id
attr_reader :cursor_id, :coll_name, :db_name, :service_id, :server_address

def ==(other)
cursor_id == other.cursor_id &&
coll_name == other.coll_name &&
db_name == other.db_name &&
service_id == other.service_id &&
server_address == other.server_address
end

def eql?(other)
self.==(other)
end

def hash
[cursor_id, coll_name, db_name, service_id, server_address].compact.hash
end
end
end
end
37 changes: 22 additions & 15 deletions spec/mongo/cluster/cursor_reaper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
let(:cursor_id) { 1 }
let(:cursor_kill_spec_1) do
Mongo::Cursor::KillSpec.new(
cursor_id: cursor_id, coll_name: 'c', db_name: 'd', service_id: nil,
cursor_id: cursor_id, coll_name: 'c', db_name: 'd', service_id: nil, server_address: address
)
end
let(:cursor_kill_spec_2) do
Mongo::Cursor::KillSpec.new(
cursor_id: cursor_id, coll_name: 'c', db_name: 'q', service_id: nil,
cursor_id: cursor_id, coll_name: 'c', db_name: 'q', service_id: nil, server_address: address
)
end
let(:to_kill) { reaper.instance_variable_get(:@to_kill)}
Expand All @@ -60,36 +60,40 @@
context 'when there is not a list already for the server' do

before do
reaper.schedule_kill_cursor(cursor_kill_spec_1, server)
reaper.schedule_kill_cursor(cursor_kill_spec_1)
reaper.read_scheduled_kill_specs
end

it 'initializes the list of op specs to a set' do
expect(to_kill.keys).to eq([ address.seed ])
expect(to_kill[address.seed]).to eq(Set.new([cursor_kill_spec_1]))
expect(to_kill.keys).to eq([ address ])
expect(to_kill[address]).to contain_exactly(cursor_kill_spec_1)
end
end

context 'when there is a list of ops already for the server' do

before do
reaper.schedule_kill_cursor(cursor_kill_spec_1, server)
reaper.schedule_kill_cursor(cursor_kill_spec_2, server)
reaper.schedule_kill_cursor(cursor_kill_spec_1)
reaper.read_scheduled_kill_specs
reaper.schedule_kill_cursor(cursor_kill_spec_2)
reaper.read_scheduled_kill_specs
end

it 'adds the op to the server list' do
expect(to_kill.keys).to eq([ address.seed ])
expect(to_kill[address.seed]).to contain_exactly(cursor_kill_spec_1, cursor_kill_spec_2)
expect(to_kill.keys).to eq([ address ])
expect(to_kill[address]).to contain_exactly(cursor_kill_spec_1, cursor_kill_spec_2)
end

context 'when the same op is added more than once' do

before do
reaper.schedule_kill_cursor(cursor_kill_spec_2, server)
reaper.schedule_kill_cursor(cursor_kill_spec_2)
reaper.read_scheduled_kill_specs
end

it 'does not allow duplicates ops for a server' do
expect(to_kill.keys).to eq([ address.seed ])
expect(to_kill[address.seed]).to contain_exactly(cursor_kill_spec_1, cursor_kill_spec_2)
expect(to_kill.keys).to eq([ address ])
expect(to_kill[address]).to contain_exactly(cursor_kill_spec_1, cursor_kill_spec_2)
end
end
end
Expand All @@ -98,7 +102,7 @@
context 'when the cursor is not on the list of active cursors' do

before do
reaper.schedule_kill_cursor(cursor_kill_spec_1, server)
reaper.schedule_kill_cursor(cursor_kill_spec_1)
end

it 'does not add the kill cursors op spec to the list' do
Expand Down Expand Up @@ -189,8 +193,11 @@
around do |example|
authorized_collection.insert_many(docs)
periodic_executor.stop!
cluster.schedule_kill_cursor(cursor.kill_spec,
cursor.instance_variable_get(:@server))
cluster.schedule_kill_cursor(
cursor.kill_spec(
cursor.instance_variable_get(:@server)
)
)
periodic_executor.flush
example.run
periodic_executor.run!
Expand Down
5 changes: 3 additions & 2 deletions spec/mongo/cursor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,9 @@

before do
authorized_collection.insert_many(documents)
cluster.schedule_kill_cursor(cursor.kill_spec,
cursor.instance_variable_get(:@server))
cluster.schedule_kill_cursor(
cursor.kill_spec(cursor.instance_variable_get(:@server))
)
end

let(:view) do
Expand Down