Skip to content

Topics management #1956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,14 +470,48 @@ def delete_topics(self, topics, timeout_ms=None):
.format(version))
return response

# list topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()

# describe topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the controller
def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
"""
topics == None means "get all topics"
"""
version = self._matching_api_version(MetadataRequest)
if version <= 3:
if auto_topic_creation:
raise IncompatibleBrokerVersion(
"auto_topic_creation requires MetadataRequest >= v4, which"
" is not supported by Kafka {}"
.format(self.config['api_version']))

# describe cluster functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()
request = MetadataRequest[version](topics=topics)
elif version <= 5:
request = MetadataRequest[version](
topics=topics,
allow_auto_topic_creation=auto_topic_creation
)

future = self._send_request_to_node(
self._client.least_loaded_node(),
request
)
self._wait_for_futures([future])
return future.value

def list_topics(self):
metadata = self._get_cluster_metadata(topics=None)
obj = metadata.to_object()
return [t['topic'] for t in obj['topics']]

def describe_topics(self, topics=None):
metadata = self._get_cluster_metadata(topics=topics)
obj = metadata.to_object()
return obj['topics']

def describe_cluster(self):
metadata = self._get_cluster_metadata()
obj = metadata.to_object()
obj.pop('topics') # We have 'describe_topics' for this
return obj

@staticmethod
def _convert_describe_acls_response_to_acls(describe_response):
Expand Down
32 changes: 31 additions & 1 deletion kafka/protocol/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import abc

from kafka.protocol.struct import Struct
from kafka.protocol.types import Int16, Int32, String, Schema
from kafka.protocol.types import Int16, Int32, String, Schema, Array


class RequestHeader(Struct):
Expand Down Expand Up @@ -47,6 +47,9 @@ def expect_response(self):
"""Override this method if an api request does not always generate a response"""
return True

def to_object(self):
return _to_object(self.SCHEMA, self.fields)


class Response(Struct):
__metaclass__ = abc.ABCMeta
Expand All @@ -65,3 +68,30 @@ def API_VERSION(self):
def SCHEMA(self):
"""An instance of Schema() representing the response structure"""
pass

def to_object(self):
return _to_object(self.SCHEMA, self.fields)


def _to_object(schema, data):
obj = {}
for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)):
if isinstance(data, dict):
val = data[name]
else:
val = data[idx]

if isinstance(_type, Schema):
obj[name] = _to_object(_type, val)
elif isinstance(_type, Array):
if isinstance(_type.array_of, (Array, Schema)):
obj[name] = [
_to_object(_type.array_of, x)
for x in val
]
else:
obj[name] = val
else:
obj[name] = val

return obj
3 changes: 3 additions & 0 deletions kafka/protocol/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ class Struct(AbstractType):
SCHEMA = Schema()

def __init__(self, *args, **kwargs):
self.fields = {}
if len(args) == len(self.SCHEMA.fields):
for i, name in enumerate(self.SCHEMA.names):
self.__dict__[name] = args[i]
self.fields[name] = args[i]
elif len(args) > 0:
raise ValueError('Args must be empty or mirror schema')
else:
for name in self.SCHEMA.names:
self.__dict__[name] = kwargs.pop(name, None)
self.fields[name] = self.__dict__[name]
if kwargs:
raise ValueError('Keyword(s) not in schema %s: %s'
% (list(self.SCHEMA.names),
Expand Down
223 changes: 223 additions & 0 deletions test/test_object_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
from kafka.protocol.admin import Request
from kafka.protocol.admin import Response
from kafka.protocol.types import Schema
from kafka.protocol.types import Array
from kafka.protocol.types import Int16
from kafka.protocol.types import String

import pytest

@pytest.mark.parametrize('superclass', (Request, Response))
class TestObjectConversion:
def test_with_empty_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema()

tc = TestClass()
tc.encode()
assert tc.to_object() == {}

def test_with_basic_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myobject', Int16))

tc = TestClass(myobject=0)
tc.encode()
assert tc.to_object() == {'myobject': 0}

def test_with_basic_array_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(Int16)))

tc = TestClass(myarray=[1,2,3])
tc.encode()
assert tc.to_object()['myarray'] == [1, 2, 3]

def test_with_complex_array_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subobject', Int16),
('othersubobject', String('utf-8')))))

tc = TestClass(
myarray=[[10, 'hello']]
)
tc.encode()
obj = tc.to_object()
assert len(obj['myarray']) == 1
assert obj['myarray'][0]['subobject'] == 10
assert obj['myarray'][0]['othersubobject'] == 'hello'

def test_with_array_and_other(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subobject', Int16),
('othersubobject', String('utf-8')))),
('notarray', Int16))

