Skip to content

Commit c0df771

Browse files
dpkpjeffwidman
authored andcommitted
Add Request/Response structs for kafka broker 1.0.0
1 parent 441aeb8 commit c0df771

File tree

6 files changed

+167
-36
lines changed

6 files changed

+167
-36
lines changed

kafka/conn.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
818818
# in reverse order. As soon as we find one that works, return it
819819
test_cases = [
820820
# format (<broker version>, <needed struct>)
821+
((1, 0, 0), MetadataRequest[5]),
821822
((0, 11, 0), MetadataRequest[4]),
822823
((0, 10, 2), OffsetFetchRequest[2]),
823824
((0, 10, 1), MetadataRequest[2]),

kafka/protocol/admin.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,12 @@ class SaslHandShakeResponse_v0(Response):
286286
)
287287

288288

289+
class SaslHandShakeResponse_v1(Response):
290+
API_KEY = 17
291+
API_VERSION = 1
292+
SCHEMA = SaslHandShakeResponse_v0.SCHEMA
293+
294+
289295
class SaslHandShakeRequest_v0(Request):
290296
API_KEY = 17
291297
API_VERSION = 0
@@ -294,5 +300,36 @@ class SaslHandShakeRequest_v0(Request):
294300
('mechanism', String('utf-8'))
295301
)
296302

297-
SaslHandShakeRequest = [SaslHandShakeRequest_v0]
298-
SaslHandShakeResponse = [SaslHandShakeResponse_v0]
303+
304+
class SaslHandShakeRequest_v1(Request):
305+
API_KEY = 17
306+
API_VERSION = 1
307+
RESPONSE_TYPE = SaslHandShakeResponse_v1
308+
SCHEMA = SaslHandShakeRequest_v0.SCHEMA
309+
310+
311+
SaslHandShakeRequest = [SaslHandShakeRequest_v0, SaslHandShakeRequest_v1]
312+
SaslHandShakeResponse = [SaslHandShakeResponse_v0, SaslHandShakeResponse_v1]
313+
314+
315+
class SaslAuthenticateResponse_v0(Request):
316+
API_KEY = 36
317+
API_VERSION = 0
318+
SCHEMA = Schema(
319+
('error_code', Int16),
320+
('error_message', String('utf-8')),
321+
('sasl_auth_bytes', Bytes)
322+
)
323+
324+
325+
class SaslAuthenticateRequest_v0(Request):
326+
API_KEY = 36
327+
API_VERSION = 0
328+
RESPONSE_TYPE = SaslAuthenticateResponse_v0
329+
SCHEMA = Schema(
330+
('sasl_auth_bytes', Bytes)
331+
)
332+
333+
334+
SaslAuthenticateRequest = [SaslAuthenticateRequest_v0]
335+
SaslAuthenticateResponse = [SaslAuthenticateResponse_v0]

kafka/protocol/fetch.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ class FetchResponse_v5(Response):
8484
)
8585

8686

87+
class FetchResponse_v6(Response):
88+
"""
89+
Same as FetchResponse_v5. The version number is bumped up to indicate that the client supports KafkaStorageException.
90+
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
91+
"""
92+
API_KEY = 1
93+
API_VERSION = 6
94+
SCHEMA = FetchResponse_v5.SCHEMA
95+
96+
8797
class FetchRequest_v0(Request):
8898
API_KEY = 1
8999
API_VERSION = 0
@@ -174,11 +184,25 @@ class FetchRequest_v5(Request):
174184
)
175185

176186

187+
class FetchRequest_v6(Request):
188+
"""
189+
The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5.
190+
The version number is bumped up to indicate that the client supports KafkaStorageException.
191+
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
192+
"""
193+
API_KEY = 1
194+
API_VERSION = 6
195+
RESPONSE_TYPE = FetchResponse_v6
196+
SCHEMA = FetchRequest_v5.SCHEMA
197+
198+
177199
FetchRequest = [
178200
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
179-
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5
201+
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
202+
FetchRequest_v6
180203
]
181204
FetchResponse = [
182205
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
183-
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5
206+
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
207+
FetchResponse_v6
184208
]

kafka/protocol/group.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class JoinGroupRequest_v2(Request):
8787
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2
8888
]
8989
JoinGroupResponse = [
90-
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v1
90+
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2
9191
]
9292

9393

kafka/protocol/metadata.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,32 @@ class MetadataResponse_v4(Response):
102102
SCHEMA = MetadataResponse_v3.SCHEMA
103103

104104

105+
class MetadataResponse_v5(Response):
106+
API_KEY = 3
107+
API_VERSION = 5
108+
SCHEMA = Schema(
109+
('throttle_time_ms', Int32),
110+
('brokers', Array(
111+
('node_id', Int32),
112+
('host', String('utf-8')),
113+
('port', Int32),
114+
('rack', String('utf-8')))),
115+
('cluster_id', String('utf-8')),
116+
('controller_id', Int32),
117+
('topics', Array(
118+
('error_code', Int16),
119+
('topic', String('utf-8')),
120+
('is_internal', Boolean),
121+
('partitions', Array(
122+
('error_code', Int16),
123+
('partition', Int32),
124+
('leader', Int32),
125+
('replicas', Array(Int32)),
126+
('isr', Array(Int32)),
127+
('offline_replicas', Array(Int32))))))
128+
)
129+
130+
105131
class MetadataRequest_v0(Request):
106132
API_KEY = 3
107133
API_VERSION = 0
@@ -151,11 +177,24 @@ class MetadataRequest_v4(Request):
151177
NO_TOPICS = None # Empty array (len 0) for topics returns no topics
152178

