Skip to content

Commit e4679cd

Browse files
authored
bpo-32759: Free unused arenas in multiprocessing.heap (GH-5827)
Previously, large shared arrays allocated using multiprocessing would remain allocated until the process ended.
1 parent 2ef65f3 commit e4679cd

File tree

3 files changed

+150
-46
lines changed

3 files changed

+150
-46
lines changed

Lib/multiprocessing/heap.py

Lines changed: 91 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
#
99

1010
import bisect
11+
from collections import defaultdict
1112
import mmap
1213
import os
1314
import sys
@@ -28,6 +29,9 @@
2829
import _winapi
2930

3031
class Arena(object):
32+
"""
33+
A shared memory area backed by anonymous memory (Windows).
34+
"""
3135

3236
_rand = tempfile._RandomNameSequence()
3337

@@ -52,6 +56,7 @@ def __getstate__(self):
5256

5357
def __setstate__(self, state):
5458
self.size, self.name = self._state = state
59+
# Reopen existing mmap
5560
self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
5661
# XXX Temporarily preventing buildbot failures while determining
5762
# XXX the correct long-term fix. See issue 23060
@@ -60,6 +65,10 @@ def __setstate__(self, state):
6065
else:
6166

6267
class Arena(object):
68+
"""
69+
A shared memory area backed by a temporary file (POSIX).
70+
"""
71+
6372
if sys.platform == 'linux':
6473
_dir_candidates = ['/dev/shm']
6574
else:
@@ -69,6 +78,8 @@ def __init__(self, size, fd=-1):
6978
self.size = size
7079
self.fd = fd
7180
if fd == -1:
81+
# Arena is created anew (if fd != -1, it means we're coming
82+
# from rebuild_arena() below)
7283
self.fd, name = tempfile.mkstemp(
7384
prefix='pym-%d-'%os.getpid(),
7485
dir=self._choose_dir(size))
@@ -103,37 +114,82 @@ def rebuild_arena(size, dupfd):
103114

104115
class Heap(object):
105116

117+
# Minimum malloc() alignment
106118
_alignment = 8
107119

120+
_DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2 # 4 MB
121+
_DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2
122+
108123
def __init__(self, size=mmap.PAGESIZE):
109124
self._lastpid = os.getpid()
110125
self._lock = threading.Lock()
126+
# Current arena allocation size
111127
self._size = size
128+
# A sorted list of available block sizes in arenas
112129
self._lengths = []
130+
131+
# Free block management:
132+
# - map each block size to a list of `(Arena, start, stop)` blocks
113133
self._len_to_seq = {}
134+
# - map `(Arena, start)` tuple to the `(Arena, start, stop)` block
135+
# starting at that offset
114136
self._start_to_block = {}
137+
# - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block
138+
# ending at that offset
115139
self._stop_to_block = {}
116-
self._allocated_blocks = set()
140+
141+
# Map arenas to their `(Arena, start, stop)` blocks in use
142+
self._allocated_blocks = defaultdict(set)
117143
self._arenas = []
118-
# list of pending blocks to free - see free() comment below
144+
145+
# List of pending blocks to free - see comment in free() below
119146
self._pending_free_blocks = []
120147

148+
# Statistics
149+
self._n_mallocs = 0
150+
self._n_frees = 0
151+
121152
@staticmethod
122153
def _roundup(n, alignment):
123154
# alignment must be a power of 2
124155
mask = alignment - 1
125156
return (n + mask) & ~mask
126157

