|
1 | 1 | import binascii
|
2 | 2 | import datetime
|
| 3 | +import random |
3 | 4 | import uuid
|
4 | 5 | import warnings
|
5 | 6 | from queue import LifoQueue, Queue
|
6 | 7 | from time import sleep
|
| 8 | +from unittest import mock |
7 | 9 | from unittest.mock import DEFAULT, Mock, call, patch
|
8 | 10 |
|
9 | 11 | import pytest
|
|
21 | 23 | REDIS_CLUSTER_HASH_SLOTS,
|
22 | 24 | REPLICA,
|
23 | 25 | ClusterNode,
|
| 26 | + LoadBalancer, |
24 | 27 | NodesManager,
|
25 | 28 | RedisCluster,
|
26 | 29 | get_node_name,
|
@@ -811,7 +814,7 @@ def raise_error(target_node, *args, **kwargs):
|
811 | 814 | rc = get_mocked_redis_client(
|
812 | 815 | host=default_host,
|
813 | 816 | port=default_port,
|
814 |
| - retry=Retry(ConstantBackoff(1), 3), |
| 817 | + retry=Retry(ConstantBackoff(1), 10), |
815 | 818 | )
|
816 | 819 |
|
817 | 820 | with pytest.raises(error):
|
@@ -2532,6 +2535,37 @@ def test_allow_custom_queue_class(self, queue_class):
|
2532 | 2535 | for node in rc.nodes_manager.nodes_cache.values():
|
2533 | 2536 | assert node.redis_connection.connection_pool.queue_class == queue_class
|
2534 | 2537 |
|
| 2538 | + @pytest.mark.parametrize("invalid_index", [-10, 10]) |
| 2539 | + def test_return_primary_if_invalid_node_index_is_returned(self, invalid_index): |
| 2540 | + rc = get_mocked_redis_client( |
| 2541 | + url="redis://[email protected]:7000", |
| 2542 | + cluster_slots=default_cluster_slots, |
| 2543 | + ) |
| 2544 | + random_slot = random.randint( |
| 2545 | + default_cluster_slots[0][0], default_cluster_slots[0][1] |
| 2546 | + ) |
| 2547 | + |
| 2548 | + ports = set() |
| 2549 | + for _ in range(0, 10): |
| 2550 | + ports.add( |
| 2551 | + rc.nodes_manager.get_node_from_slot( |
| 2552 | + random_slot, read_from_replicas=True |
| 2553 | + ).port |
| 2554 | + ) |
| 2555 | + assert ports == {default_port, 7003} |
| 2556 | + |
| 2557 | + ports = set() |
| 2558 | + with mock.patch.object( |
| 2559 | + LoadBalancer, "get_server_index", return_value=invalid_index |
| 2560 | + ): |
| 2561 | + for _ in range(0, 10): |
| 2562 | + ports.add( |
| 2563 | + rc.nodes_manager.get_node_from_slot( |
| 2564 | + random_slot, read_from_replicas=True |
| 2565 | + ).port |
| 2566 | + ) |
| 2567 | + assert ports == {default_port} |
| 2568 | + |
2535 | 2569 |
|
2536 | 2570 | @pytest.mark.onlycluster
|
2537 | 2571 | class TestClusterPubSubObject:
|
@@ -2962,6 +2996,33 @@ def test_empty_stack(self, r):
|
2962 | 2996 | result = p.execute()
|
2963 | 2997 | assert result == []
|
2964 | 2998 |
|
| 2999 | + @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) |
| 3000 | + def test_additional_backoff_cluster_pipeline(self, r, error): |
| 3001 | + with patch.object(ConstantBackoff, "compute") as compute: |
| 3002 | + |
| 3003 | + def _compute(target_node, *args, **kwargs): |
| 3004 | + return 1 |
| 3005 | + |
| 3006 | + compute.side_effect = _compute |
| 3007 | + with patch("redis.cluster.get_connection") as get_connection: |
| 3008 | + |
| 3009 | + def raise_error(target_node, *args, **kwargs): |
| 3010 | + get_connection.failed_calls += 1 |
| 3011 | + raise error("mocked error") |
| 3012 | + |
| 3013 | + get_connection.side_effect = raise_error |
| 3014 | + |
| 3015 | + r.set_retry(Retry(ConstantBackoff(1), 10)) |
| 3016 | + pipeline = r.pipeline() |
| 3017 | + |
| 3018 | + with pytest.raises(error): |
| 3019 | + pipeline.get("bar") |
| 3020 | + pipeline.get("bar") |
| 3021 | + pipeline.execute() |
| 3022 | + # cluster pipeline does one more back off than a single Redis command |
| 3023 | + # this is not required, but it's just how it's implemented as of now |
| 3024 | + assert compute.call_count == r.cluster_error_retry_attempts + 1 |
| 3025 | + |
2965 | 3026 |
|
2966 | 3027 | @pytest.mark.onlycluster
|
2967 | 3028 | class TestReadOnlyPipeline:
|
|
0 commit comments