Skip to content

Commit f6b53d2

Browse files
committed
Reorganize implementation of Async::LimitedQueue and related tests.
1 parent a91f32c commit f6b53d2

File tree

14 files changed

+317
-331
lines changed

14 files changed

+317
-331
lines changed

fixtures/async/a_queue.rb

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2018-2024, by Samuel Williams.
5+
# Copyright, 2019, by Ryan Musgrave.
6+
# Copyright, 2020-2022, by Bruno Sutic.
7+
8+
require "async"
9+
require "async/queue"
10+
require "sus/fixtures/async"
11+
require "async/semaphore"
12+
13+
require "async/chainable_async"
14+
15+
module Async
16+
AQueue = Sus::Shared("a queue") do
17+
let(:queue) {subject.new}
18+
19+
with "#push" do
20+
it "adds an item to the queue" do
21+
queue.push(:item)
22+
expect(queue.size).to be == 1
23+
expect(queue.dequeue).to be == :item
24+
end
25+
end
26+
27+
with "#pop" do
28+
it "removes an item from the queue" do
29+
queue.push(:item)
30+
expect(queue.pop).to be == :item
31+
expect(queue.size).to be == 0
32+
end
33+
end
34+
35+
with "#each" do
36+
it "can enumerate queue items" do
37+
reactor.async do |task|
38+
10.times do |item|
39+
task.sleep(0.0001)
40+
queue.enqueue(item)
41+
end
42+
43+
queue.enqueue(nil)
44+
end
45+
46+
items = []
47+
queue.each do |item|
48+
items << item
49+
end
50+
51+
expect(items).to be == 10.times.to_a
52+
end
53+
end
54+
55+
it "should process items in order" do
56+
reactor.async do |task|
57+
10.times do |i|
58+
task.sleep(0.001)
59+
queue.enqueue(i)
60+
end
61+
end
62+
63+
10.times do |j|
64+
expect(queue.dequeue).to be == j
65+
end
66+
end
67+
68+
it "can enqueue multiple items" do
69+
items = Array.new(10) { rand(10) }
70+
71+
reactor.async do |task|
72+
queue.enqueue(*items)
73+
end
74+
75+
items.each do |item|
76+
expect(queue.dequeue).to be == item
77+
end
78+
end
79+
80+
it "can dequeue items asynchronously" do
81+
reactor.async do |task|
82+
queue << 1
83+
queue << nil
84+
end
85+
86+
queue.async do |task, item|
87+
expect(item).to be == 1
88+
end
89+
end
90+
91+
with "#<<" do
92+
it "adds an item to the queue" do
93+
queue << :item
94+
expect(queue.size).to be == 1
95+
expect(queue.dequeue).to be == :item
96+
end
97+
end
98+
99+
with "#size" do
100+
it "returns queue size" do
101+
expect(queue.size).to be == 0
102+
queue.enqueue("Hello World")
103+
expect(queue.size).to be == 1
104+
end
105+
end
106+
107+
with "#signal" do
108+
it "can signal with an item" do
109+
queue.signal(:item)
110+
expect(queue.dequeue).to be == :item
111+
end
112+
end
113+
114+
with "#wait" do
115+
it "can wait for an item" do
116+
reactor.async do |task|
117+
queue.enqueue(:item)
118+
end
119+
120+
expect(queue.wait).to be == :item
121+
end
122+
end
123+
124+
with "an empty queue" do
125+
it "is expected to be empty" do
126+
expect(queue).to be(:empty?)
127+
end
128+
end
129+
130+
with "task finishing queue" do
131+
it "can signal task completion" do
132+
3.times do
133+
Async(finished: queue) do
134+
:result
135+
end
136+
end
137+
138+
3.times do
139+
task = queue.dequeue
140+
expect(task.wait).to be == :result
141+
end
142+
end
143+
end
144+
145+
with "semaphore" do
146+
let(:capacity) {2}
147+
let(:semaphore) {Async::Semaphore.new(capacity)}
148+
let(:repeats) {capacity * 2}
149+
150+
it "should process several items limited by a semaphore" do
151+
count = 0
152+
153+
Async do
154+
repeats.times do
155+
queue.enqueue :item
156+
end
157+
158+
queue.enqueue nil
159+
end
160+
161+
queue.async(parent: semaphore) do |task|
162+
count += 1
163+
end
164+
165+
expect(count).to be == repeats
166+
end
167+
end
168+
169+
it_behaves_like Async::ChainableAsync do
170+
def before
171+
chainable.enqueue(:item)
172+
173+
# The limited queue may block.
174+
Async do
175+
chainable.enqueue(nil)
176+
end
177+
178+
super
179+
end
180+
end
181+
end
182+
end

