Skip to content

Commit d3479bd

Browse files
dpkp88manpreet
authored andcommitted
Add kafka.protocol.parser.KafkaProtocol w/ receive and send (dpkp#1230)
1 parent 6d66d1b commit d3479bd

File tree

3 files changed

+226
-148
lines changed

3 files changed

+226
-148
lines changed

kafka/conn.py

Lines changed: 44 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,10 @@
1414
import kafka.errors as Errors
1515
from kafka.future import Future
1616
from kafka.metrics.stats import Avg, Count, Max, Rate
17-
from kafka.protocol.api import RequestHeader
1817
from kafka.protocol.admin import SaslHandShakeRequest
19-
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
20-
from kafka.protocol.frame import KafkaBytes
18+
from kafka.protocol.commit import OffsetFetchRequest
2119
from kafka.protocol.metadata import MetadataRequest
20+
from kafka.protocol.parser import KafkaProtocol
2221
from kafka.protocol.types import Int32
2322
from kafka.version import __version__
2423

@@ -73,9 +72,6 @@ class ConnectionStates(object):
7372
CONNECTED = '<connected>'
7473
AUTHENTICATING = '<authenticating>'
7574

76-
InFlightRequest = collections.namedtuple('InFlightRequest',
77-
['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
78-
7975

8076
class BrokerConnection(object):
8177
"""Initialize a Kafka broker connection
@@ -227,19 +223,17 @@ def __init__(self, host, port, afi, **configs):
227223
assert gssapi is not None, 'GSSAPI lib not available'
228224
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
229225

226+
self._protocol = KafkaProtocol(
227+
client_id=self.config['client_id'],
228+
api_version=self.config['api_version'])
230229
self.state = ConnectionStates.DISCONNECTED
231230
self._reset_reconnect_backoff()
232231
self._sock = None
233232
self._ssl_context = None
234233
if self.config['ssl_context'] is not None:
235234
self._ssl_context = self.config['ssl_context']
236235
self._sasl_auth_future = None
237-
self._header = KafkaBytes(4)
238-
self._rbuffer = None
239-
self._receiving = False
240236
self.last_attempt = 0
241-
self._processing = False
242-
self._correlation_id = 0
243237
self._gai = None
244238
self._gai_index = 0
245239
self._sensors = None
@@ -643,19 +637,16 @@ def close(self, error=None):
643637
self.state = ConnectionStates.DISCONNECTED
644638
self.last_attempt = time.time()
645639
self._sasl_auth_future = None
646-
self._reset_buffer()
640+
self._protocol = KafkaProtocol(
641+
client_id=self.config['client_id'],
642+
api_version=self.config['api_version'])
647643
if error is None:
648644
error = Errors.Cancelled(str(self))
649645
while self.in_flight_requests:
650-
ifr = self.in_flight_requests.popleft()
651-
ifr.future.failure(error)
646+
(_, future, _) = self.in_flight_requests.popleft()
647+
future.failure(error)
652648
self.config['state_change_callback'](self)
653649

654-
def _reset_buffer(self):
655-
self._receiving = False
656-
self._header.seek(0)
657-
self._rbuffer = None
658-
659650
def send(self, request):
660651
"""send request, return Future()
661652
@@ -673,13 +664,8 @@ def send(self, request):
673664
def _send(self, request):
674665
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
675666
future = Future()
676-
correlation_id = self._next_correlation_id()
677-
header = RequestHeader(request,
678-
correlation_id=correlation_id,
679-
client_id=self.config['client_id'])
680-
message = b''.join([header.encode(), request.encode()])
681-
size = Int32.encode(len(message))
682-
data = size + message
667+
correlation_id = self._protocol.send_request(request)
668+
data = self._protocol.send_bytes()
683669
try:
684670
# In the future we might manage an internal write buffer
685671
# and send bytes asynchronously. For now, just block
@@ -701,11 +687,7 @@ def _send(self, request):
701687
log.debug('%s Request %d: %s', self, correlation_id, request)
702688

703689
if request.expect_response():
704-
ifr = InFlightRequest(request=request,
705-
correlation_id=correlation_id,
706-
response_type=request.RESPONSE_TYPE,
707-
future=future,
708-
timestamp=time.time())
690+
ifr = (correlation_id, future, time.time())
709691
self.in_flight_requests.append(ifr)
710692
else:
711693
future.success(None)
@@ -722,7 +704,6 @@ def recv(self):
722704
723705
Return response if available
724706
"""
725-
assert not self._processing, 'Recursion not supported'
726707
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
727708
log.warning('%s cannot recv: socket not connected', self)
728709
# If requests are pending, we should close the socket and
@@ -735,17 +716,28 @@ def recv(self):
735716
log.warning('%s: No in-flight-requests to recv', self)
736717
return ()
737718

738-
elif self._requests_timed_out():
719+
responses = self._recv()
720+
if not responses and self.requests_timed_out():
739721
log.warning('%s timed out after %s ms. Closing connection.',
740722
self, self.config['request_timeout_ms'])
741723
self.close(error=Errors.RequestTimedOutError(
742724
'Request timed out after %s ms' %
743725
self.config['request_timeout_ms']))
744726
return ()
745727

746-
# TODO: manpreet: Decide to return response/None
747-
# return response
748-
return self._recv()
728+
for response in responses:
729+
(correlation_id, future, timestamp) = self.in_flight_requests.popleft()
730+
if isinstance(response, Errors.KafkaError):
731+
self.close(response)
732+
break
733+
734+
if self._sensors:
735+
self._sensors.request_time.record((time.time() - timestamp) * 1000)
736+
737+
log.debug('%s Response %d: %s', self, correlation_id, response)
738+
future.success(response)
739+
740+
return responses
749741

750742
def _recv(self):
751743
responses = []
@@ -761,10 +753,7 @@ def _recv(self):
761753
log.error('%s: socket disconnected', self)
762754
self.close(error=Errors.ConnectionError('socket disconnected'))
763755
break
764-
else:
765-
responses.extend(self.receive_bytes(data))
766-
if len(data) < SOCK_CHUNK_BYTES:
767-
break
756+
768757
except SSLWantReadError:
769758
break
770759
except ConnectionError as e:
@@ -778,118 +767,26 @@ def _recv(self):
778767
if six.PY3:
779768
break
780769
raise
781-
return responses
782770

783-
def receive_bytes(self, data):
784-
i = 0
785-
n = len(data)
786-
responses = []
787-
if self._sensors:
788-
self._sensors.bytes_received.record(n)
789-
while i < n:
790-
791-
# Not receiving is the state of reading the payload header
792-
if not self._receiving:
793-
bytes_to_read = min(4 - self._header.tell(), n - i)
794-
self._header.write(data[i:i+bytes_to_read])
795-
i += bytes_to_read
796-
797-
if self._header.tell() == 4:
798-
self._header.seek(0)
799-
nbytes = Int32.decode(self._header)
800-
# reset buffer and switch state to receiving payload bytes
801-
self._rbuffer = KafkaBytes(nbytes)
802-
self._receiving = True
803-
elif self._header.tell() > 4:
804-
raise Errors.KafkaError('this should not happen - are you threading?')
805-
806-
807-
if self._receiving:
808-
total_bytes = len(self._rbuffer)
809-
staged_bytes = self._rbuffer.tell()
810-
bytes_to_read = min(total_bytes - staged_bytes, n - i)
811-
self._rbuffer.write(data[i:i+bytes_to_read])
812-
i += bytes_to_read
813-
814-
staged_bytes = self._rbuffer.tell()
815-
if staged_bytes > total_bytes:
816-
self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?'))
817-
818-
if staged_bytes != total_bytes:
819-
break
771+
if self._sensors:
772+
self._sensors.bytes_received.record(len(data))
773+
774+
try:
775+
more_responses = self._protocol.receive_bytes(data)
776+
except Errors.KafkaProtocolError as e:
777+
self.close(e)
778+
break
779+
else:
780+
responses.extend([resp for (_, resp) in more_responses])
781+
782+
if len(data) < SOCK_CHUNK_BYTES:
783+
break
820784

821-
self._receiving = False
822-
self._rbuffer.seek(0)
823-
resp = self._process_response(self._rbuffer)
824-
if resp is not None:
825-
responses.append(resp)
826-
self._reset_buffer()
827785
return responses
828786

829-
def _process_response(self, read_buffer):
830-
assert not self._processing, 'Recursion not supported'
831-
self._processing = True
832-
recv_correlation_id = Int32.decode(read_buffer)
833-
834-
if not self.in_flight_requests:
835-
error = Errors.CorrelationIdError(
836-
'%s: No in-flight-request found for server response'
837-
' with correlation ID %d'
838-
% (self, recv_correlation_id))
839-
self.close(error)
840-
self._processing = False
841-
return None
842-
else:
843-
ifr = self.in_flight_requests.popleft()
844-
845-
if self._sensors:
846-
self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)
847-
848-
# verify send/recv correlation ids match
849-
850-
# 0.8.2 quirk
851-
if (self.config['api_version'] == (0, 8, 2) and
852-
ifr.response_type is GroupCoordinatorResponse[0] and
853-
ifr.correlation_id != 0 and
854-
recv_correlation_id == 0):
855-
log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
856-
' Correlation ID does not match request. This'
857-
' should go away once at least one topic has been'
858-
' initialized on the broker.')
859-
860-
elif ifr.correlation_id != recv_correlation_id:
861-
error = Errors.CorrelationIdError(
862-
'%s: Correlation IDs do not match: sent %d, recv %d'
863-
% (self, ifr.correlation_id, recv_correlation_id))
864-
ifr.future.failure(error)
865-
self.close(error)
866-
self._processing = False
867-
return None
868-
869-
# decode response
870-
try:
871-
response = ifr.response_type.decode(read_buffer)
872-
except ValueError:
873-
read_buffer.seek(0)
874-
buf = read_buffer.read()
875-
log.error('%s Response %d [ResponseType: %s Request: %s]:'
876-
' Unable to decode %d-byte buffer: %r', self,
877-
ifr.correlation_id, ifr.response_type,
878-
ifr.request, len(buf), buf)
879-
error = Errors.UnknownError('Unable to decode response')
880-
ifr.future.failure(error)
881-
self.close(error)
882-
self._processing = False
883-
return None
884-
885-
log.debug('%s Response %d: %s', self, ifr.correlation_id, response)
886-
ifr.future.success(response)
887-
self._processing = False
888-
return response
889-
890-
def _requests_timed_out(self):
787+
def requests_timed_out(self):
891788
if self.in_flight_requests:
892-
oldest_at = self.in_flight_requests[0].timestamp
789+
(_, _, oldest_at) = self.in_flight_requests[0]
893790
timeout = self.config['request_timeout_ms'] / 1000.0
894791
if time.time() >= oldest_at + timeout:
895792
return True

kafka/errors.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ class NodeNotReadyError(KafkaError):
3333
retriable = True
3434

3535

36-
class CorrelationIdError(KafkaError):
36+
class KafkaProtocolError(KafkaError):
37+
retriable = True
38+
39+
40+
class CorrelationIdError(KafkaProtocolError):
3741
retriable = True
3842

3943

0 commit comments

Comments
 (0)