158+
def _new_arena(self, size):
159+
# Create a new arena with at least the given *size*
160+
length = self._roundup(max(self._size, size), mmap.PAGESIZE)
161+
# We carve larger and larger arenas, for efficiency, until we
162+
# reach a large-ish size (roughly L3 cache-sized)
163+
if self._size < self._DOUBLE_ARENA_SIZE_UNTIL:
164+
self._size *= 2
165+
util.info('allocating a new mmap of length %d', length)
166+
arena = Arena(length)
167+
self._arenas.append(arena)
168+
return (arena, 0, length)
169+
170+
def _discard_arena(self, arena):
171+
# Possibly delete the given (unused) arena
172+
length = arena.size
173+
# Reusing an existing arena is faster than creating a new one, so
174+
# we only reclaim space if it's large enough.
175+
if length < self._DISCARD_FREE_SPACE_LARGER_THAN:
176+
return
177+
blocks = self._allocated_blocks.pop(arena)
178+
assert not blocks
179+
del self._start_to_block[(arena, 0)]
180+
del self._stop_to_block[(arena, length)]
181+
self._arenas.remove(arena)
182+
seq = self._len_to_seq[length]
183+
seq.remove((arena, 0, length))
184+
if not seq:
185+
del self._len_to_seq[length]
186+
self._lengths.remove(length)
187+
127188
def _malloc(self, size):
128189
# returns a large enough block -- it might be much larger
129190
i = bisect.bisect_left(self._lengths, size)
130191
if i == len(self._lengths):
131-
length = self._roundup(max(self._size, size), mmap.PAGESIZE)
132-
self._size *= 2
133-
util.info('allocating a new mmap of length %d', length)
134-
arena = Arena(length)
135-
self._arenas.append(arena)
136-
return (arena, 0, length)
192+
return self._new_arena(size)
137193
else:
138194
length = self._lengths[i]
139195
seq = self._len_to_seq[length]
@@ -146,8 +202,8 @@ def _malloc(self, size):
146202
del self._stop_to_block[(arena, stop)]
147203
return block
148204

149-
def _free(self, block):
150-
# free location and try to merge with neighbours
205+
def _add_free_block(self, block):
206+
# make block available and try to merge with its neighbours in the arena
151207
(arena, start, stop) = block
152208

153209
try:
@@ -191,15 +247,23 @@ def _absorb(self, block):
191247

192248
return start, stop
193249

250+
def _remove_allocated_block(self, block):
251+
arena, start, stop = block
252+
blocks = self._allocated_blocks[arena]
253+
blocks.remove((start, stop))
254+
if not blocks:
255+
# Arena is entirely free, discard it from this process
256+
self._discard_arena(arena)
257+
194258
def _free_pending_blocks(self):
195259
# Free all the blocks in the pending list - called with the lock held.
196260
while True:
197261
try:
198262
block = self._pending_free_blocks.pop()
199263
except IndexError:
200264
break
201-
self._allocated_blocks.remove(block)
202-
self._free(block)
265+
self._add_free_block(block)
266+
self._remove_allocated_block(block)
203267