153179

180+
class MetadataRequest_v5(Request):
181+
"""
182+
The v5 metadata request is the same as v4.
183+
An additional field for offline_replicas has been added to the v5 metadata response
184+
"""
185+
API_KEY = 3
186+
API_VERSION = 5
187+
RESPONSE_TYPE = MetadataResponse_v5
188+
SCHEMA = MetadataRequest_v4.SCHEMA
189+
ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
190+
NO_TOPICS = None # Empty array (len 0) for topics returns no topics
191+
192+
154193
MetadataRequest = [
155194
MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
156-
MetadataRequest_v3, MetadataRequest_v4
195+
MetadataRequest_v3, MetadataRequest_v4, MetadataRequest_v5
157196
]
158197
MetadataResponse = [
159198
MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
160-
MetadataResponse_v3, MetadataResponse_v4
199+
MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5
161200
]

kafka/protocol/produce.py

Lines changed: 59 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,52 +52,67 @@ class ProduceResponse_v3(Response):
5252
SCHEMA = ProduceResponse_v2.SCHEMA
5353

5454

55-
class ProduceRequest_v0(Request):
55+
class ProduceResponse_v4(Response):
56+
"""
57+
The version number is bumped up to indicate that the client supports KafkaStorageException.
58+
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
59+
"""
5660
API_KEY = 0
57-
API_VERSION = 0
58-
RESPONSE_TYPE = ProduceResponse_v0
61+
API_VERSION = 4
62+
SCHEMA = ProduceResponse_v3.SCHEMA
63+
64+
65+
class ProduceResponse_v5(Response):
66+
API_KEY = 0
67+
API_VERSION = 5
5968
SCHEMA = Schema(
60-
('required_acks', Int16),
61-
('timeout', Int32),
6269
('topics', Array(
6370
('topic', String('utf-8')),
6471
('partitions', Array(
6572
('partition', Int32),
66-
('messages', Bytes)))))
73+
('error_code', Int16),
74+
('offset', Int64),
75+
('timestamp', Int64),
76+
('log_start_offset', Int64))))),
77+
('throttle_time_ms', Int32)
6778
)
6879

80+
81+
class ProduceRequest(Request):
82+
API_KEY = 0
83+
6984
def expect_response(self):
7085
if self.required_acks == 0: # pylint: disable=no-member
7186
return False
7287
return True
7388

7489

75-
class ProduceRequest_v1(Request):
76-
API_KEY = 0
90+
class ProduceRequest_v0(ProduceRequest):
91+
API_VERSION = 0
92+
RESPONSE_TYPE = ProduceResponse_v0
93+
SCHEMA = Schema(
94+
('required_acks', Int16),
95+
('timeout', Int32),
96+
('topics', Array(
97+
('topic', String('utf-8')),
98+
('partitions', Array(
99+
('partition', Int32),
100+
('messages', Bytes)))))
101+
)
102+
103+
104+
class ProduceRequest_v1(ProduceRequest):
77105
API_VERSION = 1
78106
RESPONSE_TYPE = ProduceResponse_v1
79107
SCHEMA = ProduceRequest_v0.SCHEMA
80108

81-
def expect_response(self):
82-
if self.required_acks == 0: # pylint: disable=no-member
83-
return False
84-
return True
85-
86-
87-
class ProduceRequest_v2(Request):
88-
API_KEY = 0
109+
class ProduceRequest_v2(ProduceRequest):
89110
API_VERSION = 2
90111
RESPONSE_TYPE = ProduceResponse_v2
91112
SCHEMA = ProduceRequest_v1.SCHEMA
92113

93-
def expect_response(self):
94-
if self.required_acks == 0: # pylint: disable=no-member
95-
return False
96-
return True
97-
98114

99-
class ProduceRequest_v3(Request):
100-
API_KEY = 0
115+
class ProduceRequest_v3(ProduceRequest):
101116
API_VERSION = 3
102117
RESPONSE_TYPE = ProduceResponse_v3
103118
SCHEMA = Schema(
@@ -111,17 +126,32 @@ class ProduceRequest_v3(Request):
111126
('messages', Bytes)))))
112127
)
113128

114-
def expect_response(self):
115-
if self.required_acks == 0: # pylint: disable=no-member
116-
return False
117-
return True
129+
130+
class ProduceRequest_v4(ProduceRequest):
131+
"""
132+
The version number is bumped up to indicate that the client supports KafkaStorageException.
133+
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
134+
"""
135+
API_VERSION = 4
136+
RESPONSE_TYPE = ProduceResponse_v4
137+
SCHEMA = ProduceRequest_v3.SCHEMA
138+
139+
140+
class ProduceRequest_v5(ProduceRequest):
141+
"""
142+
Same as v4. The version number is bumped since the v5 response includes an additional
143+
partition level field: the log_start_offset.
144+
"""
145+
API_VERSION = 5
146+
RESPONSE_TYPE = ProduceResponse_v5
147+
SCHEMA = ProduceRequest_v4.SCHEMA
118148

119149

120150
ProduceRequest = [
121151
ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
122-
ProduceRequest_v3
152+
ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
123153
]
124154
ProduceResponse = [
125155
ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
126-
ProduceResponse_v2
156+
ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
127157
]

0 commit comments

Comments
 (0)