Skip to content

Commit 3e05bf8

Browse files
RUBY-2869 Do not use mutexes in finalizers (#2417)
1 parent cf1ed1e commit 3e05bf8

File tree

5 files changed

+75
-38
lines changed

5 files changed

+75
-38
lines changed

lib/mongo/cluster/reapers/cursor_reaper.rb

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,24 +44,18 @@ def initialize(cluster)
4444
@to_kill = {}
4545
@active_cursor_ids = Set.new
4646
@mutex = Mutex.new
47+
@kill_spec_queue = Queue.new
4748
end
4849

4950
attr_reader :cluster
5051

5152
# Schedule a kill cursors operation to be eventually executed.
5253
#
5354
# @param [ Cursor::KillSpec ] kill_spec The kill specification.
54-
# @param [ Mongo::Server ] server The server to send the kill cursors
55-
# operation to.
5655
#
5756
# @api private
58-
def schedule_kill_cursor(kill_spec, server)
59-
@mutex.synchronize do
60-
if @active_cursor_ids.include?(kill_spec.cursor_id)
61-
@to_kill[server.address.seed] ||= Set.new
62-
@to_kill[server.address.seed] << kill_spec
63-
end
64-
end
57+
def schedule_kill_cursor(kill_spec)
58+
@kill_spec_queue << kill_spec
6559
end
6660

6761
# Register a cursor id as active.
@@ -110,6 +104,24 @@ def unregister_cursor(id)
110104
end
111105
end
112106

107+
# Read and decode scheduled kill cursors operations.
108+
#
109+
# This method mutates instance variables without locking, so is is not
110+
# thread safe. Generally, it should not be called itself, this is a helper
111+
# for `kill_cursor` method.
112+
#
113+
# @api private
114+
def read_scheduled_kill_specs
115+
while kill_spec = @kill_spec_queue.pop(true)
116+
if @active_cursor_ids.include?(kill_spec.cursor_id)
117+
@to_kill[kill_spec.server_address] ||= Set.new
118+
@to_kill[kill_spec.server_address] << kill_spec
119+
end
120+
end
121+
rescue ThreadError
122+
# Empty queue, nothing to do.
123+
end
124+
113125
# Execute all pending kill cursors operations.
114126
#
115127
# @example Execute pending kill cursors operations.
@@ -122,14 +134,14 @@ def kill_cursors
122134
# TODO optimize this to batch kill cursor operations for the same
123135
# server/database/collection instead of killing each cursor
124136
# individually.
125-
126137
loop do
127-
server_address_str = nil
138+
server_address = nil
128139

129140
kill_spec = @mutex.synchronize do
141+
read_scheduled_kill_specs
130142
# Find a server that has any cursors scheduled for destruction.
131-
server_address_str, specs =
132-
@to_kill.detect { |server_address_str, specs| specs.any? }
143+
server_address, specs =
144+
@to_kill.detect { |_, specs| specs.any? }
133145

134146
if specs.nil?
135147
# All servers have empty specs, nothing to do.
@@ -168,7 +180,7 @@ def kill_cursors
168180
op = Operation::KillCursors.new(spec)
169181

170182
server = cluster.servers.detect do |server|
171-
server.address.seed == server_address_str
183+
server.address == server_address
172184
end
173185

174186
unless server

lib/mongo/cursor.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,8 @@ def initialize(view, result, server, options = {})
8484
@session = @options[:session]
8585
unless closed?
8686
register
87-
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec,
87+
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec(server),
8888
cluster,
89-
server,
9089
@session))
9190
end
9291
end
@@ -107,12 +106,12 @@ def initialize(view, result, server, options = {})
107106
# @return [ Proc ] The Finalizer.
108107
#
109108
# @api private
110-
def self.finalize(kill_spec, cluster, server, session)
109+
def self.finalize(kill_spec, cluster, session)
111110
unless KillSpec === kill_spec
112111
raise ArgumentError, "First argument must be a KillSpec: #{kill_spec.inspect}"
113112
end
114113
proc do
115-
cluster.schedule_kill_cursor(kill_spec, server)
114+
cluster.schedule_kill_cursor(kill_spec)
116115
session.end_session if session && session.implicit?
117116
end
118117
end
@@ -367,12 +366,13 @@ def get_more
367366
end
368367

369368
# @api private
370-
def kill_spec
369+
def kill_spec(server)
371370
KillSpec.new(
372371
cursor_id: id,
373372
coll_name: collection_name,
374373
db_name: database.name,
375374
service_id: initial_result.connection_description.service_id,
375+
server_address: server.address,
376376
)
377377
end
378378

lib/mongo/cursor/kill_spec.rb

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,31 @@ class Cursor
2525
# @api private
2626
class KillSpec
2727

28-
def initialize(cursor_id:, coll_name:, db_name:, service_id:)
28+
def initialize(cursor_id:, coll_name:, db_name:, service_id:, server_address:)
2929
@cursor_id = cursor_id
3030
@coll_name = coll_name
3131
@db_name = db_name
3232
@service_id = service_id
33+
@server_address = server_address
3334
end
3435

