@@ -113,182 +113,3 @@ def queued(self):
113
113
"""The number of threads blocked waiting on memory."""
114
114
with self ._lock :
115
115
return len (self ._waiters )
116
-
117
- '''
118
- class BufferPool(object):
119
- """
120
- A pool of ByteBuffers kept under a given memory limit. This class is fairly
121
- specific to the needs of the producer. In particular it has the following
122
- properties:
123
-
124
- * There is a special "poolable size" and buffers of this size are kept in a
125
- free list and recycled
126
- * It is fair. That is all memory is given to the longest waiting thread
127
- until it has sufficient memory. This prevents starvation or deadlock when
128
- a thread asks for a large chunk of memory and needs to block until
129
- multiple buffers are deallocated.
130
- """
131
- def __init__(self, memory, poolable_size):
132
- """Create a new buffer pool.
133
-
134
- Arguments:
135
- memory (int): maximum memory that this buffer pool can allocate
136
- poolable_size (int): memory size per buffer to cache in the free
137
- list rather than deallocating
138
- """
139
- self._poolable_size = poolable_size
140
- self._lock = threading.RLock()
141
- self._free = collections.deque()
142
- self._waiters = collections.deque()
143
- self._total_memory = memory
144
- self._available_memory = memory
145
- #self.metrics = metrics;
146
- #self.waitTime = this.metrics.sensor("bufferpool-wait-time");
147
- #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
148
- #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
149
-
150
- def allocate(self, size, max_time_to_block_ms):
151
- """
152
- Allocate a buffer of the given size. This method blocks if there is not
153
- enough memory and the buffer pool is configured with blocking mode.
154
-
155
- Arguments:
156
- size (int): The buffer size to allocate in bytes
157
- max_time_to_block_ms (int): The maximum time in milliseconds to
158
- block for buffer memory to be available
159
-
160
- Returns:
161
- buffer
162
-
163
- Raises:
164
- InterruptedException If the thread is interrupted while blocked
165
- IllegalArgumentException if size is larger than the total memory
166
- controlled by the pool (and hence we would block forever)
167
- """
168
- assert size <= self._total_memory, (
169
- "Attempt to allocate %d bytes, but there is a hard limit of %d on"
170
- " memory allocations." % (size, self._total_memory))
171
-
172
- with self._lock:
173
- # check if we have a free buffer of the right size pooled
174
- if (size == self._poolable_size and len(self._free) > 0):
175
- return self._free.popleft()
176
-
177
- # now check if the request is immediately satisfiable with the
178
- # memory on hand or if we need to block
179
- free_list_size = len(self._free) * self._poolable_size
180
- if self._available_memory + free_list_size >= size:
181
- # we have enough unallocated or pooled memory to immediately
182
- # satisfy the request
183
- self._free_up(size)
184
- self._available_memory -= size
185
- raise NotImplementedError()
186
- #return ByteBuffer.allocate(size)
187
- else:
188
- # we are out of memory and will have to block
189
- accumulated = 0
190
- buf = None
191
- more_memory = threading.Condition(self._lock)
192
- self._waiters.append(more_memory)
193
- # loop over and over until we have a buffer or have reserved
194
- # enough memory to allocate one
195
- while (accumulated < size):
196
- start_wait = time.time()
197
- if not more_memory.wait(max_time_to_block_ms / 1000.0):
198
- raise Errors.KafkaTimeoutError(
199
- "Failed to allocate memory within the configured"
200
- " max blocking time")
201
- end_wait = time.time()
202
- #this.waitTime.record(endWait - startWait, time.milliseconds());
203
-
204
- # check if we can satisfy this request from the free list,
205
- # otherwise allocate memory
206
- if (accumulated == 0
207
- and size == self._poolable_size
208
- and self._free):
209
-
210
- # just grab a buffer from the free list
211
- buf = self._free.popleft()
212
- accumulated = size
213
- else:
214
- # we'll need to allocate memory, but we may only get
215
- # part of what we need on this iteration
216
- self._free_up(size - accumulated)
217
- got = min(size - accumulated, self._available_memory)
218
- self._available_memory -= got
219
- accumulated += got
220
-
221
- # remove the condition for this thread to let the next thread
222
- # in line start getting memory
223
- removed = self._waiters.popleft()
224
- assert removed is more_memory, 'Wrong condition'
225
-
226
- # signal any additional waiters if there is more memory left
227
- # over for them
228
- if (self._available_memory > 0 or len(self._free) > 0):
229
- if len(self._waiters) > 0:
230
- self._waiters[0].notify()
231
-
232
- # unlock and return the buffer
233
- if buf is None:
234
- raise NotImplementedError()
235
- #return ByteBuffer.allocate(size)
236
- else:
237
- return buf
238
-
239
- def _free_up(self, size):
240
- """
241
- Attempt to ensure we have at least the requested number of bytes of
242
- memory for allocation by deallocating pooled buffers (if needed)
243
- """
244
- while self._free and self._available_memory < size:
245
- self._available_memory += self._free.pop().capacity
246
-
247
- def deallocate(self, buffer_, size=None):
248
- """
249
- Return buffers to the pool. If they are of the poolable size add them
250
- to the free list, otherwise just mark the memory as free.
251
-
252
- Arguments:
253
- buffer (io.BytesIO): The buffer to return
254
- size (int): The size of the buffer to mark as deallocated, note
255
- that this maybe smaller than buffer.capacity since the buffer
256
- may re-allocate itself during in-place compression
257
- """
258
- with self._lock:
259
- if size is None:
260
- size = buffer_.capacity
261
- if (size == self._poolable_size and size == buffer_.capacity):
262
- buffer_.seek(0)
263
- buffer_.truncate()
264
- self._free.append(buffer_)
265
- else:
266
- self._available_memory += size
267
-
268
- if self._waiters:
269
- more_mem = self._waiters[0]
270
- more_mem.notify()
271
-
272
- def available_memory(self):
273
- """The total free memory both unallocated and in the free list."""
274
- with self._lock:
275
- return self._available_memory + len(self._free) * self._poolable_size
276
-
277
- def unallocated_memory(self):
278
- """Get the unallocated memory (not in the free list or in use)."""
279
- with self._lock:
280
- return self._available_memory
281
-
282
- def queued(self):
283
- """The number of threads blocked waiting on memory."""
284
- with self._lock:
285
- return len(self._waiters)
286
-
287
- def poolable_size(self):
288
- """The buffer size that will be retained in the free list after use."""
289
- return self._poolable_size
290
-
291
- def total_memory(self):
292
- """The total memory managed by this pool."""
293
- return self._total_memory
294
- '''
0 commit comments