Skip to content

Commit bdbd158

Browse files
committed
Initial support for Redis cluster client.
1 parent 6e7f456 commit bdbd158

File tree

8 files changed

+337
-0
lines changed

8 files changed

+337
-0
lines changed

.github/workflows/test-cluster.yaml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
name: Test Cluster
2+
3+
on: [push, pull_request]
4+
5+
permissions:
6+
contents: read
7+
8+
env:
9+
CONSOLE_OUTPUT: XTerm
10+
11+
jobs:
12+
test:
13+
name: ${{matrix.ruby}} on ${{matrix.os}}
14+
runs-on: ${{matrix.os}}-latest
15+
continue-on-error: ${{matrix.experimental}}
16+
17+
strategy:
18+
matrix:
19+
os:
20+
- ubuntu
21+
22+
ruby:
23+
- "3.1"
24+
- "3.2"
25+
- "3.3"
26+
27+
experimental: [false]
28+
29+
steps:
30+
- uses: actions/checkout@v4
31+
32+
- name: Install Docker Compose
33+
run: |
34+
sudo apt-get update
35+
sudo apt-get install -y docker-compose
36+
37+
- name: Run tests
38+
timeout-minutes: 10
39+
env:
40+
RUBY_VERSION: ${{matrix.ruby}}
41+
run: docker-compose -f cluster/docker-compose.yaml up --exit-code-from tests

cluster/docker-compose.yaml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
services:
2+
redis-a:
3+
image: redis
4+
command: redis-server /etc/redis/redis.conf
5+
volumes:
6+
- ./node-a/cluster.conf:/etc/redis/redis.conf
7+
redis-b:
8+
image: redis
9+
command: redis-server /etc/redis/redis.conf
10+
volumes:
11+
- ./node-b/cluster.conf:/etc/redis/redis.conf
12+
redis-c:
13+
image: redis
14+
command: redis-server /etc/redis/redis.conf
15+
volumes:
16+
- ./node-c/cluster.conf:/etc/redis/redis.conf
17+
controller:
18+
image: redis
19+
command: >
20+
bash -c "
21+
redis-cli --cluster create --cluster-yes --cluster-replicas 0 redis-a:6379 redis-b:6379 redis-c:6379;
22+
while true; do
23+
redis-cli -h redis-a cluster info | grep cluster_state:fail;
24+
sleep 1;
25+
done"
26+
healthcheck:
27+
test: "redis-cli -h redis-a cluster info | grep cluster_state:ok"
28+
interval: 1s
29+
timeout: 3s
30+
retries: 30
31+
depends_on:
32+
- redis-a
33+
- redis-b
34+
- redis-c
35+
tests:
36+
image: ruby:${RUBY_VERSION:-latest}
37+
volumes:
38+
- ../:/code
39+
command: bash -c "cd /code && bundle install && bundle exec sus cluster/test"
40+
environment:
41+
- COVERAGE=${COVERAGE}
42+
depends_on:
43+
- controller

cluster/node-a/cluster.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
cluster-enabled yes
2+
cluster-config-file nodes.conf
3+
cluster-node-timeout 5000
4+
appendonly yes

cluster/node-b/cluster.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
cluster-enabled yes
2+
cluster-config-file nodes.conf
3+
cluster-node-timeout 5000
4+
appendonly yes

cluster/node-c/cluster.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
cluster-enabled yes
2+
cluster-config-file nodes.conf
3+
cluster-node-timeout 5000
4+
appendonly yes
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2024, by Samuel Williams.
5+
6+
require 'async/redis/cluster_client'
7+
require 'sus/fixtures/async'
8+
require 'securerandom'
9+
10+
describe Async::Redis::ClusterClient do
11+
include Sus::Fixtures::Async::ReactorContext
12+
13+
let(:node_a) {"redis://redis-a:6379"}
14+
let(:node_b) {"redis://redis-b:6379"}
15+
let(:node_c) {"redis://redis-c:6379"}
16+
17+
let(:endpoints) {[
18+
Async::Redis::Endpoint.parse(node_a),
19+
Async::Redis::Endpoint.parse(node_b),
20+
Async::Redis::Endpoint.parse(node_c)
21+
]}
22+
23+
let(:cluster) {subject.new(endpoints)}
24+
25+
let(:key) {"sentinel-test:#{SecureRandom.hex(8)}"}
26+
let(:value) {"sentinel-test-value"}
27+
28+
it "can get and set values" do
29+
cluster.clients_for(key) do |client, key|
30+
client.set(key, value)
31+
expect(client.get(key)).to be == value
32+
end
33+
end
34+
end

lib/async/redis.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@
66

77
require_relative 'redis/version'
88
require_relative 'redis/client'
9+
10+
require_relative 'redis/cluster_client'
911
require_relative 'redis/sentinel_client'