204268
def free(self, block):
205269
# free a block returned by malloc()
@@ -210,7 +274,7 @@ def free(self, block):
210274
# immediately, the block is added to a list of blocks to be freed
211275
# synchronously sometime later from malloc() or free(), by calling
212276
# _free_pending_blocks() (appending and retrieving from a list is not
213-
# strictly thread-safe but under cPython it's atomic thanks to the GIL).
277+
# strictly thread-safe but under CPython it's atomic thanks to the GIL).
214278
if os.getpid() != self._lastpid:
215279
raise ValueError(
216280
"My pid ({0:n}) is not last pid {1:n}".format(
@@ -222,9 +286,10 @@ def free(self, block):
222286
else:
223287
# we hold the lock
224288
try:
289+
self._n_frees += 1
225290
self._free_pending_blocks()
226-
self._allocated_blocks.remove(block)
227-
self._free(block)
291+
self._add_free_block(block)
292+
self._remove_allocated_block(block)
228293
finally:
229294
self._lock.release()
230295

@@ -237,18 +302,21 @@ def malloc(self, size):
237302
if os.getpid() != self._lastpid:
238303
self.__init__() # reinitialize after fork
239304
with self._lock:
305+
self._n_mallocs += 1
306+
# allow pending blocks to be marked available
240307
self._free_pending_blocks()
241-
size = self._roundup(max(size,1), self._alignment)
308+
size = self._roundup(max(size, 1), self._alignment)
242309
(arena, start, stop) = self._malloc(size)
243-
new_stop = start + size
244-
if new_stop < stop:
245-
self._free((arena, new_stop, stop))
246-
block = (arena, start, new_stop)
247-
self._allocated_blocks.add(block)
248-
return block
310+
real_stop = start + size
311+
if real_stop < stop:
312+
# if the returned block is larger than necessary, mark
313+
# the remainder available
314+
self._add_free_block((arena, real_stop, stop))
315+
self._allocated_blocks[arena].add((start, real_stop))
316+
return (arena, start, real_stop)
249317

250318
#
251-
# Class representing a chunk of an mmap -- can be inherited by child process
319+
# Class wrapping a block allocated out of a Heap -- can be inherited by child process
252320
#
253321

254322
class BufferWrapper(object):

Lib/test/_test_multiprocessing.py

Lines changed: 58 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -3372,11 +3372,25 @@ class _TestHeap(BaseTestCase):
33723372

33733373
ALLOWED_TYPES = ('processes',)
33743374

3375+
def setUp(self):
3376+
super().setUp()
3377+
# Make pristine heap for these tests
3378+
self.old_heap = multiprocessing.heap.BufferWrapper._heap
3379+
multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()
3380+
3381+
def tearDown(self):
3382+
multiprocessing.heap.BufferWrapper._heap = self.old_heap
3383+
super().tearDown()
3384+
33753385
def test_heap(self):
33763386
iterations = 5000
33773387
maxblocks = 50
33783388
blocks = []
33793389

3390+
# get the heap object
3391+
heap = multiprocessing.heap.BufferWrapper._heap
3392+
heap._DISCARD_FREE_SPACE_LARGER_THAN = 0
3393+
33803394
# create and destroy lots of blocks of different sizes
33813395
for i in range(iterations):
33823396
size = int(random.lognormvariate(0, 1) * 1000)
@@ -3385,31 +3399,52 @@ def test_heap(self):
33853399
if len(blocks) > maxblocks:
33863400
i = random.randrange(maxblocks)
33873401
del blocks[i]
3388-
3389-
# get the heap object
3390-
heap = multiprocessing.heap.BufferWrapper._heap
3402+
del b
33913403

33923404
# verify the state of the heap
3393-
all = []
3394-
occupied = 0
3395-
heap._lock.acquire()
3396-
self.addCleanup(heap._lock.release)
3397-
for L in list(heap._len_to_seq.values()):
3398-
for arena, start, stop in L:
3399-
all.append((heap._arenas.index(arena), start, stop,
3400-
stop-start, 'free'))
3401-
for arena, start, stop in heap._allocated_blocks:
3402-
all.append((heap._arenas.index(arena), start, stop,
3403-
stop-start, 'occupied'))
3404-
occupied += (stop-start)
3405-
3406-
all.sort()
3407-
3408-
for i in range(len(all)-1):
3409-
(arena, start, stop) = all[i][:3]
3410-
(narena, nstart, nstop) = all[i+1][:3]
3411-
self.assertTrue((arena != narena and nstart == 0) or
3412-
(stop == nstart))
3405+
with heap._lock:
3406+
all = []
3407+
free = 0
3408+
occupied = 0
3409+
for L in list(heap._len_to_seq.values()):
3410+
# count all free blocks in arenas
3411+
for arena, start, stop in L:
3412+
all.append((heap._arenas.index(arena), start, stop,
3413+
stop-start, 'free'))
3414+
free += (stop-start)
3415+
for arena, arena_blocks in heap._allocated_blocks.items():
3416+
# count all allocated blocks in arenas
3417+
for start, stop in arena_blocks:
3418+
all.append((heap._arenas.index(arena), start, stop,
3419+
stop-start, 'occupied'))
3420+
occupied += (stop-start)
3421+
3422+
self.assertEqual(free + occupied,
3423+
sum(arena.size for arena in heap._arenas))
3424+
3425+
all.sort()
3426+
3427+
for i in range(len(all)-1):
3428+
(arena, start, stop) = all[i][:3]
3429+
(narena, nstart, nstop) = all[i+1][:3]
3430+
if arena != narena:
3431+
# Two different arenas
3432+
self.assertEqual(stop, heap._arenas[arena].size) # last block
3433+
self.assertEqual(nstart, 0) # first block
3434+
else:
3435+
# Same arena: two adjacent blocks
3436+
self.assertEqual(stop, nstart)
3437+
3438+
# test free'ing all blocks
3439+
random.shuffle(blocks)
3440+
while blocks:
3441+
blocks.pop()
3442+
3443+
self.assertEqual(heap._n_frees, heap._n_mallocs)
3444+
self.assertEqual(len(heap._pending_free_blocks), 0)
3445+
self.assertEqual(len(heap._arenas), 0)
3446+
self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
3447+
self.assertEqual(len(heap._len_to_seq), 0)
34133448

34143449
def test_free_from_gc(self):
34153450
# Check that freeing of blocks by the garbage collector doesn't deadlock
Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
Free unused arenas in multiprocessing.heap.

0 commit comments

Comments
 (0)