Skip to content

Commit f9d15f9

Browse files
authored
Fix for non-blocking sendall (#378)
* Removed BufferedSocket and non-blocking func * FIx for py35
1 parent f1b34a8 commit f9d15f9

File tree

5 files changed

+13
-192
lines changed

5 files changed

+13
-192
lines changed

neo4j/io/_bolt3.py

Lines changed: 1 addition & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config):
8282
self.socket = sock
8383
self.server = ServerInfo(Address(sock.getpeername()), Bolt3.PROTOCOL_VERSION)
8484
self.outbox = Outbox()
85-
self.inbox = Inbox(BufferedSocket(self.socket, 32768), on_error=self._set_defunct)
85+
self.inbox = Inbox(self.socket, on_error=self._set_defunct)
8686
self.packer = Packer(self.outbox)
8787
self.unpacker = Unpacker(self.inbox)
8888
self.responses = deque()
@@ -463,96 +463,6 @@ def view(self):
463463
return memoryview(self._data[:end])
464464

465465

466-
class BufferedSocket:
467-
""" Wrapper for a regular socket, with an added a dynamically-resizing
468-
receive buffer to reduce the number of calls to recv.
469-
470-
NOTE: not all socket methods are implemented yet
471-
"""
472-
473-
def __init__(self, socket_, initial_capacity=0):
474-
self.socket = socket_
475-
self.buffer = bytearray(initial_capacity)
476-
self.r_pos = 0
477-
self.w_pos = 0
478-
479-
def _fill_buffer(self, min_bytes):
480-
""" Fill the buffer with at least `min_bytes` bytes, requesting more if
481-
the buffer has space. Internally, this method attempts to do as little
482-
allocation as possible and make as few calls to socket.recv as
483-
possible.
484-
"""
485-
# First, we need to calculate how much spare space exists between the
486-
# write cursor and the end of the buffer.
487-
space_at_end = len(self.buffer) - self.w_pos
488-
if min_bytes <= space_at_end:
489-
# If there's at least enough here for the minimum number of bytes
490-
# we need, then do nothing
491-
#
492-
pass
493-
elif min_bytes <= space_at_end + self.r_pos:
494-
# If the buffer contains enough space, but it's split between the
495-
# end of the buffer and recyclable space at the start of the
496-
# buffer, then recycle that space by pushing the remaining data
497-
# towards the front.
498-
#
499-
# print("Recycling {} bytes".format(self.r_pos))
500-
size = self.w_pos - self.r_pos
501-
view = memoryview(self.buffer)
502-
self.buffer[0:size] = view[self.r_pos:self.w_pos]
503-
self.r_pos = 0
504-
self.w_pos = size
505-
else:
506-
# Otherwise, there's just not enough space whichever way you shake
507-
# it. So, rebuild the buffer from scratch, taking the unread data
508-
# and appending empty space big enough to hold the minimum number
509-
# of bytes we're looking for.
510-
#
511-
# print("Rebuilding buffer from {} bytes ({} used) to "
512-
# "{} bytes".format(len(self.buffer),
513-
# self.w_pos - self.r_pos,
514-
# self.w_pos - self.r_pos + min_bytes))
515-
self.buffer = (self.buffer[self.r_pos:self.w_pos] +
516-
bytearray(min_bytes))
517-
self.w_pos -= self.r_pos
518-
self.r_pos = 0
519-
min_end = self.w_pos + min_bytes
520-
end = len(self.buffer)
521-
view = memoryview(self.buffer)
522-
self.socket.setblocking(0)
523-
while self.w_pos < min_end:
524-
ready_to_read, _, _ = select([self.socket], [], [])
525-
subview = view[self.w_pos:end]
526-
n = self.socket.recv_into(subview, end - self.w_pos)
527-
if n == 0:
528-
raise OSError("No data")
529-
self.w_pos += n
530-
531-
def recv_into(self, buffer, n_bytes=0, flags=0):
532-
""" Intercepts a regular socket.recv_into call, taking data from the
533-
internal buffer, if available. If not enough data exists in the buffer,
534-
more will be retrieved first.
535-
536-
Unlike the lower-level call, this method will never return 0, instead
537-
raising an OSError if no data is returned on the underlying socket.
538-
539-
:param buffer:
540-
:param n_bytes:
541-
:param flags:
542-
:raises OSError:
543-
:return:
544-
"""
545-
available = self.w_pos - self.r_pos
546-
required = n_bytes - available
547-
if required > 0:
548-
self._fill_buffer(required)
549-
view = memoryview(self.buffer)
550-
end = self.r_pos + n_bytes
551-
buffer[:] = view[self.r_pos:end]
552-
self.r_pos = end
553-
return n_bytes
554-
555-
556466
class Inbox(MessageInbox):
557467

558468
def __next__(self):

neo4j/io/_bolt4x0.py

Lines changed: 1 addition & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config):
8282
self.socket = sock
8383
self.server = ServerInfo(Address(sock.getpeername()), Bolt4x0.PROTOCOL_VERSION)
8484
self.outbox = Outbox()
85-
self.inbox = Inbox(BufferedSocket(self.socket, 32768), on_error=self._set_defunct)
85+
self.inbox = Inbox(self.socket, on_error=self._set_defunct)
8686
self.packer = Packer(self.outbox)
8787
self.unpacker = Unpacker(self.inbox)
8888
self.responses = deque()
@@ -461,96 +461,6 @@ def view(self):
461461
return memoryview(self._data[:end])
462462

463463

