Skip to content

Commit caa3fe7

Browse files
committed
Add basic test of close interruption.
1 parent f30c2f8 commit caa3fe7

File tree

3 files changed

+191
-1
lines changed

3 files changed

+191
-1
lines changed

lib/async/scheduler.rb

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ def block(blocker, timeout)
234234
# @parameter blocker [Object] The object that was blocking the fiber.
235235
# @parameter fiber [Fiber] The fiber to unblock.
236236
def unblock(blocker, fiber)
237-
# $stderr.puts "unblock(#{blocker}, #{fiber})"
237+
# Fiber.blocking{$stderr.puts "unblock(#{blocker}, #{fiber})"}
238238

239239
# This operation is protected by the GVL:
240240
if selector = @selector
@@ -250,6 +250,8 @@ def unblock(blocker, fiber)
250250
#
251251
# @parameter duration [Numeric | Nil] The time in seconds to sleep, or if nil, indefinitely.
252252
def kernel_sleep(duration = nil)
253+
# Fiber.blocking{$stderr.puts "kernel_sleep(#{duration}, #{Fiber.current})"}
254+
253255
if duration
254256
self.block(nil, duration)
255257
else
@@ -348,6 +350,34 @@ def io_write(io, buffer, length, offset = 0)
348350
end
349351
end
350352

353+
# Used to defer stopping the current task until later.
354+
class FiberInterrupt
355+
# Create a new stop later operation.
356+
#
357+
# @parameter task [Task] The task to stop later.
358+
def initialize(fiber, exception)
359+
@fiber = fiber
360+
@exception = exception
361+
end
362+
363+
# @returns [Boolean] Whether the task is alive.
364+
def alive?
365+
@fiber.alive?
366+
end
367+
368+
# Transfer control to the operation - this will stop the task.
369+
def transfer
370+
# Fiber.blocking{$stderr.puts "FiberInterrupt#transfer(#{@fiber}, #{@exception})"}
371+
@fiber.raise(@exception)
372+
end
373+
end
374+
375+
# Raise an exception on the specified fiber, waking up the event loop if necessary.
376+
def fiber_interrupt(fiber, exception)
377+
# Fiber.blocking{$stderr.puts "fiber_interrupt(#{fiber}, #{exception})"}
378+
unblock(nil, FiberInterrupt.new(fiber, exception))
379+
end
380+
351381
# Wait for the specified process ID to exit.
352382
#
353383
# @public Since *Async v2*.

test.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#!/usr/bin/env ruby
2+
3+
require_relative 'lib/async'
4+
5+
Async do
6+
r, w = IO.pipe
7+
8+
read_thread = Thread.new do
9+
Thread.current.report_on_exception = false
10+
r.read(5)
11+
end
12+
13+
# Wait until read_thread blocks on I/O
14+
Thread.pass until read_thread.status == "sleep"
15+
16+
close_task = Async do
17+
r.close
18+
end
19+
20+
close_task.wait
21+
begin
22+
read_thread.join
23+
rescue => error
24+
puts "Caught exception: #{error.class} - #{error.message}"
25+
end
26+
end

test/io.rb

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,138 @@
9090
out.close
9191
end
9292
end
93+
94+
with "#close" do
95+
it "can interrupt reading fiber when closing" do
96+
skip_unless_minimum_ruby_version("3.5")
97+
98+
r, w = IO.pipe
99+
100+
read_task = Async do
101+
expect do
102+
r.read(5)
103+
end.to raise_exception(IOError, message: be =~ /stream closed/)
104+
end
105+
106+
r.close
107+
read_task.wait
108+
end
109+
110+
it "can interrupt reading fiber when closing from another fiber" do
111+
skip_unless_minimum_ruby_version("3.5")
112+
113+
r, w = IO.pipe
114+
115+
read_task = Async do
116+
expect do
117+
r.read(5)
118+
end.to raise_exception(IOError, message: be =~ /stream closed/)
119+
end
120+
121+
close_task = Async do
122+
r.close
123+
end
124+
125+
close_task.wait
126+
read_task.wait
127+
end
128+
129+
it "can interrupt reading fiber when closing from a new thread" do
130+
skip_unless_minimum_ruby_version("3.5")
131+
132+
r, w = IO.pipe
133+
134+
read_task = Async do
135+
expect do
136+
r.read(5)
137+
end.to raise_exception(IOError, message: be =~ /stream closed/)
138+
end
139+
140+
close_thread = Thread.new do
141+
r.close
142+
end
143+
144+
close_thread.value
145+
read_task.wait
146+
end
147+
148+
it "can interrupt reading fiber when closing from a fiber in a new thread" do
149+
skip_unless_minimum_ruby_version("3.5")
150+
151+
r, w = IO.pipe
152+
153+
read_task = Async do
154+
expect do
155+
r.read(5)
156+
end.to raise_exception(IOError, message: be =~ /stream closed/)
157+
end
158+
159+
close_thread = Thread.new do
160+
close_task = Async do
161+
r.close
162+
end
163+
close_task.wait
164+
end
165+
166+
close_thread.value
167+
read_task.wait
168+
end
169+
170+
it "can interrupt reading thread when closing from a fiber" do
171+
skip_unless_minimum_ruby_version("3.5")
172+
173+
$stderr.puts "---------------------------------"
174+
r, w = IO.pipe
175+
176+
read_thread = Thread.new do
177+
Thread.current.report_on_exception = false
178+
puts "Reading in thread #{Thread.current}"
179+
r.read(5)
180+
ensure
181+
puts "Thread #{Thread.current} finished reading"
182+
end
183+
184+
# Wait until read_thread blocks on I/O
185+
Thread.pass until read_thread.status == "sleep"
186+
187+
close_task = Async do
188+
puts "Closing in fiber #{Thread.current}"
189+
r.close
190+
ensure
191+
puts "Closed in fiber #{Thread.current}"
192+
end
193+
194+
close_task.wait
195+
196+
expect do
197+
read_thread.join
198+
end.to raise_exception(IOError, message: be =~ /stream closed/)
199+
end
200+
201+
it "can interrupt reading fiber in a new thread when closing from a fiber" do
202+
skip_unless_minimum_ruby_version("3.5")
203+
204+
r, w = IO.pipe
205+
206+
read_thread = Thread.new do
207+
Thread.current.report_on_exception = false
208+
read_task = Async do
209+
expect do
210+
r.read(5)
211+
end.to raise_exception(IOError, message: be =~ /stream closed/)
212+
end
213+
read_task.wait
214+
end
215+
216+
# Wait until read_thread blocks on I/O
217+
Thread.pass until read_thread.status == "sleep"
218+
219+
close_task = Async do
220+
r.close
221+
end
222+
close_task.wait
223+
224+
read_thread.value
225+
end
226+
end
93227
end

0 commit comments

Comments
 (0)