Skip to content

Commit 87cf220

Browse files
committed
Issue #11743: Rewrite multiprocessing connection classes in pure Python.
1 parent df77e3d commit 87cf220

File tree

15 files changed

+490
-983
lines changed

15 files changed

+490
-983
lines changed

Lib/multiprocessing/connection.py

Lines changed: 300 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,27 @@
3434

3535
__all__ = [ 'Client', 'Listener', 'Pipe' ]
3636

37+
import io
3738
import os
3839
import sys
40+
import pickle
41+
import select
3942
import socket
43+
import struct
4044
import errno
4145
import time
4246
import tempfile
4347
import itertools
4448

4549
import _multiprocessing
46-
from multiprocessing import current_process, AuthenticationError
50+
from multiprocessing import current_process, AuthenticationError, BufferTooShort
4751
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
48-
from multiprocessing.forking import duplicate, close
49-
52+
try:
53+
from _multiprocessing import win32
54+
except ImportError:
55+
if sys.platform == 'win32':
56+
raise
57+
win32 = None
5058

5159
#
5260
#
@@ -110,6 +118,281 @@ def address_type(address):
110118
else:
111119
raise ValueError('address type of %r unrecognized' % address)
112120

121+
#
122+
# Connection classes
123+
#
124+
125+
class _ConnectionBase:
126+
_handle = None
127+
128+
def __init__(self, handle, readable=True, writable=True):
129+
handle = handle.__index__()
130+
if handle < 0:
131+
raise ValueError("invalid handle")
132+
if not readable and not writable:
133+
raise ValueError(
134+
"at least one of `readable` and `writable` must be True")
135+
self._handle = handle
136+
self._readable = readable
137+
self._writable = writable
138+
139+
def __del__(self):
140+
if self._handle is not None:
141+
self._close()
142+
143+
def _check_closed(self):
144+
if self._handle is None:
145+
raise IOError("handle is closed")
146+
147+
def _check_readable(self):
148+
if not self._readable:
149+
raise IOError("connection is write-only")
150+
151+
def _check_writable(self):
152+
if not self._writable:
153+
raise IOError("connection is read-only")
154+
155+
def _bad_message_length(self):
156+
if self._writable:
157+
self._readable = False
158+
else:
159+
self.close()
160+
raise IOError("bad message length")
161+
162+
@property
163+
def closed(self):
164+
"""True if the connection is closed"""
165+
return self._handle is None
166+
167+
@property
168+
def readable(self):
169+
"""True if the connection is readable"""
170+
return self._readable
171+
172+
@property
173+
def writable(self):
174+
"""True if the connection is writable"""
175+
return self._writable
176+
177+
def fileno(self):
178+
"""File descriptor or handle of the connection"""
179+
self._check_closed()
180+
return self._handle
181+
182+
def close(self):
183+
"""Close the connection"""
184+
if self._handle is not None:
185+
try:
186+
self._close()
187+
finally:
188+
self._handle = None
189+
190+
def send_bytes(self, buf, offset=0, size=None):
191+
"""Send the bytes data from a bytes-like object"""
192+
self._check_closed()
193+
self._check_writable()
194+
m = memoryview(buf)
195+
# HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
196+
if m.itemsize > 1:
197+
m = memoryview(bytes(m))
198+
n = len(m)
199+
if offset < 0:
200+
raise ValueError("offset is negative")
201+
if n < offset:
202+
raise ValueError("buffer length < offset")
203+
if size is None:
204+
size = n - offset
205+
elif size < 0:
206+
raise ValueError("size is negative")
207+
elif offset + size > n:
208+
raise ValueError("buffer length < offset + size")
209+
self._send_bytes(m[offset:offset + size])
210+
211+
def send(self, obj):
212+
"""Send a (picklable) object"""
213+
self._check_closed()
214+
self._check_writable()
215+
buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
216+
self._send_bytes(memoryview(buf))
217+
218+
def recv_bytes(self, maxlength=None):
219+
"""
220+
Receive bytes data as a bytes object.
221+
"""
222+
self._check_closed()
223+
self._check_readable()
224+
if maxlength is not None and maxlength < 0:
225+
raise ValueError("negative maxlength")
226+
buf = self._recv_bytes(maxlength)
227+
if buf is None:
228+
self._bad_message_length()
229+
return buf.getvalue()
230+
231+
def recv_bytes_into(self, buf, offset=0):
232+
"""
233+
Receive bytes data into a writeable buffer-like object.
234+
Return the number of bytes read.
235+
"""
236+
self._check_closed()
237+
self._check_readable()
238+
with memoryview(buf) as m:
239+
# Get bytesize of arbitrary buffer
240+
itemsize = m.itemsize
241+
bytesize = itemsize * len(m)
242+
if offset < 0:
243+
raise ValueError("negative offset")
244+
elif offset > bytesize:
245+
raise ValueError("offset too large")
246+
result = self._recv_bytes()
247+
size = result.tell()
248+
if bytesize < offset + size:
249+
raise BufferTooShort(result.getvalue())
250+
# Message can fit in dest
251+
result.seek(0)
252+
result.readinto(m[offset // itemsize :
253+
(offset + size) // itemsize])
254+
return size
255+
256+
def recv(self):
257+
"""Receive a (picklable) object"""
258+
self._check_closed()
259+
self._check_readable()
260+
buf = self._recv_bytes()
261+
return pickle.loads(buf.getbuffer())
262+
263+
def poll(self, timeout=0.0):
264+
"""Whether there is any input available to be read"""
265+
self._check_closed()
266+
self._check_readable()
267+
if timeout < 0.0:
268+
timeout = None
269+
return self._poll(timeout)
270+
271+
272+
if win32:
273+
274+
class PipeConnection(_ConnectionBase):
275+
"""
276+
Connection class based on a Windows named pipe.
277+
"""
278+
279+
def _close(self):
280+
win32.CloseHandle(self._handle)
281+
282+
def _send_bytes(self, buf):
283+
nwritten = win32.WriteFile(self._handle, buf)
284+
assert nwritten == len(buf)
285+
286+
def _recv_bytes(self, maxsize=None):
287+
buf = io.BytesIO()
288+
bufsize = 512
289+
if maxsize is not None:
290+
bufsize = min(bufsize, maxsize)
291+
try:
292+
firstchunk, complete = win32.ReadFile(self._handle, bufsize)
293+
except IOError as e:
294+
if e.errno == win32.ERROR_BROKEN_PIPE:
295+
raise EOFError
296+
raise
297+
lenfirstchunk = len(firstchunk)
298+
buf.write(firstchunk)
299+
if complete:
300+
return buf
301+
navail, nleft = win32.PeekNamedPipe(self._handle)
302+
if maxsize is not None and lenfirstchunk + nleft > maxsize:
303+
return None
304+
lastchunk, complete = win32.ReadFile(self._handle, nleft)
305+
assert complete
306+
buf.write(lastchunk)
307+
return buf
308+
309+
def _poll(self, timeout):
310+
navail, nleft = win32.PeekNamedPipe(self._handle)
311+
if navail > 0:
312+
return True
313+
elif timeout == 0.0:
314+
return False
315+
# Setup a polling loop (translated straight from old
316+
# pipe_connection.c)
317+
if timeout < 0.0:
318+
deadline = None
319+
else:
320+
deadline = time.time() + timeout
321+
delay = 0.001
322+
max_delay = 0.02
323+
while True:
324+
time.sleep(delay)
325+
navail, nleft = win32.PeekNamedPipe(self._handle)
326+
if navail > 0:
327+
return True
328+
if deadline and time.time() > deadline:
329+
return False
330+
if delay < max_delay:
331+
delay += 0.001
332+
333+
334+
class Connection(_ConnectionBase):
335+
"""
336+
Connection class based on an arbitrary file descriptor (Unix only), or
337+
a socket handle (Windows).
338+
"""
339+
340+
if win32:
341+
def _close(self):
342+
win32.closesocket(self._handle)
343+
_write = win32.send
344+
_read = win32.recv
345+
else:
346+
def _close(self):
347+
os.close(self._handle)
348+
_write = os.write
349+
_read = os.read
350+
351+
def _send(self, buf, write=_write):
352+
remaining = len(buf)
353+
while True:
354+
n = write(self._handle, buf)
355+
remaining -= n
356+
if remaining == 0:
357+
break
358+
buf = buf[n:]
359+
360+
def _recv(self, size, read=_read):
361+
buf = io.BytesIO()
362+
remaining = size
363+
while remaining > 0:
364+
chunk = read(self._handle, remaining)
365+
n = len(chunk)
366+
if n == 0:
367+
if remaining == size:
368+
raise EOFError
369+
else:
370+
raise IOError("got end of file during message")
371+
buf.write(chunk)
372+
remaining -= n
373+
return buf
374+
375+
def _send_bytes(self, buf):
376+
# For wire compatibility with 3.2 and lower
377+
n = len(buf)
378+
self._send(struct.pack("=i", len(buf)))
379+
# The condition is necessary to avoid "broken pipe" errors
380+
# when sending a 0-length buffer if the other end closed the pipe.
381+
if n > 0:
382+
self._send(buf)
383+
384+
def _recv_bytes(self, maxsize=None):
385+
buf = self._recv(4)
386+
size, = struct.unpack("=i", buf.getvalue())
387+
if maxsize is not None and size > maxsize:
388+
return None
389+
return self._recv(size)
390+
391+
def _poll(self, timeout):
392+
r = select.select([self._handle], [], [], timeout)[0]
393+
return bool(r)
394+
395+
113396
#
114397
# Public functions
115398
#
@@ -186,21 +469,19 @@ def Pipe(duplex=True):
186469
'''
187470
if duplex:
188471
s1, s2 = socket.socketpair()
189-
c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
190-
c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
472+
c1 = Connection(os.dup(s1.fileno()))
473+
c2 = Connection(os.dup(s2.fileno()))
191474
s1.close()
192475
s2.close()
193476
else:
194477
fd1, fd2 = os.pipe()
195-
c1 = _multiprocessing.Connection(fd1, writable=False)
196-
c2 = _multiprocessing.Connection(fd2, readable=False)
478+
c1 = Connection(fd1, writable=False)
479+
c2 = Connection(fd2, readable=False)
197480

198481
return c1, c2
199482

200483
else:
201484

202-
from _multiprocessing import win32
203-
204485
def Pipe(duplex=True):
205486
'''
206487
Returns pair of connection objects at either end of a pipe
@@ -234,8 +515,8 @@ def Pipe(duplex=True):
234515
if e.args[0] != win32.ERROR_PIPE_CONNECTED:
235516
raise
236517

237-
c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
238-
c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
518+
c1 = PipeConnection(h1, writable=duplex)
519+
c2 = PipeConnection(h2, readable=duplex)
239520

240521
return c1, c2
241522

@@ -266,7 +547,7 @@ def __init__(self, address, family, backlog=1):
266547
def accept(self):
267548
s, self._last_accepted = self._socket.accept()
268549
fd = duplicate(s.fileno())
269-
conn = _multiprocessing.Connection(fd)
550+
conn = Connection(fd)
270551
s.close()
271552
return conn
272553

@@ -298,7 +579,7 @@ def SocketClient(address):
298579
raise
299580

300581
fd = duplicate(s.fileno())
301-
conn = _multiprocessing.Connection(fd)
582+
conn = Connection(fd)
302583
return conn
303584

304585
#
@@ -345,7 +626,7 @@ def accept(self):
345626
except WindowsError as e:
346627
if e.args[0] != win32.ERROR_PIPE_CONNECTED:
347628
raise
348-
return _multiprocessing.PipeConnection(handle)
629+
return PipeConnection(handle)
349630

350631
@staticmethod
351632
def _finalize_pipe_listener(queue, address):
@@ -377,7 +658,7 @@ def PipeClient(address):
377658
win32.SetNamedPipeHandleState(
378659
h, win32.PIPE_READMODE_MESSAGE, None, None
379660
)
380-
return _multiprocessing.PipeConnection(h)
661+
return PipeConnection(h)
381662

382663
#
383664
# Authentication stuff
@@ -451,3 +732,7 @@ def XmlClient(*args, **kwds):
451732
global xmlrpclib
452733
import xmlrpc.client as xmlrpclib
453734
return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
735+
736+
737+
# Late import because of circular import
738+
from multiprocessing.forking import duplicate, close

0 commit comments

Comments
 (0)