464-
class BufferedSocket:
465-
""" Wrapper for a regular socket, with an added a dynamically-resizing
466-
receive buffer to reduce the number of calls to recv.
467-
468-
NOTE: not all socket methods are implemented yet
469-
"""
470-
471-
def __init__(self, socket_, initial_capacity=0):
472-
self.socket = socket_
473-
self.buffer = bytearray(initial_capacity)
474-
self.r_pos = 0
475-
self.w_pos = 0
476-
477-
def _fill_buffer(self, min_bytes):
478-
""" Fill the buffer with at least `min_bytes` bytes, requesting more if
479-
the buffer has space. Internally, this method attempts to do as little
480-
allocation as possible and make as few calls to socket.recv as
481-
possible.
482-
"""
483-
# First, we need to calculate how much spare space exists between the
484-
# write cursor and the end of the buffer.
485-
space_at_end = len(self.buffer) - self.w_pos
486-
if min_bytes <= space_at_end:
487-
# If there's at least enough here for the minimum number of bytes
488-
# we need, then do nothing
489-
#
490-
pass
491-
elif min_bytes <= space_at_end + self.r_pos:
492-
# If the buffer contains enough space, but it's split between the
493-
# end of the buffer and recyclable space at the start of the
494-
# buffer, then recycle that space by pushing the remaining data
495-
# towards the front.
496-
#
497-
# print("Recycling {} bytes".format(self.r_pos))
498-
size = self.w_pos - self.r_pos
499-
view = memoryview(self.buffer)
500-
self.buffer[0:size] = view[self.r_pos:self.w_pos]
501-
self.r_pos = 0
502-
self.w_pos = size
503-
else:
504-
# Otherwise, there's just not enough space whichever way you shake
505-
# it. So, rebuild the buffer from scratch, taking the unread data
506-
# and appending empty space big enough to hold the minimum number
507-
# of bytes we're looking for.
508-
#
509-
# print("Rebuilding buffer from {} bytes ({} used) to "
510-
# "{} bytes".format(len(self.buffer),
511-
# self.w_pos - self.r_pos,
512-
# self.w_pos - self.r_pos + min_bytes))
513-
self.buffer = (self.buffer[self.r_pos:self.w_pos] +
514-
bytearray(min_bytes))
515-
self.w_pos -= self.r_pos
516-
self.r_pos = 0
517-
min_end = self.w_pos + min_bytes
518-
end = len(self.buffer)
519-
view = memoryview(self.buffer)
520-
self.socket.setblocking(0)
521-
while self.w_pos < min_end:
522-
ready_to_read, _, _ = select([self.socket], [], [])
523-
subview = view[self.w_pos:end]
524-
n = self.socket.recv_into(subview, end - self.w_pos)
525-
if n == 0:
526-
raise OSError("No data")
527-
self.w_pos += n
528-
529-
def recv_into(self, buffer, n_bytes=0, flags=0):
530-
""" Intercepts a regular socket.recv_into call, taking data from the
531-
internal buffer, if available. If not enough data exists in the buffer,
532-
more will be retrieved first.
533-
534-
Unlike the lower-level call, this method will never return 0, instead
535-
raising an OSError if no data is returned on the underlying socket.
536-
537-
:param buffer:
538-
:param n_bytes:
539-
:param flags:
540-
:raises OSError:
541-
:return:
542-
"""
543-
available = self.w_pos - self.r_pos
544-
required = n_bytes - available
545-
if required > 0:
546-
self._fill_buffer(required)
547-
view = memoryview(self.buffer)
548-
end = self.r_pos + n_bytes
549-
buffer[:] = view[self.r_pos:end]
550-
self.r_pos = end
551-
return n_bytes
552-
553-
554464
class Inbox(MessageInbox):
555465

556466
def __next__(self):

tests/unit/io/conftest.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ def __init__(self, address):
3131
self.captured = b""
3232
self.messages = MessageInbox(self, on_error=print)
3333

34-
def setblocking(self, flag):
35-
pass
36-
3734
def getsockname(self):
3835
return "127.0.0.1", 0xFFFF
3936

tests/unit/io/test_direct.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ class FakeSocket:
4141
def __init__(self, address):
4242
self.address = address
4343

44-
def setblocking(self, flag):
45-
pass
46-
4744
def getpeername(self):
4845
return self.address
4946

tests/unit/test_addressing.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,14 @@ def test_address_resolve_with_custom_resolver():
236236
resolved = address.resolve(resolver=custom_resolver)
237237
assert isinstance(resolved, Address) is False
238238
assert isinstance(resolved, list) is True
239-
assert len(resolved) == 3
240-
assert resolved[0] == IPv4Address(('127.0.0.1', 7687))
241-
assert resolved[1] == IPv6Address(('::1', 1234, 0, 0))
242-
assert resolved[2] == IPv4Address(('127.0.0.1', 1234))
239+
if len(resolved) == 2:
240+
# IPv4 only
241+
assert resolved[0] == IPv4Address(('127.0.0.1', 7687))
242+
assert resolved[1] == IPv4Address(('127.0.0.1', 1234))
243+
elif len(resolved) == 3:
244+
# IPv4 and IPv6
245+
assert resolved[0] == IPv4Address(('127.0.0.1', 7687))
246+
assert resolved[1] == IPv6Address(('::1', 1234, 0, 0))
247+
assert resolved[2] == IPv4Address(('127.0.0.1', 1234))
248+
else:
249+
assert False

0 commit comments

Comments
 (0)