Skip to content

Commit a1b3299

Browse files
dpkp and ygalblum authored
Admin - Implement perform leader election (#2536)
Co-authored-by: Ygal Blum <[email protected]>
1 parent 837df1e commit a1b3299

File tree

2 files changed

+167
-13
lines changed

2 files changed

+167
-13
lines changed

kafka/admin/client.py

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from kafka.protocol.admin import (
2222
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
2323
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
24-
DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest)
24+
DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType)
2525
from kafka.protocol.commit import OffsetFetchRequest
2626
from kafka.protocol.find_coordinator import FindCoordinatorRequest
2727
from kafka.protocol.metadata import MetadataRequest
@@ -393,27 +393,55 @@ def _send_request_to_controller(self, request):
393393
# So this is a little brittle in that it assumes all responses have
394394
# one of these attributes and that they always unpack into
395395
# (topic, error_code) tuples.
396-
topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
397-
else response.topic_error_codes)
398-
# Also small py2/py3 compatibility -- py3 can ignore extra values
399-
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
400-
# So for now we have to map across the list and explicitly drop any
401-
# extra values (usually the error_message)
402-
for topic, error_code in map(lambda e: e[:2], topic_error_tuples):
396+
topic_error_tuples = getattr(response, 'topic_errors', getattr(response, 'topic_error_codes', None))
397+
if topic_error_tuples is not None:
398+
success = self._parse_topic_request_response(topic_error_tuples, request, response, tries)
399+
else:
400+
# Leader Election request has a two layer error response (topic and partition)
401+
success = self._parse_topic_partition_request_response(request, response, tries)
402+
403+
if success:
404+
return response
405+
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")
406+
407+
def _parse_topic_request_response(self, topic_error_tuples, request, response, tries):
    """Inspect the per-topic error codes of a controller response.

    :param topic_error_tuples: Iterable of sequences whose first two items
        are ``(topic, error_code)``; any further items are ignored.
    :param request: The request that was sent; only used in the error message.
    :param response: The response that was received; only used in the error message.
    :param tries: Truthy while a NotControllerError should lead to a retry
        instead of an exception.
    :return: True when every entry reports no error. False when a
        NotControllerError was found while ``tries`` is truthy, after
        ``self._refresh_controller_id()`` has been called.
    :raises: The error class returned by ``Errors.for_code`` for the first
        other failing entry.
    """
    for entry in topic_error_tuples:
        # Slice before unpacking so that trailing fields (usually the
        # error_message) are dropped; py2 has no ``x, y, *rest`` unpacking.
        _topic, error_code = entry[:2]
        error_type = Errors.for_code(error_code)
        if error_type is Errors.NoError:
            continue
        if tries and error_type is NotControllerError:
            # No need to inspect the remaining entries: NotControllerError
            # should be reported either for all of them or for none.
            self._refresh_controller_id()
            return False
        raise error_type(
            "Request '{}' failed with response '{}'."
            .format(request, response))
    return True
