Skip to content

Use KafkaProtocol to handle serialization/deserialization of api events #1230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 44 additions & 145 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
from kafka.protocol.frame import KafkaBytes
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.types import Int32
from kafka.version import __version__

Expand Down Expand Up @@ -73,9 +72,6 @@ class ConnectionStates(object):
CONNECTED = '<connected>'
AUTHENTICATING = '<authenticating>'

InFlightRequest = collections.namedtuple('InFlightRequest',
['request', 'response_type', 'correlation_id', 'future', 'timestamp'])


class BrokerConnection(object):
"""Initialize a Kafka broker connection
Expand Down Expand Up @@ -226,19 +222,17 @@ def __init__(self, host, port, afi, **configs):
assert gssapi is not None, 'GSSAPI lib not available'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl'

self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
api_version=self.config['api_version'])
self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
self._sock = None
self._ssl_context = None
if self.config['ssl_context'] is not None:
self._ssl_context = self.config['ssl_context']
self._sasl_auth_future = None
self._header = KafkaBytes(4)
self._rbuffer = None
self._receiving = False
self.last_attempt = 0
self._processing = False
self._correlation_id = 0
self._gai = None
self._gai_index = 0
self._sensors = None
Expand Down Expand Up @@ -628,19 +622,16 @@ def close(self, error=None):
self.state = ConnectionStates.DISCONNECTED
self.last_attempt = time.time()
self._sasl_auth_future = None
self._reset_buffer()
self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
api_version=self.config['api_version'])
if error is None:
error = Errors.Cancelled(str(self))
while self.in_flight_requests:
ifr = self.in_flight_requests.popleft()
ifr.future.failure(error)
(_, future, _) = self.in_flight_requests.popleft()
future.failure(error)
self.config['state_change_callback'](self)

def _reset_buffer(self):
self._receiving = False
self._header.seek(0)
self._rbuffer = None

def send(self, request):
"""send request, return Future()

Expand All @@ -658,13 +649,8 @@ def send(self, request):
def _send(self, request):
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
future = Future()
correlation_id = self._next_correlation_id()
header = RequestHeader(request,
correlation_id=correlation_id,
client_id=self.config['client_id'])
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
data = size + message
correlation_id = self._protocol.send_request(request)
data = self._protocol.send_bytes()
try:
# In the future we might manage an internal write buffer
# and send bytes asynchronously. For now, just block
Expand All @@ -686,11 +672,7 @@ def _send(self, request):
log.debug('%s Request %d: %s', self, correlation_id, request)

if request.expect_response():
ifr = InFlightRequest(request=request,
correlation_id=correlation_id,
response_type=request.RESPONSE_TYPE,
future=future,
timestamp=time.time())
ifr = (correlation_id, future, time.time())
self.in_flight_requests.append(ifr)
else:
future.success(None)
Expand All @@ -707,7 +689,6 @@ def recv(self):

