Skip to content

UnitTest for blocking PubSub auto-reconnect #2256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import platform
import queue
import socket
import threading
import time
from unittest import mock
Expand Down Expand Up @@ -608,3 +610,121 @@ def test_pubsub_deadlock(self, master_host):
p = r.pubsub()
p.subscribe("my-channel-1", "my-channel-2")
pool.reset()


@pytest.mark.timeout(5, method="thread")
@pytest.mark.onlynoncluster
class TestPubSubAutoReconnect:
def test_reconnect_socket_error(self, r: redis.Redis):
"""
Test that a socket error will cause reconnect
"""
self.messages = queue.Queue()
self.pubsub = r.pubsub()
self.state = 0
self.cond = threading.Condition()

thread = threading.Thread(target=self.loop)
thread.start()
# get the initial connect message
message = self.messages.get(timeout=1)
assert message == {
"channel": b"foo",
"data": 1,
"pattern": None,
"type": "subscribe",
}
# now, disconnect the connection, and wait for it to be re-established
with self.cond:
self.state = 1
with patch("socket.socket.recv") as mock:
mock.side_effect = socket.error
# wait until thread noticies the disconnect until we undo the patch
self.cond.wait_for(lambda: self.state >= 2)
assert (
self.pubsub.connection._sock is None
) # it is in a disconnecte state
# wait for reconnect
self.cond.wait_for(lambda: self.pubsub.connection._sock is not None)
assert self.state == 3

message = self.messages.get(timeout=1)
assert message == {
"channel": b"foo",
"data": 1,
"pattern": None,
"type": "subscribe",
}
# kill thread
with self.cond:
self.state = 4 # quit
thread.join()

def test_reconnect_disconnect(self, r: redis.Redis):
"""
Test that a socket error will cause reconnect
"""
self.messages = queue.Queue()
self.pubsub = r.pubsub()
self.state = 0
self.cond = threading.Condition()

thread = threading.Thread(target=self.loop)
thread.start()
# get the initial connect message
message = self.messages.get(timeout=1)
assert message == {
"channel": b"foo",
"data": 1,
"pattern": None,
"type": "subscribe",
}
# now, disconnect the connection, and wait for it to be re-established
with self.cond:
self.state = 1
self.pubsub.connection.disconnect()
assert self.pubsub.connection._sock is None # it is in a disconnecte state
# wait for reconnect
self.cond.wait_for(lambda: self.pubsub.connection._sock is not None)
assert self.state == 3

message = self.messages.get(timeout=1)
assert message == {
"channel": b"foo",
"data": 1,
"pattern": None,
"type": "subscribe",
}
# kill thread
with self.cond:
self.state = 4 # quit
thread.join()

def loop(self):
# must make sure the task exits
self.pubsub.subscribe("foo")
while True:
time.sleep(0.01) # give main thread chance to get lock
with self.cond:
try:
if self.state == 1:
self.state = 2
elif self.state == 4:
break
# print ('state, %s, sock %s' % (state, pubsub.connection._sock))
self.loop_step(0.1)
if self.state == 2:
self.state = 3 # successful reconnect
except redis.ConnectionError:
pass # we will reconnect

finally:
self.cond.notify()

def loop_step(self, timeout):
# get a single message via listen()
message = self.pubsub.get_message(timeout=timeout)
if message is not None:
self.messages.put(message)
return True
return False