lib/async/redis/cluster_client.rb

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2020, by David Ortiz.
5+
# Copyright, 2023-2024, by Samuel Williams.
6+
7+
require_relative 'client'
8+
require 'io/stream'
9+
10+
module Async
11+
module Redis
12+
class ClusterClient
13+
class ReloadError < StandardError
14+
end
15+
16+
Node = Struct.new(:id, :endpoint, :role, :health, :client)
17+
18+
class RangeMap
19+
def initialize
20+
@ranges = []
21+
end
22+
23+
def add(range, value)
24+
@ranges << [range, value]
25+
26+
return value
27+
end
28+
29+
def find(key)
30+
@ranges.each do |range, value|
31+
return value if range.include?(key)
32+
end
33+
34+
if block_given?
35+
return yield
36+
end
37+
38+
return nil
39+
end
40+
41+
def each
42+
@ranges.each do |range, value|
43+
yield value
44+
end
45+
end
46+
47+
def clear
48+
@ranges.clear
49+
end
50+
end
51+
52+
# Create a new instance of the cluster client.
53+
#
54+
# @property endpoints [Array(Endpoint)] The list of cluster endpoints.
55+
def initialize(endpoints, **options)
56+
@endpoints = endpoints
57+
@shards = nil
58+
end
59+
60+
def clients_for(*keys, attempts: 3)
61+
slots = slots_for(keys)
62+
63+
slots.map do |slot, keys|
64+
yield client_for(slot), keys
65+
end
66+
rescue ServerError => error
67+
if error.message =~ /MOVED|ASK/
68+
reload_cluster!
69+
70+
attempts -= 1
71+
72+
retry if attempts > 0
73+
else
74+
raise
75+
end
76+
end
77+
78+
def client_for(slot, role = :master)
79+
unless @nodes
80+
reload_cluster!
81+
end
82+
83+
nodes = @shards.find(slot)
84+
85+
nodes = nodes.select{|node| node.role == role}
86+
87+
if node = nodes.sample
88+
node.client ||= Client.new(node.endpoint)
89+
end
90+
end
91+
92+
protected
93+
94+
def reload_cluster!(endpoints = @endpoints)
95+
@endpoints.each do |endpoint|
96+
client = Client.new(endpoint)
97+
98+
shards = RangeMap.new
99+
endpoints = []
100+
101+
client.call('CLUSTER', 'SHARDS').each do |shard|
102+
shard = shard.each_slice(2).to_h
103+
104+
slots = shard['slots']
105+
range = Range.new(*slots, exclude_end: false)
106+
107+
nodes = shard['nodes'].map do |node|
108+
node = node.each_slice(2).to_h
109+
endpoint = Endpoint.remote(node['ip'], node['port'])
110+
111+
# Collect all endpoints:
112+
endpoints << endpoint
113+
114+
Node.new(node['id'], endpoint, node['role'].to_sym, node['health'].to_sym)
115+
end
116+
117+
shards.add(range, nodes)
118+
end
119+
120+
@shards = shards
121+
# @endpoints = @endpoints | endpoints
122+
123+
return true
124+
rescue Errno::ECONNREFUSED
125+
next
126+
end
127+
128+
raise ReloadError, "Failed to reload cluster configuration."
129+
end
130+
131+
XMODEM_CRC16_LOOKUP = [
132+
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
133+
0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
134+
0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
135+
0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
136+
0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
137+
0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
138+
0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
139+
0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
140+
0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
141+
0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
142+
0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
143+
0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
144+
0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
145+
0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
146+
0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
147+
0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
148+
0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
149+
0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
150+
0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
151+
0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
152+
0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
153+
0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
154+
0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
155+
0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
156+
0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
157+
0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
158+
0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
159+
0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
160+
0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
161+
0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
162+
0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
163+
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0
164+
].freeze
165+
166+
# This is the CRC16 algorithm used by Redis Cluster to hash keys.
167+
# Copied from https://github.com/antirez/redis-rb-cluster/blob/master/crc16.rb
168+
def crc16(bytes)
169+
crc = 0
170+
171+
bytes.each_byte do |b|
172+
crc = ((crc << 8) & 0xffff) ^ XMODEM_CRC16_LOOKUP[((crc >> 8) ^ b) & 0xff]
173+
end
174+
175+
return crc
176+
end
177+
178+
HASH_SLOTS = 16_384
179+
180+
# Return Redis::Client for a given key.
181+
# Modified from https://github.com/antirez/redis-rb-cluster/blob/master/cluster.rb#L104-L117
182+
def slot_for(key)
183+
key = key.to_s
184+
185+
if s = key.index('{')
186+
if e = key.index('}', s + 1) and e != s + 1
187+
key = key[s + 1..e - 1]
188+
end
189+
end
190+
191+
return crc16(key) % HASH_SLOTS
192+
end
193+
194+
def slots_for(keys)
195+
slots = Hash.new{|hash, key| hash[key] = []}
196+
197+
keys.each do |key|
198+
slots[slot_for(key)] << key
199+
end
200+
201+
return slots
202+
end
203+
end
204+
end
205+
end

0 commit comments

Comments
 (0)