|
1 | 1 | import binascii
|
2 | 2 | import datetime
|
| 3 | +import random |
3 | 4 | import uuid
|
4 | 5 | import warnings
|
5 | 6 | from time import sleep
|
| 7 | +from unittest import mock |
6 | 8 | from unittest.mock import DEFAULT, Mock, call, patch
|
7 | 9 |
|
8 | 10 | import pytest
|
|
20 | 22 | REDIS_CLUSTER_HASH_SLOTS,
|
21 | 23 | REPLICA,
|
22 | 24 | ClusterNode,
|
| 25 | + LoadBalancer, |
23 | 26 | NodesManager,
|
24 | 27 | RedisCluster,
|
25 | 28 | get_node_name,
|
@@ -810,7 +813,7 @@ def raise_error(target_node, *args, **kwargs):
|
810 | 813 | rc = get_mocked_redis_client(
|
811 | 814 | host=default_host,
|
812 | 815 | port=default_port,
|
813 |
| - retry=Retry(ConstantBackoff(1), 3), |
| 816 | + retry=Retry(ConstantBackoff(1), 10), |
814 | 817 | )
|
815 | 818 |
|
816 | 819 | with pytest.raises(error):
|
@@ -2519,6 +2522,37 @@ def test_connection_pool_class(self, connection_pool_class):
|
2519 | 2522 | node.redis_connection.connection_pool, connection_pool_class
|
2520 | 2523 | )
|
2521 | 2524 |
|
| 2525 | + @pytest.mark.parametrize("invalid_index", [-10, 10]) |
| 2526 | + def test_return_primary_if_invalid_node_index_is_returned(self, invalid_index): |
| 2527 | + rc = get_mocked_redis_client( |
| 2528 | + url="redis://[email protected]:7000", |
| 2529 | + cluster_slots=default_cluster_slots, |
| 2530 | + ) |
| 2531 | + random_slot = random.randint( |
| 2532 | + default_cluster_slots[0][0], default_cluster_slots[0][1] |
| 2533 | + ) |
| 2534 | + |
| 2535 | + ports = set() |
| 2536 | + for _ in range(0, 10): |
| 2537 | + ports.add( |
| 2538 | + rc.nodes_manager.get_node_from_slot( |
| 2539 | + random_slot, read_from_replicas=True |
| 2540 | + ).port |
| 2541 | + ) |
| 2542 | + assert ports == {default_port, 7003} |
| 2543 | + |
| 2544 | + ports = set() |
| 2545 | + with mock.patch.object( |
| 2546 | + LoadBalancer, "get_server_index", return_value=invalid_index |
| 2547 | + ): |
| 2548 | + for _ in range(0, 10): |
| 2549 | + ports.add( |
| 2550 | + rc.nodes_manager.get_node_from_slot( |
| 2551 | + random_slot, read_from_replicas=True |
| 2552 | + ).port |
| 2553 | + ) |
| 2554 | + assert ports == {default_port} |
| 2555 | + |
2522 | 2556 |
|
2523 | 2557 | @pytest.mark.onlycluster
|
2524 | 2558 | class TestClusterPubSubObject:
|
@@ -2930,6 +2964,33 @@ def test_empty_stack(self, r):
|
2930 | 2964 | result = p.execute()
|
2931 | 2965 | assert result == []
|
2932 | 2966 |
|
| 2967 | + @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) |
| 2968 | + def test_additional_backoff_cluster_pipeline(self, r, error): |
| 2969 | + with patch.object(ConstantBackoff, "compute") as compute: |
| 2970 | + |
| 2971 | + def _compute(target_node, *args, **kwargs): |
| 2972 | + return 1 |
| 2973 | + |
| 2974 | + compute.side_effect = _compute |
| 2975 | + with patch("redis.cluster.get_connection") as get_connection: |
| 2976 | + |
| 2977 | + def raise_error(target_node, *args, **kwargs): |
| 2978 | + get_connection.failed_calls += 1 |
| 2979 | + raise error("mocked error") |
| 2980 | + |
| 2981 | + get_connection.side_effect = raise_error |
| 2982 | + |
| 2983 | + r.set_retry(Retry(ConstantBackoff(1), 10)) |
| 2984 | + pipeline = r.pipeline() |
| 2985 | + |
| 2986 | + with pytest.raises(error): |
| 2987 | + pipeline.get("bar") |
| 2988 | + pipeline.get("bar") |
| 2989 | + pipeline.execute() |
| 2990 | + # cluster pipeline does one more back off than a single Redis command |
| 2991 | + # this is not required, but it's just how it's implemented as of now |
| 2992 | + assert compute.call_count == r.cluster_error_retry_attempts + 1 |
| 2993 | + |
2933 | 2994 |
|
2934 | 2995 | @pytest.mark.onlycluster
|
2935 | 2996 | class TestReadOnlyPipeline:
|
|
0 commit comments