35-
attr_reader :cursor_id, :coll_name, :db_name, :service_id
36+
attr_reader :cursor_id, :coll_name, :db_name, :service_id, :server_address
37+
38+
def ==(other)
39+
cursor_id == other.cursor_id &&
40+
coll_name == other.coll_name &&
41+
db_name == other.db_name &&
42+
service_id == other.service_id &&
43+
server_address == other.server_address
44+
end
45+
46+
def eql?(other)
47+
self.==(other)
48+
end
49+
50+
def hash
51+
[cursor_id, coll_name, db_name, service_id, server_address].compact.hash
52+
end
3653
end
3754
end
3855
end

spec/mongo/cluster/cursor_reaper_spec.rb

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@
4141
let(:cursor_id) { 1 }
4242
let(:cursor_kill_spec_1) do
4343
Mongo::Cursor::KillSpec.new(
44-
cursor_id: cursor_id, coll_name: 'c', db_name: 'd', service_id: nil,
44+
cursor_id: cursor_id, coll_name: 'c', db_name: 'd', service_id: nil, server_address: address
4545
)
4646
end
4747
let(:cursor_kill_spec_2) do
4848
Mongo::Cursor::KillSpec.new(
49-
cursor_id: cursor_id, coll_name: 'c', db_name: 'q', service_id: nil,
49+
cursor_id: cursor_id, coll_name: 'c', db_name: 'q', service_id: nil, server_address: address
5050
)
5151
end
5252
let(:to_kill) { reaper.instance_variable_get(:@to_kill)}
@@ -60,36 +60,40 @@
6060
context 'when there is not a list already for the server' do
6161

6262
before do
63-
reaper.schedule_kill_cursor(cursor_kill_spec_1, server)
63+
reaper.schedule_kill_cursor(cursor_kill_spec_1)
64+
reaper.read_scheduled_kill_specs
6465
end
6566

6667
it 'initializes the list of op specs to a set' do
67-
expect(to_kill.keys).to eq([ address.seed ])
68-
expect(to_kill[address.seed]).to eq(Set.new([cursor_kill_spec_1]))
68+
expect(to_kill.keys).to eq([ address ])
69+
expect(to_kill[address]).to contain_exactly(cursor_kill_spec_1)
6970
end
7071
end
7172

7273
context 'when there is a list of ops already for the server' do
7374

7475
before do
75-
reaper.schedule_kill_cursor(cursor_kill_spec_1, server)
76-
reaper.schedule_kill_cursor(cursor_kill_spec_2, server)
76+
reaper.schedule_kill_cursor(cursor_kill_spec_1)
77+
reaper.read_scheduled_kill_specs
78+
reaper.schedule_kill_cursor(cursor_kill_spec_2)
79+
reaper.read_scheduled_kill_specs
7780
end
7881

7982
it 'adds the op to the server list' do
80-
expect(to_kill.keys).to eq([ address.seed ])
81-
expect(to_kill[address.seed]).to contain_exactly(cursor_kill_spec_1, cursor_kill_spec_2)
83+
expect(to_kill.keys).to eq([ address ])
84+
expect(to_kill[address]).to contain_exactly(cursor_kill_spec_1, cursor_kill_spec_2)
8285
end
8386

8487
context 'when the same op is added more than once' do
8588

8689
before do
87-
reaper.schedule_kill_cursor(cursor_kill_spec_2, server)
90+
reaper.schedule_kill_cursor(cursor_kill_spec_2)
91+
reaper.read_scheduled_kill_specs
8892
end
8993

9094
it 'does not allow duplicates ops for a server' do
91-
expect(to_kill.keys).to eq([ address.seed ])
92-
expect(to_kill[address.seed]).to contain_exactly(cursor_kill_spec_1, cursor_kill_spec_2)
95+
expect(to_kill.keys).to eq([ address ])
96+
expect(to_kill[address]).to contain_exactly(cursor_kill_spec_1, cursor_kill_spec_2)
9397
end
9498
end
9599
end
@@ -98,7 +102,7 @@
98102
context 'when the cursor is not on the list of active cursors' do
99103

100104
before do
101-
reaper.schedule_kill_cursor(cursor_kill_spec_1, server)
105+
reaper.schedule_kill_cursor(cursor_kill_spec_1)
102106
end
103107

104108
it 'does not add the kill cursors op spec to the list' do
@@ -189,8 +193,11 @@
189193
around do |example|
190194
authorized_collection.insert_many(docs)
191195
periodic_executor.stop!
192-
cluster.schedule_kill_cursor(cursor.kill_spec,
193-
cursor.instance_variable_get(:@server))
196+
cluster.schedule_kill_cursor(
197+
cursor.kill_spec(
198+
cursor.instance_variable_get(:@server)
199+
)
200+
)
194201
periodic_executor.flush
195202
example.run
196203
periodic_executor.run!

spec/mongo/cursor_spec.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,9 @@
331331

332332
before do
333333
authorized_collection.insert_many(documents)
334-
cluster.schedule_kill_cursor(cursor.kill_spec,
335-
cursor.instance_variable_get(:@server))
334+
cluster.schedule_kill_cursor(
335+
cursor.kill_spec(cursor.instance_variable_get(:@server))
336+
)
336337
end
337338

338339
let(:view) do

0 commit comments

Comments
 (0)