Skip to content

Commit 9c3431b

Browse files
committed
Add queue.SimpleQueue type
1 parent bca4939 commit 9c3431b

File tree

2 files changed

+163
-4
lines changed

2 files changed

+163
-4
lines changed

Lib/queue.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,21 @@ def _put(self, item):
244244

245245
def _get(self):
246246
return self.queue.pop()
247+
248+
249+
class SimpleQueue:
250+
251+
def __init__(self):
252+
self._queue = deque()
253+
self._count = threading.Semaphore(0)
254+
255+
def put(self, item):
256+
self._queue.append(item)
257+
self._count.release()
258+
259+
def get(self, block=True, timeout=None):
260+
if timeout is not None and timeout < 0:
261+
raise ValueError("'timeout' must be a non-negative number")
262+
if not self._count.acquire(block, timeout):
263+
raise Empty
264+
return self._queue.popleft()

Lib/test/test_queue.py

Lines changed: 145 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Some simple queue module tests, plus some failure conditions
22
# to ensure the Queue locks remain stable.
3+
import collections
34
import queue
5+
import random
46
import time
57
import unittest
68
from test import support
@@ -90,7 +92,7 @@ def setUp(self):
9092
self.cum = 0
9193
self.cumlock = threading.Lock()
9294

93-
def simple_queue_test(self, q):
95+
def basic_queue_test(self, q):
9496
if q.qsize():
9597
raise RuntimeError("Call this function with an empty queue")
9698
self.assertTrue(q.empty())
@@ -193,12 +195,12 @@ def test_queue_join(self):
193195
else:
194196
self.fail("Did not detect task count going negative")
195197

196-
def test_simple_queue(self):
198+
def test_basic(self):
197199
# Do it a couple of times on the same queue.
198200
# Done twice to make sure works with same instance reused.
199201
q = self.type2test(QUEUE_SIZE)
200-
self.simple_queue_test(q)
201-
self.simple_queue_test(q)
202+
self.basic_queue_test(q)
203+
self.basic_queue_test(q)
202204

203205
def test_negative_timeout_raises_exception(self):
204206
q = self.type2test(QUEUE_SIZE)
@@ -354,5 +356,144 @@ def test_failing_queue(self):
354356
self.failing_queue_test(q)
355357

356358

359+
class SimpleQueueTest(unittest.TestCase):
360+
type2test = queue.SimpleQueue
361+
362+
def setUp(self):
363+
self.q = self.type2test()
364+
365+
def feed(self, q, seq, rnd):
366+
while True:
367+
try:
368+
val = seq.pop()
369+
except IndexError:
370+
return
371+
q.put(val)
372+
if rnd.random() > 0.5:
373+
time.sleep(rnd.random() * 1e-3)
374+
375+
def consume(self, q, results, sentinel):
376+
while True:
377+
val = q.get()
378+
if val == sentinel:
379+
return
380+
results.append(val)
381+
382+
def consume_nonblock(self, q, results, sentinel):
383+
while True:
384+
while True:
385+
try:
386+
val = q.get(block=False)
387+
except queue.Empty:
388+
time.sleep(1e-5)
389+
else:
390+
break
391+
if val == sentinel:
392+
return
393+
results.append(val)
394+
395+
def consume_timeout(self, q, results, sentinel):
396+
while True:
397+
while True:
398+
try:
399+
val = q.get(timeout=1e-5)
400+
except queue.Empty:
401+
pass
402+
else:
403+
break
404+
if val == sentinel:
405+
return
406+
results.append(val)
407+
408+
def run_threads(self, n_feeders, n_consumers, q, inputs,
409+
feed_func, consume_func):
410+
results = []
411+
sentinel = None
412+
seq = inputs + [sentinel] * n_consumers
413+
seq.reverse()
414+
rnd = random.Random(42)
415+
416+
exceptions = []
417+
def log_exceptions(f):
418+
def wrapper(*args, **kwargs):
419+
try:
420+
f(*args, **kwargs)
421+
except BaseException as e:
422+
exceptions.append(e)
423+
return wrapper
424+
425+
feeders = [threading.Thread(target=log_exceptions(feed_func),
426+
args=(q, seq, rnd))
427+
for i in range(n_feeders)]
428+
consumers = [threading.Thread(target=log_exceptions(consume_func),
429+
args=(q, results, sentinel))
430+
for i in range(n_consumers)]
431+
432+
with support.start_threads(feeders + consumers):
433+
pass
434+
435+
self.assertFalse(exceptions)
436+
437+
return results
438+
439+
def test_simple(self):
440+
q = self.q
441+
q.put(1)
442+
q.put(2)
443+
q.put(3)
444+
q.put(4)
445+
self.assertEqual(q.get(), 1)
446+
self.assertEqual(q.get(), 2)
447+
self.assertEqual(q.get(block=False), 3)
448+
self.assertEqual(q.get(timeout=0.1), 4)
449+
with self.assertRaises(queue.Empty):
450+
q.get(block=False)
451+
with self.assertRaises(queue.Empty):
452+
q.get(timeout=1e-3)
453+
454+
def test_negative_timeout_raises_exception(self):
455+
q = self.q
456+
q.put(1)
457+
with self.assertRaises(ValueError):
458+
q.get(timeout=-1)
459+
460+
def test_order(self):
461+
N = 3
462+
q = self.q
463+
inputs = list(range(100))
464+
results = self.run_threads(N, 1, q, inputs, self.feed, self.consume)
465+
466+
# One consumer => results appended in well-defined order
467+
self.assertEqual(results, inputs)
468+
469+
def test_many_threads(self):
470+
N = 50
471+
q = self.q
472+
inputs = list(range(10000))
473+
results = self.run_threads(N, N, q, inputs, self.feed, self.consume)
474+
475+
# Multiple consumers without synchronization append the
476+
# results in random order
477+
self.assertEqual(sorted(results), inputs)
478+
479+
def test_many_threads_nonblock(self):
480+
N = 50
481+
q = self.q
482+
inputs = list(range(10000))
483+
results = self.run_threads(N, N, q, inputs,
484+
self.feed, self.consume_nonblock)
485+
486+
self.assertEqual(sorted(results), inputs)
487+
488+
def test_many_threads_timeout(self):
489+
N = 50
490+
q = self.q
491+
inputs = list(range(10000))
492+
results = self.run_threads(N, N, q, inputs,
493+
self.feed, self.consume_timeout)
494+
495+
self.assertEqual(sorted(results), inputs)
496+
497+
357498
if __name__ == "__main__":
358499
unittest.main()

0 commit comments

Comments
 (0)