Skip to content

bpo-14976: Reentrant simple queue #3346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Jan 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9c3431b
Add queue.SimpleQueue type
pitrou Sep 3, 2017
5d2f5cc
Add C version of SimpleQueue
pitrou Sep 4, 2017
d3e890c
Docstrings and comments
pitrou Sep 5, 2017
c0a087a
Simplify some code away.
pitrou Sep 5, 2017
fa67c79
Fix expectation in test
pitrou Sep 5, 2017
a58acf0
Updated generated files
pitrou Sep 5, 2017
0f86f5c
Try to support --without-threads
pitrou Sep 5, 2017
3d9c709
Fix EINTR-handling logic
pitrou Sep 5, 2017
5620970
Merge branch 'master' of https://github.com/python/cpython into simpl…
pitrou Sep 5, 2017
a76c041
Add qsize(), empty() and __sizeof__()
pitrou Sep 5, 2017
bc07e44
Add optional block and timeout args to put(), for compatibility.
pitrou Sep 5, 2017
f8fe713
Add put_nowait() and get_nowait() compatibility methods
pitrou Sep 5, 2017
3e1b27d
Add reentrancy tests
pitrou Sep 6, 2017
a250d56
Merge branch 'master' of https://github.com/python/cpython into simpl…
pitrou Sep 6, 2017
3d761a8
Merge branch 'master' of https://github.com/python/cpython into simpl…
pitrou Sep 6, 2017
fdb5379
Try to add _queue module to Visual Studio build files
pitrou Sep 6, 2017
f1d731d
Try to fix compilation on MSVC
pitrou Sep 6, 2017
6f66a52
Change import idiom (and fix __all__)
pitrou Sep 6, 2017
8847e03
Add a test that references to items are not kept longer than they should
pitrou Sep 7, 2017
1773aff
Add docs
pitrou Sep 7, 2017
57e6675
Merge branch 'master' into simple_queue
pitrou Sep 7, 2017
4041847
Add NEWS blurb
pitrou Sep 7, 2017
16cbd05
Merge branch 'master' of https://github.com/python/cpython into simpl…
pitrou Dec 19, 2017
c75ccf6
Remove obsolete WITH_THREAD guard
pitrou Dec 19, 2017
c3aeb82
Merge branch 'master' into simple_queue
pitrou Jan 15, 2018
c963121
Update clinic file
pitrou Jan 15, 2018
3337e24
Address review comments
pitrou Jan 15, 2018
3646e99
Use Argument Clinic for __new__
pitrou Jan 15, 2018
0d03ff5
Remove __sizeof__ implementation as suggested by Serhiy
pitrou Jan 15, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions Doc/library/queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ the first retrieved (operating like a stack). With a priority queue,
the entries are kept sorted (using the :mod:`heapq` module) and the
lowest valued entry is retrieved first.

Internally, the module uses locks to temporarily block competing threads;
however, it is not designed to handle reentrancy within a thread.
Internally, those three types of queues use locks to temporarily block
competing threads; however, they are not designed to handle reentrancy
within a thread.

In addition, the module implements a "simple"
:abbr:`FIFO (first-in, first-out)` queue type where
specific implementations can provide additional guarantees
in exchange for the smaller functionality.

The :mod:`queue` module defines the following classes and exceptions:

Expand Down Expand Up @@ -67,6 +73,14 @@ The :mod:`queue` module defines the following classes and exceptions:
priority: int
item: Any=field(compare=False)

.. class:: SimpleQueue()

Constructor for an unbounded :abbr:`FIFO (first-in, first-out)` queue.
Simple queues lack advanced functionality such as task tracking.

.. versionadded:: 3.7


.. exception:: Empty