425+
426+
def _parse_topic_partition_request_response(self, request, response, tries):
    """Inspect a response that reports errors per topic and per partition.

    Used for responses (such as ElectLeadersResponse) that expose
    ``replication_election_results`` instead of a flat list of
    ``(topic, error_code)`` entries.

    :param request: The request that was sent; only used in the error message.
    :param response: The response to inspect. Its optional request-level
        ``error_code`` is checked first, then every partition result.
    :param tries: Truthy while a NotControllerError should lead to a retry
        instead of an exception.
    :return: True when neither the request-level code nor any partition
        reports a failure (ElectionNotNeeded counts as success for a
        partition). False when a NotControllerError was found while
        ``tries`` is truthy, after ``self._refresh_controller_id()`` has
        been called.
    :raises: The error class returned by ``Errors.for_code`` for the first
        other failing code.
    """
    # The response schema carries a request-level error_code next to the
    # per-partition codes; a request that failed as a whole must not be
    # reported as a success. getattr keeps this tolerant of response
    # versions that do not define the field.
    top_level_error = Errors.for_code(getattr(response, 'error_code', 0))
    if tries and top_level_error is NotControllerError:
        self._refresh_controller_id()
        return False
    elif top_level_error is not Errors.NoError:
        raise top_level_error(
            "Request '{}' failed with response '{}'."
            .format(request, response))

    for _topic, partition_results in response.replication_election_results:
        for result in partition_results:
            # Slice before unpacking so that the trailing error_message is
            # dropped; py2 has no ``x, y, *rest`` unpacking.
            _partition_id, error_code = result[:2]
            error_type = Errors.for_code(error_code)
            if tries and error_type is NotControllerError:
                # No need to inspect the remaining results:
                # NotControllerError should be reported either for all
                # of them or for none.
                self._refresh_controller_id()
                return False
            elif error_type not in (Errors.NoError, Errors.ElectionNotNeeded):
                raise error_type(
                    "Request '{}' failed with response '{}'."
                    .format(request, response))
    return True
417445

418446
@staticmethod
419447
def _convert_new_topic_request(new_topic):
@@ -1651,6 +1679,51 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
16511679
.format(version))
16521680
return self._send_request_to_node(group_coordinator_id, request)
16531681

1682+
@staticmethod
1683+
def _convert_topic_partitions(topic_partitions):
1684+
return [
1685+
(
1686+
topic,
1687+
partition_ids
1688+
)
1689+
for topic, partition_ids in topic_partitions.items()
1690+
]
1691+
1692+
def _get_all_topic_partitions(self):
    """List every topic known to the client's cluster metadata with its partitions.

    :return: A list of ``(topic, partition_ids)`` tuples, where
        ``partition_ids`` is the list of ``partition`` attributes found in
        the cluster metadata's ``_partitions[topic]`` mapping.
    """
    cluster = self._client.cluster
    all_topic_partitions = []
    for topic in cluster.topics():
        partition_ids = [info.partition for info in cluster._partitions[topic].values()]
        all_topic_partitions.append((topic, partition_ids))
    return all_topic_partitions
1700+
1701+
def _get_topic_partitions(self, topic_partitions):
    """Build the ``(topic, partition_ids)`` list for a request.

    :param topic_partitions: A map of topic name strings to partition ids
        list, or None to select all topic partitions.
    :return: The converted mapping, or the result of
        ``self._get_all_topic_partitions()`` when ``topic_partitions`` is None.
    """
    if topic_partitions is not None:
        return self._convert_topic_partitions(topic_partitions)
    return self._get_all_topic_partitions()
1705+
1706+
def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None):
    """Perform leader election on the topic partitions.

    :param election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean.
        The value is converted with ``ElectionType(election_type)``, so an
        ``ElectionType`` member may be passed as well.
    :param topic_partitions: A map of topic name strings to partition ids list.
        By default, will run on all topic partitions
    :param timeout_ms: Milliseconds to wait for the leader election process to complete
        before the broker returns.

    :return: Appropriate version of ElectLeadersResponse class.
    :raises ValueError: If ``election_type`` is not a valid ``ElectionType`` value.
    """
    version = self._client.api_version(ElectLeadersRequest, max_version=1)
    timeout_ms = self._validate_timeout(timeout_ms)
    request = ElectLeadersRequest[version](
        election_type=ElectionType(election_type),
        topic_partitions=self._get_topic_partitions(topic_partitions),
        timeout=timeout_ms,
    )
    # TODO convert structs to a more pythonic interface
    return self._send_request_to_controller(request)
