Skip to content

Commit a1fb68e

Browse files
Try queue
1 parent 3836771 commit a1fb68e

File tree

5 files changed

+36
-50
lines changed

5 files changed

+36
-50
lines changed

lib/mongo/cluster/reapers/cursor_reaper.rb

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,6 @@ class CursorReaper
3434
# @since 2.3.0
3535
FREQUENCY = 1.freeze
3636

37-
# Symbol for separating parts of scheduled kill spec messages.
38-
#
39-
# @api private
40-
MSG_SEPARATOR = "\t"
41-
4237
# Create a cursor reaper.
4338
#
4439
# @param [ Cluster ] cluster The cluster.
@@ -49,27 +44,18 @@ def initialize(cluster)
4944
@to_kill = {}
5045
@active_cursor_ids = Set.new
5146
@mutex = Mutex.new
52-
@pipe_read, @pipe_write = IO.pipe
47+
@kill_spec_queue = Queue.new
5348
end
5449

5550
attr_reader :cluster
5651

5752
# Schedule a kill cursors operation to be eventually executed.
5853
#
5954
# @param [ Cursor::KillSpec ] kill_spec The kill specification.
60-
# @param [ Mongo::Server ] server The server to send the kill cursors
61-
# operation to.
6255
#
6356
# @api private
64-
def schedule_kill_cursor(kill_spec, server)
65-
msg = [
66-
server.address.seed,
67-
kill_spec.cursor_id,
68-
kill_spec.coll_name,
69-
kill_spec.db_name,
70-
kill_spec.service_id
71-
].join(MSG_SEPARATOR)
72-
@pipe_write.puts(msg)
57+
def schedule_kill_cursor(kill_spec)
58+
@kill_spec_queue << kill_spec
7359
end
7460

7561
# Register a cursor id as active.
@@ -126,20 +112,11 @@ def unregister_cursor(id)
126112
#
127113
# @api private
128114
def read_scheduled_kill_specs
129-
while select_resp = IO.select([@pipe_read], nil, nil, 0.1)
130-
readable_io = select_resp&.first&.first
131-
next if readable_io.nil?
132-
msg = readable_io.gets.strip
133-
seed, cursor_id, coll_name, db_name, service_id = msg.split(MSG_SEPARATOR)
134-
kill_spec = Cursor::KillSpec.new(
135-
cursor_id: cursor_id.to_i,
136-
coll_name: coll_name,
137-
db_name: db_name,
138-
service_id: service_id
139-
)
115+
while !@kill_spec_queue.empty?
116+
kill_spec = @kill_spec_queue.pop
140117
if @active_cursor_ids.include?(kill_spec.cursor_id)
141-
@to_kill[seed] ||= Set.new
142-
@to_kill[seed] << kill_spec
118+
@to_kill[kill_spec.server_seed] ||= Set.new
119+
@to_kill[kill_spec.server_seed] << kill_spec
143120
end
144121
end
145122
end

lib/mongo/cursor.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,8 @@ def initialize(view, result, server, options = {})
8484
@session = @options[:session]
8585
unless closed?
8686
register
87-
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec,
87+
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec(server),
8888
cluster,
89-
server,
9089
@session))
9190
end
9291
end
@@ -107,12 +106,12 @@ def initialize(view, result, server, options = {})
107106
# @return [ Proc ] The Finalizer.
108107
#
109108
# @api private
110-
def self.finalize(kill_spec, cluster, server, session)
109+
def self.finalize(kill_spec, cluster, session)
111110
unless KillSpec === kill_spec
112111
raise ArgumentError, "First argument must be a KillSpec: #{kill_spec.inspect}"
113112
end
114113
proc do
115-
cluster.schedule_kill_cursor(kill_spec, server)
114+
cluster.schedule_kill_cursor(kill_spec)
116115
session.end_session if session && session.implicit?
117116
end
118117
end
@@ -367,12 +366,13 @@ def get_more
367366
end
368367

369368
# @api private
370-
def kill_spec
369+
def kill_spec(server)
371370
KillSpec.new(
372371
cursor_id: id,
373372
coll_name: collection_name,
374373
db_name: database.name,
375374
service_id: initial_result.connection_description.service_id,
375+
server_seed: server.address.seed,
376376
)
377377
end
378378

lib/mongo/cursor/kill_spec.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,30 @@ class Cursor
2525
# @api private
2626
class KillSpec
2727

