
Commit a2191e5

Support list (or comma-separated) of hosts (replaces host and port arguments)
1 parent 84de472 commit a2191e5

File tree

4 files changed: +56 −28 lines


kafka/client.py

Lines changed: 7 additions & 6 deletions
@@ -10,7 +10,7 @@
                           BrokerResponseError, PartitionUnavailableError,
                           KafkaUnavailableError, KafkaRequestError)

-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol

 log = logging.getLogger("kafka")
@@ -24,14 +24,15 @@ class KafkaClient(object):
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
     # socket timeout.
-    def __init__(self, host, port, client_id=CLIENT_ID,
+    def __init__(self, hosts, client_id=CLIENT_ID,
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         # We need one connection to bootstrap
         self.client_id = client_id
         self.timeout = timeout
-        self.conns = {  # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, timeout=timeout)
-        }
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = {}   # topic_id -> [0, 1, 2, ...]
@@ -46,7 +47,7 @@ def _get_conn(self, host, port):

         host_key = (host, port)
         if host_key not in self.conns:
-            self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+            self.conns[host_key] = KafkaConnection(host, port)

         return self.conns[host_key]

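With this change the client is bootstrapped from a single hosts argument rather than a host/port pair, and connections are created lazily in _get_conn instead of in the constructor. A minimal usage sketch of both accepted forms, mirroring the unit tests added below (the broker addresses are placeholders):

    from kafka import KafkaClient

    # a list of 'host:port' strings...
    client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])

    # ...or the equivalent comma-separated string
    client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
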
kafka/conn.py

Lines changed: 4 additions & 1 deletion
@@ -17,8 +17,11 @@ def collect_hosts(hosts, randomize=True):
     randomize the returned list.
     """

+    if isinstance(hosts, str):
+        hosts = hosts.split(',')
+
     result = []
-    for host_port in hosts.split(","):
+    for host_port in hosts:

         res = host_port.split(':')
         host = res[0]
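For context, here is a sketch of how the whole helper plausibly reads after this change. Only the lines in the hunk above come from the commit; the docstring opening, the port fallback, and the shuffle step implied by randomize=True are assumptions, not shown in this diff:

    from random import shuffle

    DEFAULT_KAFKA_PORT = 9092  # assumed fallback when a host has no explicit port

    def collect_hosts(hosts, randomize=True):
        """
        Collects a comma-separated string (or list) of hosts (host:port) and
        optionally randomize the returned list.
        """
        if isinstance(hosts, str):
            hosts = hosts.split(',')

        result = []
        for host_port in hosts:
            res = host_port.split(':')
            host = res[0]
            # assumption: default the port when only a hostname is given
            port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
            result.append((host.strip(), port))

        if randomize:
            shuffle(result)

        return result
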

test/test_integration.py

Lines changed: 13 additions & 13 deletions
@@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):

 class KafkaTestCase(unittest.TestCase):
     def setUp(self):
-        self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
+        self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
         ensure_topic_creation(self.client, self.topic)


@@ -578,7 +578,7 @@ def setUpClass(cls):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))

     @classmethod
     def tearDownClass(cls):  # noqa
@@ -800,7 +800,6 @@ def test_large_messages(self):
         self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)

-
         # Produce 1 message that is too large (bigger than max fetch size)
         big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
         big_message = create_message(random_string(big_message_size))
@@ -827,25 +826,26 @@ def test_large_messages(self):

 class TestFailover(KafkaTestCase):

-    def setUp(self):
+    @classmethod
+    def setUpClass(cls):  # noqa
         zk_chroot = random_string(10)
         replicas = 2
         partitions = 2

         # mini zookeeper, 2 kafka brokers
-        self.zk = ZookeeperFixture.instance()
-        kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
-        self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]

-        hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
         cls.client = KafkaClient(hosts)
-        super(TestFailover, self).setUp()

-    def tearDown(self):
-        self.client.close()
-        for broker in self.brokers:
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
             broker.close()
-        self.zk.close()
+        cls.zk.close()

     def test_switch_leader(self):
         key, topic, partition = random_string(5), self.topic, 0

test/test_unit.py

Lines changed: 32 additions & 8 deletions
@@ -5,11 +5,13 @@

 from mock import patch

+from kafka import KafkaClient
 from kafka.common import (
     ProduceRequest, FetchRequest, Message, ChecksumError,
     ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
     OffsetAndMessage, BrokerMetadata, PartitionMetadata
 )
+from kafka.common import KafkaUnavailableError
 from kafka.codec import (
     has_gzip, has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
@@ -384,6 +386,26 @@ def test_decode_offset_fetch_response(self):

 class TestKafkaClient(unittest.TestCase):

+    def test_init_with_list(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
+    def test_init_with_csv(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
     def test_send_broker_unaware_request_fail(self):
         'Tests that call fails when all hosts are unavailable'

@@ -402,14 +424,16 @@ def mock_get_conn(host, port):
             return mocked_conns[(host, port)]

         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):

-            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+            client = KafkaClient(hosts=['kafka01:9092','kafka02:9092'])

-            resp = client._send_broker_unaware_request(1, 'fake request')

-            self.assertIsNone(resp)
+            self.assertRaises(
+                KafkaUnavailableError,
+                client._send_broker_unaware_request,
+                1, 'fake request')

         for key, conn in mocked_conns.iteritems():
             conn.send.assert_called_with(1, 'fake request')
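The last assertion above pins down a behavior change: when every host in the bootstrap list is unreachable, _send_broker_unaware_request now raises KafkaUnavailableError instead of returning None. A sketch of how calling code might cope with the new contract (the handling strategy is an assumption, not part of this commit):

    from kafka import KafkaClient
    from kafka.common import KafkaUnavailableError

    try:
        # the constructor loads metadata, which sends a broker-unaware request
        client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
    except KafkaUnavailableError:
        # all bootstrap hosts failed; log and surface the error to the caller
        raise
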
@@ -434,7 +458,7 @@ def mock_get_conn(host, port):
             return mocked_conns[(host, port)]

         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):

             client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
@@ -444,7 +468,7 @@ def mock_get_conn(host, port):
         self.assertEqual('valid response', resp)
         mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)

-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata(self, protocol, conn):
@@ -474,7 +498,7 @@ def test_client_load_metadata(self, protocol, conn):
             },
             client.topics_to_brokers)

-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
@@ -513,7 +537,7 @@ def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
             },
             client.topics_to_brokers)

-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata_noleader_partitions(self, protocol, conn):
