Skip to content

Commit 301c4e9

Browse files
authored
Fix handling of IO#close interruption across threads/fibers. (#369)
1 parent bdfd6cf commit 301c4e9

File tree

3 files changed

+179
-1
lines changed

3 files changed

+179
-1
lines changed

lib/async/scheduler.rb

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ def block(blocker, timeout)
256256
# @parameter blocker [Object] The object that was blocking the fiber.
257257
# @parameter fiber [Fiber] The fiber to unblock.
258258
def unblock(blocker, fiber)
259-
# $stderr.puts "unblock(#{blocker}, #{fiber})"
259+
# Fiber.blocking{$stderr.puts "unblock(#{blocker}, #{fiber})"}
260260

261261
# This operation is protected by the GVL:
262262
if selector = @selector
@@ -272,6 +272,8 @@ def unblock(blocker, fiber)
272272
#
273273
# @parameter duration [Numeric | Nil] The time in seconds to sleep, or if nil, indefinitely.
274274
def kernel_sleep(duration = nil)
275+
# Fiber.blocking{$stderr.puts "kernel_sleep(#{duration}, #{Fiber.current})"}
276+
275277
if duration
276278
self.block(nil, duration)
277279
else
@@ -370,6 +372,34 @@ def io_write(io, buffer, length, offset = 0)
370372
end
371373
end
372374

375+
# Used to defer stopping the current task until later.
376+
class FiberInterrupt
377+
# Create a new stop later operation.
378+
#
379+
# @parameter task [Task] The task to stop later.
380+
def initialize(fiber, exception)
381+
@fiber = fiber
382+
@exception = exception
383+
end
384+
385+
# @returns [Boolean] Whether the task is alive.
386+
def alive?
387+
@fiber.alive?
388+
end
389+
390+
# Transfer control to the operation - this will stop the task.
391+
def transfer
392+
# Fiber.blocking{$stderr.puts "FiberInterrupt#transfer(#{@fiber}, #{@exception})"}
393+
@fiber.raise(@exception)
394+
end
395+
end
396+
397+
# Raise an exception on the specified fiber, waking up the event loop if necessary.
398+
def fiber_interrupt(fiber, exception)
399+
# Fiber.blocking{$stderr.puts "fiber_interrupt(#{fiber}, #{exception})"}
400+
unblock(nil, FiberInterrupt.new(fiber, exception))
401+
end
402+
373403
# Wait for the specified process ID to exit.
374404
#
375405
# @public Since *Async v2*.

releases.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,23 @@ The `Async::WorkerPool` implementation has been removed in favor of using `IO::E
88

99
To enable the worker pool, you can set the `ASYNC_SCHEDULER_WORKER_POOL` environment variable to `true`. This will allow the scheduler to use a worker pool for blocking operations, which can help improve performance in applications that perform a lot of CPU-bound operations (e.g. `rb_nogvl`).
1010

11+
### Better handling of `IO#close` using `fiber_interrupt`
12+
13+
`IO#close` interrupts fibers that are waiting on the IO using the new `fiber_interrupt` hook introduced in Ruby 3.5/4.0. This means that if you close an IO while a fiber is waiting on it, the fiber will be interrupted and will raise an `IOError`. This is a change from previous versions of Ruby, where closing an IO would not interrupt fibers waiting on it, and would instead interrupt the entire event loop (essentially a bug).
14+
15+
```ruby
16+
r, w = IO.pipe
17+
18+
Async do
19+
child = Async do
20+
r.gets
21+
end
22+
23+
r.close # This will interrupt the child fiber.
24+
child.wait # This will raise an `IOError` because the IO was closed.
25+
end
26+
```
27+
1128
## v2.24.0
1229

1330
- Ruby v3.1 support is dropped.

test/io.rb

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,135 @@
9090
out.close
9191
end
9292
end
93+
94+
with "#close" do
95+
it "can interrupt reading fiber when closing" do
96+
skip_unless_minimum_ruby_version("3.5")
97+
98+
r, w = IO.pipe
99+
100+
read_task = Async do
101+
expect do
102+
r.read(5)
103+
end.to raise_exception(IOError, message: be =~ /stream closed/)
104+
end
105+
106+
r.close
107+
read_task.wait
108+
end
109+
110+
it "can interrupt reading fiber when closing from another fiber" do
111+
skip_unless_minimum_ruby_version("3.5")
112+
113+
r, w = IO.pipe
114+
115+
read_task = Async do
116+
expect do
117+
r.read(5)
118+
end.to raise_exception(IOError, message: be =~ /stream closed/)
119+
ensure
120+
puts "Exiting read task"
121+
end
122+
123+
close_task = Async do
124+
r.close
125+
ensure
126+
puts "Exiting close task"
127+
end
128+
129+
close_task.wait
130+
read_task.wait
131+
end
132+
133+
it "can interrupt reading fiber when closing from a new thread" do
134+
skip_unless_minimum_ruby_version("3.5")
135+
136+
r, w = IO.pipe
137+
138+
read_task = Async do
139+
expect do
140+
r.read(5)
141+
end.to raise_exception(IOError, message: be =~ /stream closed/)
142+
end
143+
144+
close_thread = Thread.new do
145+
r.close
146+
end
147+
148+
close_thread.value
149+
read_task.wait
150+
end
151+
152+
it "can interrupt reading fiber when closing from a fiber in a new thread" do
153+
skip_unless_minimum_ruby_version("3.5")
154+
155+
r, w = IO.pipe
156+
157+
read_task = Async do
158+
expect do
159+
r.read(5)
160+
end.to raise_exception(IOError, message: be =~ /stream closed/)
161+
end
162+
163+
close_thread = Thread.new do
164+
close_task = Async do
165+
r.close
166+
end
167+
close_task.wait
168+
end
169+
170+
close_thread.value
171+
read_task.wait
172+
end
173+
174+
it "can interrupt reading thread when closing from a fiber" do
175+
skip_unless_minimum_ruby_version("3.5")
176+
177+
r, w = IO.pipe
178+
179+
read_thread = Thread.new do
180+
Thread.current.report_on_exception = false
181+
r.read(5)
182+
end
183+
184+
# Wait until read_thread blocks on I/O
185+
Thread.pass until read_thread.status == "sleep"
186+
187+
close_task = Async do
188+
r.close
189+
end
190+
191+
close_task.wait
192+
193+
expect do
194+
read_thread.join
195+
end.to raise_exception(IOError, message: be =~ /closed/)
196+
end
197+
198+
it "can interrupt reading fiber in a new thread when closing from a fiber" do
199+
skip_unless_minimum_ruby_version("3.5")
200+
201+
r, w = IO.pipe
202+
203+
read_thread = Thread.new do
204+
Thread.current.report_on_exception = false
205+
read_task = Async do
206+
expect do
207+
r.read(5)
208+
end.to raise_exception(IOError, message: be =~ /closed/)
209+
end
210+
read_task.wait
211+
end
212+
213+
# Wait until read_thread blocks on I/O
214+
Thread.pass until read_thread.status == "sleep"
215+
216+
close_task = Async do
217+
r.close
218+
end
219+
close_task.wait
220+
221+
read_thread.value
222+
end
223+
end
93224
end

0 commit comments

Comments
 (0)