Return response if available
"""
assert not self._processing, 'Recursion not supported'
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
log.warning('%s cannot recv: socket not connected', self)
# If requests are pending, we should close the socket and
Expand All @@ -720,15 +701,28 @@ def recv(self):
log.warning('%s: No in-flight-requests to recv', self)
return ()

response = self._recv()
if not response and self.requests_timed_out():
responses = self._recv()
if not responses and self.requests_timed_out():
log.warning('%s timed out after %s ms. Closing connection.',
self, self.config['request_timeout_ms'])
self.close(error=Errors.RequestTimedOutError(
'Request timed out after %s ms' %
self.config['request_timeout_ms']))
return ()
return response

for response in responses:
(correlation_id, future, timestamp) = self.in_flight_requests.popleft()
if isinstance(response, Errors.KafkaError):
self.close(response)
break

if self._sensors:
self._sensors.request_time.record((time.time() - timestamp) * 1000)

log.debug('%s Response %d: %s', self, correlation_id, response)
future.success(response)

return responses

def _recv(self):
responses = []
Expand All @@ -744,10 +738,7 @@ def _recv(self):
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
break
else:
responses.extend(self.receive_bytes(data))
if len(data) < SOCK_CHUNK_BYTES:
break

except SSLWantReadError:
break
except ConnectionError as e:
Expand All @@ -761,118 +752,26 @@ def _recv(self):
if six.PY3:
break
raise
return responses

def receive_bytes(self, data):
i = 0
n = len(data)
responses = []
if self._sensors:
self._sensors.bytes_received.record(n)
while i < n:

# Not receiving is the state of reading the payload header
if not self._receiving:
bytes_to_read = min(4 - self._header.tell(), n - i)
self._header.write(data[i:i+bytes_to_read])
i += bytes_to_read

if self._header.tell() == 4:
self._header.seek(0)
nbytes = Int32.decode(self._header)
# reset buffer and switch state to receiving payload bytes
self._rbuffer = KafkaBytes(nbytes)
self._receiving = True
elif self._header.tell() > 4:
raise Errors.KafkaError('this should not happen - are you threading?')


if self._receiving:
total_bytes = len(self._rbuffer)
staged_bytes = self._rbuffer.tell()
bytes_to_read = min(total_bytes - staged_bytes, n - i)
self._rbuffer.write(data[i:i+bytes_to_read])
i += bytes_to_read

staged_bytes = self._rbuffer.tell()
if staged_bytes > total_bytes:
self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?'))

if staged_bytes != total_bytes:
break
if self._sensors:
self._sensors.bytes_received.record(len(data))

self._receiving = False
self._rbuffer.seek(0)
resp = self._process_response(self._rbuffer)
if resp is not None:
responses.append(resp)
self._reset_buffer()
return responses
try:
more_responses = self._protocol.receive_bytes(data)
except Errors.KafkaProtocolError as e:
self.close(e)
break
else:
responses.extend([resp for (_, resp) in more_responses])

def _process_response(self, read_buffer):
assert not self._processing, 'Recursion not supported'
self._processing = True
recv_correlation_id = Int32.decode(read_buffer)

if not self.in_flight_requests:
error = Errors.CorrelationIdError(
'%s: No in-flight-request found for server response'
' with correlation ID %d'
% (self, recv_correlation_id))
self.close(error)
self._processing = False
return None
else:
ifr = self.in_flight_requests.popleft()

if self._sensors:
self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)

# verify send/recv correlation ids match

# 0.8.2 quirk
if (self.config['api_version'] == (0, 8, 2) and
ifr.response_type is GroupCoordinatorResponse[0] and
ifr.correlation_id != 0 and
recv_correlation_id == 0):
log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
' Correlation ID does not match request. This'
' should go away once at least one topic has been'
' initialized on the broker.')

elif ifr.correlation_id != recv_correlation_id:
error = Errors.CorrelationIdError(
'%s: Correlation IDs do not match: sent %d, recv %d'
% (self, ifr.correlation_id, recv_correlation_id))
ifr.future.failure(error)
self.close(error)
self._processing = False
return None

# decode response
try:
response = ifr.response_type.decode(read_buffer)
except ValueError:
read_buffer.seek(0)
buf = read_buffer.read()
log.error('%s Response %d [ResponseType: %s Request: %s]:'
' Unable to decode %d-byte buffer: %r', self,
ifr.correlation_id, ifr.response_type,
ifr.request, len(buf), buf)
error = Errors.UnknownError('Unable to decode response')
ifr.future.failure(error)
self.close(error)
self._processing = False
return None

log.debug('%s Response %d: %s', self, ifr.correlation_id, response)
ifr.future.success(response)
self._processing = False
return response
if len(data) < SOCK_CHUNK_BYTES:
break

return responses

def requests_timed_out(self):
if self.in_flight_requests:
oldest_at = self.in_flight_requests[0].timestamp
(_, _, oldest_at) = self.in_flight_requests[0]
timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() >= oldest_at + timeout:
return True
Expand Down
6 changes: 5 additions & 1 deletion kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ class NodeNotReadyError(KafkaError):
retriable = True


class CorrelationIdError(KafkaError):
class KafkaProtocolError(KafkaError):
retriable = True


class CorrelationIdError(KafkaProtocolError):
retriable = True


Expand Down
Loading