|
8 | 8 |
|
9 | 9 | import pytest
|
10 | 10 |
|
| 11 | +import redis.cluster |
11 | 12 | from redis import Redis
|
12 | 13 | from redis.backoff import (
|
13 | 14 | ConstantBackoff,
|
@@ -2970,6 +2971,33 @@ def raise_ask_error():
|
2970 | 2971 | assert ask_node.redis_connection.connection.read_response.called
|
2971 | 2972 | assert res == ["MOCK_OK"]
|
2972 | 2973 |
|
| 2974 | + @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) |
| 2975 | + def test_return_previous_acquired_connections(self, r, error): |
| 2976 | + # in order to ensure that a pipeline will make use of connections |
| 2977 | + # from different nodes |
| 2978 | + assert r.keyslot("a") != r.keyslot("b") |
| 2979 | + |
| 2980 | + orig_func = redis.cluster.get_connection |
| 2981 | + with patch("redis.cluster.get_connection") as get_connection: |
| 2982 | + |
| 2983 | + def raise_error(target_node, *args, **kwargs): |
| 2984 | + if get_connection.call_count == 2: |
| 2985 | + raise error("mocked error") |
| 2986 | + else: |
| 2987 | + return orig_func(target_node, *args, **kwargs) |
| 2988 | + |
| 2989 | + get_connection.side_effect = raise_error |
| 2990 | + |
| 2991 | + r.pipeline().get("a").get("b").execute() |
| 2992 | + |
| 2993 | + # there should have been two get_connections per execution and |
| 2994 | + # two executions due to exception raised in the first execution |
| 2995 | + assert get_connection.call_count == 4 |
| 2996 | + for cluster_node in r.nodes_manager.nodes_cache.values(): |
| 2997 | + connection_pool = cluster_node.redis_connection.connection_pool |
| 2998 | + num_of_conns = len(connection_pool._available_connections) |
| 2999 | + assert num_of_conns == connection_pool._created_connections |
| 3000 | + |
2973 | 3001 | def test_empty_stack(self, r):
|
2974 | 3002 | """
|
2975 | 3003 | If pipeline is executed with no commands it should
|
|
0 commit comments