28-
def initialize(cursor_id:, coll_name:, db_name:, service_id:)
28+
def initialize(cursor_id:, coll_name:, db_name:, service_id:, server_seed:)
2929
@cursor_id = cursor_id
3030
@coll_name = coll_name
3131
@db_name = db_name
3232
@service_id = service_id
33+
@server_seed = server_seed
3334
end
3435

35-
attr_reader :cursor_id, :coll_name, :db_name, :service_id
36+
attr_reader :cursor_id, :coll_name, :db_name, :service_id, :server_seed
3637

3738
def ==(other)
38-
cursor_id == other.cursor_id && coll_name == other.coll_name && db_name == other.db_name && service_id == other.service_id
39+
cursor_id == other.cursor_id &&
40+
coll_name == other.coll_name &&
41+
db_name == other.db_name &&
42+
service_id == other.service_id &&
43+
server_seed == other.server_seed
3944
end
4045

4146
def eql?(other)
4247
self.==(other)
4348
end
4449

4550
def hash
46-
[cursor_id, coll_name, db_name, service_id].compact.hash
51+
[cursor_id, coll_name, db_name, service_id, server_seed].compact.hash
4752
end
4853
end
4954
end

spec/mongo/cluster/cursor_reaper_spec.rb

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@
4141
let(:cursor_id) { 1 }
4242
let(:cursor_kill_spec_1) do
4343
Mongo::Cursor::KillSpec.new(
44-
cursor_id: cursor_id, coll_name: 'c', db_name: 'd', service_id: nil,
44+
cursor_id: cursor_id, coll_name: 'c', db_name: 'd', service_id: nil, server_seed: address.seed
4545
)
4646
end
4747
let(:cursor_kill_spec_2) do
4848
Mongo::Cursor::KillSpec.new(
49-
cursor_id: cursor_id, coll_name: 'c', db_name: 'q', service_id: nil,
49+
cursor_id: cursor_id, coll_name: 'c', db_name: 'q', service_id: nil, server_seed: address.seed
5050
)
5151
end
5252
let(:to_kill) { reaper.instance_variable_get(:@to_kill)}
@@ -60,7 +60,7 @@
6060
context 'when there is not a list already for the server' do
6161

6262
before do
63-
reaper.schedule_kill_cursor(cursor_kill_spec_1, server)
63+
reaper.schedule_kill_cursor(cursor_kill_spec_1)
6464
reaper.read_scheduled_kill_specs
6565
end
6666

@@ -73,9 +73,9 @@
7373
context 'when there is a list of ops already for the server' do
7474

7575
before do
76-
reaper.schedule_kill_cursor(cursor_kill_spec_1, server)
76+
reaper.schedule_kill_cursor(cursor_kill_spec_1)
7777
reaper.read_scheduled_kill_specs
78-
reaper.schedule_kill_cursor(cursor_kill_spec_2, server)
78+
reaper.schedule_kill_cursor(cursor_kill_spec_2)
7979
reaper.read_scheduled_kill_specs
8080
end
8181

@@ -87,7 +87,7 @@
8787
context 'when the same op is added more than once' do
8888

8989
before do
90-
reaper.schedule_kill_cursor(cursor_kill_spec_2, server)
90+
reaper.schedule_kill_cursor(cursor_kill_spec_2)
9191
reaper.read_scheduled_kill_specs
9292
end
9393

@@ -102,7 +102,7 @@
102102
context 'when the cursor is not on the list of active cursors' do
103103

104104
before do
105-
reaper.schedule_kill_cursor(cursor_kill_spec_1, server)
105+
reaper.schedule_kill_cursor(cursor_kill_spec_1)
106106
end
107107

108108
it 'does not add the kill cursors op spec to the list' do
@@ -193,8 +193,11 @@
193193
around do |example|
194194
authorized_collection.insert_many(docs)
195195
periodic_executor.stop!
196-
cluster.schedule_kill_cursor(cursor.kill_spec,
197-
cursor.instance_variable_get(:@server))
196+
cluster.schedule_kill_cursor(
197+
cursor.kill_spec(
198+
cursor.instance_variable_get(:@server)
199+
)
200+
)
198201
periodic_executor.flush
199202
example.run
200203
periodic_executor.run!

spec/mongo/cursor_spec.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,9 @@
331331

332332
before do
333333
authorized_collection.insert_many(documents)
334-
cluster.schedule_kill_cursor(cursor.kill_spec,
335-
cursor.instance_variable_get(:@server))
334+
cluster.schedule_kill_cursor(
335+
cursor.kill_spec(cursor.instance_variable_get(:@server))
336+
)
336337
end
337338

338339
let(:view) do

0 commit comments

Comments
 (0)