Skip to content

Commit 1dace1d

Browse files
committed
Remove some dead code
1 parent 24bf504 commit 1dace1d

File tree

3 files changed

+6
-194
lines changed

3 files changed

+6
-194
lines changed

kafka/producer/buffer.py

Lines changed: 0 additions & 179 deletions
Original file line numberDiff line numberDiff line change
@@ -237,182 +237,3 @@ def queued(self):
237237
"""The number of threads blocked waiting on memory."""
238238
with self._lock:
239239
return len(self._waiters)
240-
241-
'''
242-
class BufferPool(object):
243-
"""
244-
A pool of ByteBuffers kept under a given memory limit. This class is fairly
245-
specific to the needs of the producer. In particular it has the following
246-
properties:
247-
248-
* There is a special "poolable size" and buffers of this size are kept in a
249-
free list and recycled
250-
* It is fair. That is all memory is given to the longest waiting thread
251-
until it has sufficient memory. This prevents starvation or deadlock when
252-
a thread asks for a large chunk of memory and needs to block until
253-
multiple buffers are deallocated.
254-
"""
255-
def __init__(self, memory, poolable_size):
256-
"""Create a new buffer pool.
257-
258-
Arguments:
259-
memory (int): maximum memory that this buffer pool can allocate
260-
poolable_size (int): memory size per buffer to cache in the free
261-
list rather than deallocating
262-
"""
263-
self._poolable_size = poolable_size
264-
self._lock = threading.RLock()
265-
self._free = collections.deque()
266-
self._waiters = collections.deque()
267-
self._total_memory = memory
268-
self._available_memory = memory
269-
#self.metrics = metrics;
270-
#self.waitTime = this.metrics.sensor("bufferpool-wait-time");
271-
#MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
272-
#this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
273-
274-
def allocate(self, size, max_time_to_block_ms):
275-
"""
276-
Allocate a buffer of the given size. This method blocks if there is not
277-
enough memory and the buffer pool is configured with blocking mode.
278-
279-
Arguments:
280-
size (int): The buffer size to allocate in bytes
281-
max_time_to_block_ms (int): The maximum time in milliseconds to
282-
block for buffer memory to be available
283-
284-
Returns:
285-
buffer
286-
287-
Raises:
288-
InterruptedException If the thread is interrupted while blocked
289-
IllegalArgumentException if size is larger than the total memory
290-
controlled by the pool (and hence we would block forever)
291-
"""
292-
assert size <= self._total_memory, (
293-
"Attempt to allocate %d bytes, but there is a hard limit of %d on"
294-
" memory allocations." % (size, self._total_memory))
295-
296-
with self._lock:
297-
# check if we have a free buffer of the right size pooled
298-
if (size == self._poolable_size and len(self._free) > 0):
299-
return self._free.popleft()
300-
301-
# now check if the request is immediately satisfiable with the
302-
# memory on hand or if we need to block
303-
free_list_size = len(self._free) * self._poolable_size
304-
if self._available_memory + free_list_size >= size:
305-
# we have enough unallocated or pooled memory to immediately
306-
# satisfy the request
307-
self._free_up(size)
308-
self._available_memory -= size
309-
raise NotImplementedError()
310-
#return ByteBuffer.allocate(size)
311-
else:
312-
# we are out of memory and will have to block
313-
accumulated = 0
314-
buf = None
315-
more_memory = threading.Condition(self._lock)
316-
self._waiters.append(more_memory)
317-
# loop over and over until we have a buffer or have reserved
318-
# enough memory to allocate one
319-
while (accumulated < size):
320-
start_wait = time.time()
321-
if not more_memory.wait(max_time_to_block_ms / 1000.0):
322-
raise Errors.KafkaTimeoutError(
323-
"Failed to allocate memory within the configured"
324-
" max blocking time")
325-
end_wait = time.time()
326-
#this.waitTime.record(endWait - startWait, time.milliseconds());
327-
328-
# check if we can satisfy this request from the free list,
329-
# otherwise allocate memory
330-
if (accumulated == 0
331-
and size == self._poolable_size
332-
and self._free):
333-
334-
# just grab a buffer from the free list
335-
buf = self._free.popleft()
336-
accumulated = size
337-
else:
338-
# we'll need to allocate memory, but we may only get
339-
# part of what we need on this iteration
340-
self._free_up(size - accumulated)
341-
got = min(size - accumulated, self._available_memory)
342-
self._available_memory -= got
343-
accumulated += got
344-
345-
# remove the condition for this thread to let the next thread
346-
# in line start getting memory
347-
removed = self._waiters.popleft()
348-
assert removed is more_memory, 'Wrong condition'
349-
350-
# signal any additional waiters if there is more memory left
351-
# over for them
352-
if (self._available_memory > 0 or len(self._free) > 0):
353-
if len(self._waiters) > 0:
354-
self._waiters[0].notify()
355-
356-
# unlock and return the buffer
357-
if buf is None:
358-
raise NotImplementedError()
359-
#return ByteBuffer.allocate(size)
360-
else:
361-
return buf
362-
363-
def _free_up(self, size):
364-
"""
365-
Attempt to ensure we have at least the requested number of bytes of
366-
memory for allocation by deallocating pooled buffers (if needed)
367-
"""
368-
while self._free and self._available_memory < size:
369-
self._available_memory += self._free.pop().capacity
370-
371-
def deallocate(self, buffer_, size=None):
372-
"""
373-
Return buffers to the pool. If they are of the poolable size add them
374-
to the free list, otherwise just mark the memory as free.
375-
376-
Arguments:
377-
buffer (io.BytesIO): The buffer to return
378-
size (int): The size of the buffer to mark as deallocated, note
379-
that this maybe smaller than buffer.capacity since the buffer
380-
may re-allocate itself during in-place compression
381-
"""
382-
with self._lock:
383-
if size is None:
384-
size = buffer_.capacity
385-
if (size == self._poolable_size and size == buffer_.capacity):
386-
buffer_.seek(0)
387-
buffer_.truncate()
388-
self._free.append(buffer_)
389-
else:
390-
self._available_memory += size
391-
392-
if self._waiters:
393-
more_mem = self._waiters[0]
394-
more_mem.notify()
395-
396-
def available_memory(self):
397-
"""The total free memory both unallocated and in the free list."""
398-
with self._lock:
399-
return self._available_memory + len(self._free) * self._poolable_size
400-
401-
def unallocated_memory(self):
402-
"""Get the unallocated memory (not in the free list or in use)."""
403-
with self._lock:
404-
return self._available_memory
405-
406-
def queued(self):
407-
"""The number of threads blocked waiting on memory."""
408-
with self._lock:
409-
return len(self._waiters)
410-
411-
def poolable_size(self):
412-
"""The buffer size that will be retained in the free list after use."""
413-
return self._poolable_size
414-
415-
def total_memory(self):
416-
"""The total memory managed by this pool."""
417-
return self._total_memory
418-
'''

kafka/protocol/struct.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,3 @@ def __eq__(self, other):
6464
if self.__dict__[attr] != other.__dict__[attr]:
6565
return False
6666
return True
67-
68-
"""
69-
class MetaStruct(type):
70-
def __new__(cls, clsname, bases, dct):
71-
nt = namedtuple(clsname, [name for (name, _) in dct['SCHEMA']])
72-
bases = tuple([Struct, nt] + list(bases))
73-
return super(MetaStruct, cls).__new__(cls, clsname, bases, dct)
74-
"""

kafka/util.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,9 @@ def __eq__(self, other):
161161
return self._target_id == other._target_id and self._method_id == other._method_id
162162

163163

164-
def try_method_on_system_exit(obj, method, *args, **kwargs):
165-
def wrapper(_obj, _meth, *args, **kwargs):
166-
try:
167-
getattr(_obj, _meth)(*args, **kwargs)
168-
except (ReferenceError, AttributeError):
169-
pass
170-
atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs)
164+
class Dict(dict):
165+
"""Utility class to support passing weakrefs to dicts
166+
167+
See: https://docs.python.org/2/library/weakref.html
168+
"""
169+
pass

0 commit comments

Comments
 (0)