Skip to content

Commit 9e1c9de

Browse files
author
Gabriel Tincu
committed
client: allow for custom kafka clients
Provide the consumer, producer and admin client with the option to create the kafka client from a custom callable, thus allowing more flexibility in handling certain low level errors
1 parent 6f932ba commit 9e1c9de

File tree

4 files changed

+25
-14
lines changed

4 files changed

+25
-14
lines changed

kafka/admin/client.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
1212
ACLResourcePatternType
13-
from kafka.client_async import KafkaClient, selectors
13+
from kafka.client_async import selectors, KafkaClient
1414
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
1515
import kafka.errors as Errors
1616
from kafka.errors import (
@@ -26,6 +26,7 @@
2626
from kafka.protocol.metadata import MetadataRequest
2727
from kafka.protocol.types import Array
2828
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
29+
from kafka.util import get_client_factory
2930
from kafka.version import __version__
3031

3132

@@ -146,6 +147,7 @@ class KafkaAdminClient(object):
146147
sasl mechanism handshake. Default: one of bootstrap servers
147148
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
148149
instance. (See kafka.oauth.abstract). Default: None
150+
client (callable): Custom class / callable for creating KafkaClient instances
149151
150152
"""
151153
DEFAULT_CONFIG = {
@@ -186,6 +188,7 @@ class KafkaAdminClient(object):
186188
'metric_reporters': [],
187189
'metrics_num_samples': 2,
188190
'metrics_sample_window_ms': 30000,
191+
'client': KafkaClient,
189192
}
190193

191194
def __init__(self, **configs):
@@ -204,10 +207,12 @@ def __init__(self, **configs):
204207
tags=metrics_tags)
205208
reporters = [reporter() for reporter in self.config['metric_reporters']]
206209
self._metrics = Metrics(metric_config, reporters)
207-
208-
self._client = KafkaClient(metrics=self._metrics,
209-
metric_group_prefix='admin',
210-
**self.config)
210+
assert callable(self.config['client']), "Client parameter should be callable"
211+
self._client = self.config['client'](
212+
metrics=self._metrics,
213+
metric_group_prefix='admin',
214+
**self.config
215+
)
211216
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
212217

213218
# Get auto-discovered version from client if necessary

kafka/consumer/group.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from kafka.vendor import six
1111

12-
from kafka.client_async import KafkaClient, selectors
12+
from kafka.client_async import selectors, KafkaClient
1313
from kafka.consumer.fetcher import Fetcher
1414
from kafka.consumer.subscription_state import SubscriptionState
1515
from kafka.coordinator.consumer import ConsumerCoordinator
@@ -244,6 +244,7 @@ class KafkaConsumer(six.Iterator):
244244
sasl mechanism handshake. Default: one of bootstrap servers
245245
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
246246
instance. (See kafka.oauth.abstract). Default: None
247+
client (callable): Custom class / callable for creating KafkaClient instances
247248
248249
Note:
249250
Configuration parameters are described in more detail at
@@ -306,6 +307,7 @@ class KafkaConsumer(six.Iterator):
306307
'sasl_kerberos_domain_name': None,
307308
'sasl_oauth_token_provider': None,
308309
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
310+
'client': KafkaClient,
309311
}
310312
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
311313

@@ -352,8 +354,8 @@ def __init__(self, *topics, **configs):
352354
self.config['api_version'] = tuple(map(int, str_version.split('.')))
353355
log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
354356
str(self.config['api_version']), str_version)
355-
356-
self._client = KafkaClient(metrics=self._metrics, **self.config)
357+
assert callable(self.config['client']), "Client parameter should be callable"
358+
self._client = self.config['client'](metrics=self._metrics, **self.config)
357359

358360
# Get auto-discovered version from client if necessary
359361
if self.config['api_version'] is None:

kafka/producer/kafka.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from kafka.vendor import six
1212

1313
import kafka.errors as Errors
14-
from kafka.client_async import KafkaClient, selectors
14+
from kafka.client_async import selectors, KafkaClient
1515
from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
1616
from kafka.metrics import MetricConfig, Metrics
1717
from kafka.partitioner.default import DefaultPartitioner
@@ -280,6 +280,7 @@ class KafkaProducer(object):
280280
sasl mechanism handshake. Default: one of bootstrap servers
281281
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
282282
instance. (See kafka.oauth.abstract). Default: None
283+
client (callable): Custom class / callable for creating KafkaClient instances
283284
284285
Note:
285286
Configuration parameters are described in more detail at
@@ -332,7 +333,8 @@ class KafkaProducer(object):
332333
'sasl_plain_password': None,
333334
'sasl_kerberos_service_name': 'kafka',
334335
'sasl_kerberos_domain_name': None,
335-
'sasl_oauth_token_provider': None
336+
'sasl_oauth_token_provider': None,
337+
'client': KafkaClient,
336338
}
337339

338340
_COMPRESSORS = {
@@ -377,10 +379,11 @@ def __init__(self, **configs):
377379
tags=metrics_tags)
378380
reporters = [reporter() for reporter in self.config['metric_reporters']]
379381
self._metrics = Metrics(metric_config, reporters)
380-
381-
client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
382-
wakeup_timeout_ms=self.config['max_block_ms'],
383-
**self.config)
382+
assert callable(self.config['client']), "Client parameter should be callable"
383+
client = self.config['client'](
384+
metrics=self._metrics, metric_group_prefix='producer',
385+
wakeup_timeout_ms=self.config['max_block_ms'],
386+
**self.config)
384387

385388
# Get auto-discovered version from client if necessary
386389
if self.config['api_version'] is None:

kafka/util.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import absolute_import
22

33
import binascii
4+
import kafka
45
import weakref
56

67
from kafka.vendor import six

0 commit comments

Comments
 (0)