tc = TestClass(
myarray=[[10, 'hello']],
notarray=42
)

obj = tc.to_object()
assert len(obj['myarray']) == 1
assert obj['myarray'][0]['subobject'] == 10
assert obj['myarray'][0]['othersubobject'] == 'hello'
assert obj['notarray'] == 42

def test_with_nested_array(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subarray', Array(Int16)),
('otherobject', Int16))))

tc = TestClass(
myarray=[
[[1, 2], 2],
[[2, 3], 4],
]
)
print(tc.encode())


obj = tc.to_object()
assert len(obj['myarray']) == 2
assert obj['myarray'][0]['subarray'] == [1, 2]
assert obj['myarray'][0]['otherobject'] == 2
assert obj['myarray'][1]['subarray'] == [2, 3]
assert obj['myarray'][1]['otherobject'] == 4

def test_with_complex_nested_array(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subarray', Array(
('innertest', String('utf-8')),
('otherinnertest', String('utf-8')))),
('othersubarray', Array(Int16)))),
('notarray', String('utf-8')))

tc = TestClass(
myarray=[
[[['hello', 'hello'], ['hello again', 'hello again']], [0]],
[[['hello', 'hello again']], [1]],
],
notarray='notarray'
)
tc.encode()

obj = tc.to_object()

assert obj['notarray'] == 'notarray'
myarray = obj['myarray']
assert len(myarray) == 2

assert myarray[0]['othersubarray'] == [0]
assert len(myarray[0]['subarray']) == 2
assert myarray[0]['subarray'][0]['innertest'] == 'hello'
assert myarray[0]['subarray'][0]['otherinnertest'] == 'hello'
assert myarray[0]['subarray'][1]['innertest'] == 'hello again'
assert myarray[0]['subarray'][1]['otherinnertest'] == 'hello again'

assert myarray[1]['othersubarray'] == [1]
assert len(myarray[1]['subarray']) == 1
assert myarray[1]['subarray'][0]['innertest'] == 'hello'
assert myarray[1]['subarray'][0]['otherinnertest'] == 'hello again'

def test_with_metadata_response():
from kafka.protocol.metadata import MetadataResponse_v5
tc = MetadataResponse_v5(
throttle_time_ms=0,
brokers=[
[0, 'testhost0', 9092, 'testrack0'],
[1, 'testhost1', 9092, 'testrack1'],
],
cluster_id='abcd',
controller_id=0,
topics=[
[0, 'testtopic1', False, [
[0, 0, 0, [0, 1], [0, 1], []],
[0, 1, 1, [1, 0], [1, 0], []],
],
], [0, 'other-test-topic', True, [
[0, 0, 0, [0, 1], [0, 1], []],
]
]]
)
tc.encode() # Make sure this object encodes successfully


obj = tc.to_object()

assert obj['throttle_time_ms'] == 0

assert len(obj['brokers']) == 2
assert obj['brokers'][0]['node_id'] == 0
assert obj['brokers'][0]['host'] == 'testhost0'
assert obj['brokers'][0]['port'] == 9092
assert obj['brokers'][0]['rack'] == 'testrack0'
assert obj['brokers'][1]['node_id'] == 1
assert obj['brokers'][1]['host'] == 'testhost1'
assert obj['brokers'][1]['port'] == 9092
assert obj['brokers'][1]['rack'] == 'testrack1'

assert obj['cluster_id'] == 'abcd'
assert obj['controller_id'] == 0

assert len(obj['topics']) == 2
assert obj['topics'][0]['error_code'] == 0
assert obj['topics'][0]['topic'] == 'testtopic1'
assert obj['topics'][0]['is_internal'] == False
assert len(obj['topics'][0]['partitions']) == 2
assert obj['topics'][0]['partitions'][0]['error_code'] == 0
assert obj['topics'][0]['partitions'][0]['partition'] == 0
assert obj['topics'][0]['partitions'][0]['leader'] == 0
assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['offline_replicas'] == []
assert obj['topics'][0]['partitions'][1]['error_code'] == 0
assert obj['topics'][0]['partitions'][1]['partition'] == 1
assert obj['topics'][0]['partitions'][1]['leader'] == 1
assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['offline_replicas'] == []

assert obj['topics'][1]['error_code'] == 0
assert obj['topics'][1]['topic'] == 'other-test-topic'
assert obj['topics'][1]['is_internal'] == True
assert len(obj['topics'][1]['partitions']) == 1
assert obj['topics'][1]['partitions'][0]['error_code'] == 0
assert obj['topics'][1]['partitions'][0]['partition'] == 0
assert obj['topics'][1]['partitions'][0]['leader'] == 0
assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['offline_replicas'] == []

tc.encode()