1726+
16541727
def _wait_for_futures(self, futures):
16551728
"""Block until all futures complete. If any fail, raise the encountered exception.
16561729

kafka/protocol/admin.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
from __future__ import absolute_import
22

3+
# enum in stdlib as of py3.4
4+
try:
5+
from enum import IntEnum # pylint: disable=import-error
6+
except ImportError:
7+
# vendored backport module
8+
from kafka.vendor.enum34 import IntEnum
9+
310
from kafka.protocol.api import Request, Response
411
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields
512

@@ -1031,3 +1038,77 @@ class ListPartitionReassignmentsRequest_v0(Request):
10311038
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]
10321039

10331040
ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]
1041+
1042+
1043+
class ElectLeadersResponse_v0(Response):
    """ElectLeaders response (API key 43); first entry of ``ElectLeadersResponse``.

    NOTE(review): despite the ``_v0`` suffix this class declares
    ``API_VERSION = 1`` and its schema is field-for-field the same as
    ``ElectLeadersResponse_v1`` -- confirm against the Kafka protocol
    guide before relying on it with protocol version 0.
    """
    API_KEY = 43
    API_VERSION = 1
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        # Request-level error code, separate from the per-partition codes below.
        ('error_code', Int16),
        ('replication_election_results', Array(
            ('topic', String('utf-8')),
            ('partition_result', Array(
                ('partition_id', Int32),
                ('error_code', Int16),
                ('error_message', String('utf-8')),
            )),
        )),
    )
1058+
1059+
1060+
class ElectLeadersRequest_v0(Request):
    """ElectLeaders request (API key 43); first entry of ``ElectLeadersRequest``.

    NOTE(review): despite the ``_v0`` suffix this class declares
    ``API_VERSION = 1`` and its schema is field-for-field the same as
    ``ElectLeadersRequest_v1`` -- confirm against the Kafka protocol
    guide before relying on it with protocol version 0.
    """
    API_KEY = 43
    API_VERSION = 1
    RESPONSE_TYPE = ElectLeadersResponse_v0
    SCHEMA = Schema(
        ('election_type', Int8),
        ('topic_partitions', Array(
            ('topic', String('utf-8')),
            ('partition_ids', Array(Int32)),
        )),
        ('timeout', Int32),
    )
1072+
1073+
1074+
class ElectLeadersResponse_v1(Response):
    """ElectLeaders response (API key 43), protocol version 1.

    Reports a request-level ``error_code`` plus, for every topic, a list of
    per-partition ``(partition_id, error_code, error_message)`` results.
    """
    API_KEY = 43
    API_VERSION = 1
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('error_code', Int16),
        ('replication_election_results', Array(
            ('topic', String('utf-8')),
            ('partition_result', Array(
                ('partition_id', Int32),
                ('error_code', Int16),
                ('error_message', String('utf-8')),
            )),
        )),
    )
1089+
1090+
1091+
class ElectLeadersRequest_v1(Request):
    """ElectLeaders request (API key 43), protocol version 1.

    Carries the election type, the ``(topic, partition_ids)`` entries to
    run the election on, and a timeout.
    """
    API_KEY = 43
    API_VERSION = 1
    RESPONSE_TYPE = ElectLeadersResponse_v1
    SCHEMA = Schema(
        ('election_type', Int8),
        ('topic_partitions', Array(
            ('topic', String('utf-8')),
            ('partition_ids', Array(Int32)),
        )),
        ('timeout', Int32),
    )
1103+
1104+
1105+
class ElectionType(IntEnum):
    """Leader election type.

    Integer codes for the ``election_type`` field of ``ElectLeadersRequest``.
    """

    # Plain ints: a trailing comma here would turn the assigned value into a
    # one-element tuple that only resolves to 0 through Enum's tuple unpacking.
    PREFERRED = 0
    UNCLEAN = 1
1111+
1112+
1113+
# Request classes for the ElectLeaders API; callers pick one by indexing
# this list with the negotiated version.
ElectLeadersRequest = [ElectLeadersRequest_v0, ElectLeadersRequest_v1]
1114+
# Response classes for the ElectLeaders API, listed in the same order as
# the request classes.
ElectLeadersResponse = [ElectLeadersResponse_v0, ElectLeadersResponse_v1]

0 commit comments

Comments
 (0)