fixtures/async/chainable_async.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2019-2024, by Samuel Williams.
5+
6+
module Async
7+
ChainableAsync = Sus::Shared("chainable async") do
8+
let(:parent) {Object.new}
9+
let(:chainable) {subject.new(parent: parent)}
10+
11+
it "should chain async to parent" do
12+
expect(parent).to receive(:async).and_return(nil)
13+
14+
chainable.async do
15+
# Nothing.
16+
end
17+
end
18+
end
19+
end

fixtures/chainable_async.rb

Lines changed: 0 additions & 17 deletions
This file was deleted.

fixtures/timer_quantum.rb

Lines changed: 0 additions & 46 deletions
This file was deleted.

gems.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
gem "metrics"
3030

3131
gem "sus-fixtures-async"
32-
gem "sus-fixtures-console", "~> 0.3"
32+
gem "sus-fixtures-console"
33+
gem "sus-fixtures-time"
3334

3435
gem "bake-test"
3536
gem "bake-test-external"

lib/async/limited_queue.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2024, by Samuel Williams.
5+
6+
# The implementation lives in `queue.rb` but later we may move it here for better autoload/inference.
7+
require_relative "queue"

test/async/barrier.rb

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
require "sus/fixtures/async"
99
require "async/semaphore"
1010

11-
require "chainable_async"
12-
require "timer_quantum"
11+
require "async/chainable_async"
1312

1413
describe Async::Barrier do
1514
include Sus::Fixtures::Async::ReactorContext
@@ -40,7 +39,7 @@
4039

4140
duration = Async::Clock.measure{barrier.wait}
4241

43-
expect(duration).to be_within(Q*repeats).of(delay)
42+
expect(duration).to be_within(repeats * Sus::Fixtures::Time::QUANTUM).of(delay)
4443
expect(finished).to be == repeats
4544
expect(barrier).to be(:empty?)
4645
end
@@ -186,5 +185,5 @@
186185
end
187186
end
188187

189-
it_behaves_like ChainableAsync
188+
it_behaves_like Async::ChainableAsync
190189
end

test/async/clock.rb

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
# Copyright, 2018-2024, by Samuel Williams.
55

66
require "async/clock"
7-
8-
require "timer_quantum"
7+
require "sus/fixtures/time/quantum"
98

109
describe Async::Clock do
1110
let(:clock) {subject.new}
@@ -15,7 +14,7 @@
1514
sleep 0.01
1615
end
1716

18-
expect(duration).to be_within(Q).of(0.01)
17+
expect(duration).to be_within(Sus::Fixtures::Time::QUANTUM).of(0.01)
1918
end
2019

2120
it "can get current offset" do
@@ -29,7 +28,7 @@
2928
clock.stop!
3029
end
3130

32-
expect(clock.total).to be_within(2 * Q).of(0.02)
31+
expect(clock.total).to be_within(2 * Sus::Fixtures::Time::QUANTUM).of(0.02)
3332
end
3433

3534
with "#total" do

test/async/idler.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
require "async/idler"
77
require "sus/fixtures/async"
88

9-
require "chainable_async"
9+
require "async/chainable_async"
1010

1111
describe Async::Idler do
1212
include Sus::Fixtures::Async::ReactorContext
@@ -33,5 +33,5 @@
3333
expect(Fiber.scheduler.load).to be_within(0.1).of(0.5)
3434
end
3535

36-
it_behaves_like ChainableAsync
36+
it_behaves_like Async::ChainableAsync
3737
end

0 commit comments

Comments
 (0)