|
8 | 8 |
|
9 | 9 | import pytest
|
10 | 10 |
|
| 11 | +import redis.cluster |
11 | 12 | from redis import Redis
|
12 | 13 | from redis.backoff import (
|
13 | 14 | ConstantBackoff,
|
@@ -2925,6 +2926,33 @@ def raise_ask_error():
|
2925 | 2926 | assert ask_node.redis_connection.connection.read_response.called
|
2926 | 2927 | assert res == ["MOCK_OK"]
|
2927 | 2928 |
|
| 2929 | + @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) |
| 2930 | + def test_return_previous_acquired_connections(self, r, error): |
| 2931 | + # in order to ensure that a pipeline will make use of connections |
| 2932 | + # from different nodes |
| 2933 | + assert r.keyslot("a") != r.keyslot("b") |
| 2934 | + |
| 2935 | + orig_func = redis.cluster.get_connection |
| 2936 | + with patch("redis.cluster.get_connection") as get_connection: |
| 2937 | + |
| 2938 | + def raise_error(target_node, *args, **kwargs): |
| 2939 | + if get_connection.call_count == 2: |
| 2940 | + raise error("mocked error") |
| 2941 | + else: |
| 2942 | + return orig_func(target_node, *args, **kwargs) |
| 2943 | + |
| 2944 | + get_connection.side_effect = raise_error |
| 2945 | + |
| 2946 | + r.pipeline().get("a").get("b").execute() |
| 2947 | + |
| 2948 | + # there should have been two get_connections per execution and |
| 2949 | + # two executions due to exception raised in the first execution |
| 2950 | + assert get_connection.call_count == 4 |
| 2951 | + for cluster_node in r.nodes_manager.nodes_cache.values(): |
| 2952 | + connection_pool = cluster_node.redis_connection.connection_pool |
| 2953 | + num_of_conns = len(connection_pool._available_connections) |
| 2954 | + assert num_of_conns == connection_pool._created_connections |
| 2955 | + |
2928 | 2956 | def test_empty_stack(self, r):
|
2929 | 2957 | """
|
2930 | 2958 | If pipeline is executed with no commands it should
|
|
0 commit comments