Exception raised when non-blocking :meth:`~Queue.get` (or
Expand Down Expand Up @@ -201,6 +215,60 @@ Example of how to wait for enqueued tasks to be completed::
t.join()


SimpleQueue Objects
-------------------

:class:`SimpleQueue` objects provide the public methods described below.

.. method:: SimpleQueue.qsize()

Return the approximate size of the queue. Note, qsize() > 0 doesn't
guarantee that a subsequent get() will not block.


.. method:: SimpleQueue.empty()

Return ``True`` if the queue is empty, ``False`` otherwise. If empty()
returns ``False`` it doesn't guarantee that a subsequent call to get()
will not block.


.. method:: SimpleQueue.put(item, block=True, timeout=None)

Put *item* into the queue. The method never blocks and always succeeds
(except for potential low-level errors such as failure to allocate memory).
The optional args *block* and *timeout* are ignored and only provided
for compatibility with :meth:`Queue.put`.

.. impl-detail::
This method has a C implementation which is reentrant. That is, a
``put()`` or ``get()`` call can be interrupted by another ``put()``
call in the same thread without deadlocking or corrupting internal
state inside the queue. This makes it appropriate for use in
destructors such as ``__del__`` methods or :mod:`weakref` callbacks.


.. method:: SimpleQueue.put_nowait(item)

Equivalent to ``put(item)``, provided for compatibility with
:meth:`Queue.put_nowait`.


.. method:: SimpleQueue.get(block=True, timeout=None)

Remove and return an item from the queue. If optional args *block* is true and
*timeout* is ``None`` (the default), block if necessary until an item is available.
If *timeout* is a positive number, it blocks at most *timeout* seconds and
raises the :exc:`Empty` exception if no item was available within that time.
Otherwise (*block* is false), return an item if one is immediately available,
else raise the :exc:`Empty` exception (*timeout* is ignored in that case).


.. method:: SimpleQueue.get_nowait()

Equivalent to ``get(False)``.


.. seealso::

Class :class:`multiprocessing.Queue`
Expand All @@ -210,4 +278,3 @@ Example of how to wait for enqueued tasks to be completed::
:class:`collections.deque` is an alternative implementation of unbounded
queues with fast atomic :meth:`~collections.deque.append` and
:meth:`~collections.deque.popleft` operations that do not require locking.

86 changes: 82 additions & 4 deletions Lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,26 @@
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time
try:
from _queue import SimpleQueue
except ImportError:
SimpleQueue = None

__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']

class Empty(Exception):
'Exception raised by Queue.get(block=0)/get_nowait().'
pass

try:
from _queue import Empty
except AttributeError:
class Empty(Exception):
'Exception raised by Queue.get(block=0)/get_nowait().'
pass

class Full(Exception):
'Exception raised by Queue.put(block=0)/put_nowait().'
pass


class Queue:
'''Create a queue object with a given maximum size.

Expand Down Expand Up @@ -241,3 +250,72 @@ def _put(self, item):

def _get(self):
return self.queue.pop()


class _PySimpleQueue:
'''Simple, unbounded FIFO queue.

This pure Python implementation is not reentrant.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of the pure python version existing at all of it's raison d'être isn't possible? Even on PyPy if they took this module and haven't implemented a _queue equivalent it would do the wrong thing for code that wants to use SimpleQueue for reentrant API reasons.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know its presence is debattable, but providing a pure Python version is current policy. Besides, being reentrant is documented as an implementation detail, not an intrinsic property of the API.

'''
# Note: while this pure Python version provides fairness
# (by using a threading.Semaphore which is itself fair, being based
# on threading.Condition), fairness is not part of the API contract.
# This allows the C version to use a different implementation.

def __init__(self):
self._queue = deque()
self._count = threading.Semaphore(0)

def put(self, item, block=True, timeout=None):
'''Put the item on the queue.

The optional 'block' and 'timeout' arguments are ignored, as this method
never blocks. They are provided for compatibility with the Queue class.
'''
self._queue.append(item)
self._count.release()

def get(self, block=True, timeout=None):
'''Remove and return an item from the queue.

If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
if not self._count.acquire(block, timeout):
raise Empty
return self._queue.popleft()

def put_nowait(self, item):
'''Put an item into the queue without blocking.

This is exactly equivalent to `put(item)` and is only provided
for compatibility with the Queue class.
'''
return self.put(item, block=False)

def get_nowait(self):
'''Remove and return an item from the queue without blocking.

Only get an item if one is immediately available. Otherwise
raise the Empty exception.
'''
return self.get(block=False)

def empty(self):
'''Return True if the queue is empty, False otherwise (not reliable!).'''
return len(self._queue) == 0

def qsize(self):
'''Return the approximate size of the queue (not reliable!).'''
return len(self._queue)


if SimpleQueue is None:
SimpleQueue = _PySimpleQueue
Loading