Skip to content

Commit 7a3c7e5

Browse files
committed
Moved SharedMemoryManager to managers module, tweak to fragile test.
1 parent 05e26dd commit 7a3c7e5

File tree

4 files changed

+156
-147
lines changed

4 files changed

+156
-147
lines changed

Doc/library/multiprocessing.shared_memory.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ and management of shared memory to be accessed by one or more processes
2020
on a multicore or symmetric multiprocessor (SMP) machine. To assist with
2121
the life-cycle management of shared memory especially across distinct
2222
processes, a :class:`~multiprocessing.managers.BaseManager` subclass,
23-
:class:`SharedMemoryManager`, is also provided.
23+
:class:`SharedMemoryManager`, is also provided in the
24+
``multiprocessing.managers`` module.
2425

2526
In this module, shared memory refers to "System V style" shared memory blocks
2627
(though is not necessarily implemented explicitly as such) and does not refer

Lib/multiprocessing/managers.py

Lines changed: 149 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Module providing the `SyncManager` class for dealing
2+
# Module providing manager classes for dealing
33
# with shared objects
44
#
55
# multiprocessing/managers.py
@@ -8,7 +8,8 @@
88
# Licensed to PSF under a Contributor Agreement.
99
#
1010

11-
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
11+
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
12+
'SharedMemoryManager' ]
1213

1314
#
1415
# Imports
@@ -19,6 +20,7 @@
1920
import array
2021
import queue
2122
import time
23+
from os import getpid
2224

2325
from traceback import format_exc
2426

@@ -28,6 +30,11 @@
2830
from . import process
2931
from . import util
3032
from . import get_context
33+
try:
34+
from . import shared_memory
35+
HAS_SHMEM = True
36+
except ImportError:
37+
HAS_SHMEM = False
3138

3239
#
3340
# Register some things for pickling
@@ -1200,3 +1207,143 @@ class SyncManager(BaseManager):
12001207
# types returned by methods of PoolProxy
12011208
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
12021209
SyncManager.register('AsyncResult', create_method=False)
1210+
1211+
#
1212+
# Definition of SharedMemoryManager and SharedMemoryServer
1213+
#
1214+
1215+
if HAS_SHMEM:
1216+
class _SharedMemoryTracker:
1217+
"Manages one or more shared memory segments."
1218+
1219+
def __init__(self, name, segment_names=[]):
1220+
self.shared_memory_context_name = name
1221+
self.segment_names = segment_names
1222+
1223+
def register_segment(self, segment_name):
1224+
"Adds the supplied shared memory block name to tracker."
1225+
util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1226+
self.segment_names.append(segment_name)
1227+
1228+
def destroy_segment(self, segment_name):
1229+
"""Calls unlink() on the shared memory block with the supplied name
1230+
and removes it from the list of blocks being tracked."""
1231+
util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1232+
self.segment_names.remove(segment_name)
1233+
segment = shared_memory.SharedMemory(segment_name)
1234+
segment.close()
1235+
segment.unlink()
1236+
1237+
def unlink(self):
1238+
"Calls destroy_segment() on all tracked shared memory blocks."
1239+
for segment_name in self.segment_names[:]:
1240+
self.destroy_segment(segment_name)
1241+
1242+
def __del__(self):
1243+
util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1244+
self.unlink()
1245+
1246+
def __getstate__(self):
1247+
return (self.shared_memory_context_name, self.segment_names)
1248+
1249+
def __setstate__(self, state):
1250+
self.__init__(*state)
1251+
1252+
1253+
class SharedMemoryServer(Server):
1254+
1255+
public = Server.public + \
1256+
['track_segment', 'release_segment', 'list_segments']
1257+
1258+
def __init__(self, *args, **kwargs):
1259+
Server.__init__(self, *args, **kwargs)
1260+
self.shared_memory_context = \
1261+
_SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
1262+
util.debug(f"SharedMemoryServer started by pid {getpid()}")
1263+
1264+
def create(self, c, typeid, *args, **kwargs):
1265+
"""Create a new distributed-shared object (not backed by a shared
1266+
memory block) and return its id to be used in a Proxy Object."""
1267+
# Unless set up as a shared proxy, don't make shared_memory_context
1268+
# a standard part of kwargs. This makes things easier for supplying
1269+
# simple functions.
1270+
if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1271+
kwargs['shared_memory_context'] = self.shared_memory_context
1272+
return Server.create(self, c, typeid, *args, **kwargs)
1273+
1274+
def shutdown(self, c):
1275+
"Call unlink() on all tracked shared memory, terminate the Server."
1276+
self.shared_memory_context.unlink()
1277+
return Server.shutdown(self, c)
1278+
1279+
def track_segment(self, c, segment_name):
1280+
"Adds the supplied shared memory block name to Server's tracker."
1281+
self.shared_memory_context.register_segment(segment_name)
1282+
1283+
def release_segment(self, c, segment_name):
1284+
"""Calls unlink() on the shared memory block with the supplied name
1285+
and removes it from the tracker instance inside the Server."""
1286+
self.shared_memory_context.destroy_segment(segment_name)
1287+
1288+
def list_segments(self, c):
1289+
"""Returns a list of names of shared memory blocks that the Server
1290+
is currently tracking."""
1291+
return self.shared_memory_context.segment_names
1292+
1293+
1294+
class SharedMemoryManager(BaseManager):
1295+
"""Like SyncManager but uses SharedMemoryServer instead of Server.
1296+
1297+
It provides methods for creating and returning SharedMemory instances
1298+
and for creating a list-like object (ShareableList) backed by shared
1299+
memory. It also provides methods that create and return Proxy Objects
1300+
that support synchronization across processes (i.e. multi-process-safe
1301+
locks and semaphores).
1302+
"""
1303+
1304+
_Server = SharedMemoryServer
1305+
1306+
def __init__(self, *args, **kwargs):
1307+
BaseManager.__init__(self, *args, **kwargs)
1308+
util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1309+
1310+
def __del__(self):
1311+
util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1312+
pass
1313+
1314+
def get_server(self):
1315+
'Better than monkeypatching for now; merge into Server ultimately'
1316+
if self._state.value != State.INITIAL:
1317+
if self._state.value == State.STARTED:
1318+
raise ProcessError("Already started SharedMemoryServer")
1319+
elif self._state.value == State.SHUTDOWN:
1320+
raise ProcessError("SharedMemoryManager has shut down")
1321+
else:
1322+
raise ProcessError(
1323+
"Unknown state {!r}".format(self._state.value))
1324+
return self._Server(self._registry, self._address,
1325+
self._authkey, self._serializer)
1326+
1327+
def SharedMemory(self, size):
1328+
"""Returns a new SharedMemory instance with the specified size in
1329+
bytes, to be tracked by the manager."""
1330+
with self._Client(self._address, authkey=self._authkey) as conn:
1331+
sms = shared_memory.SharedMemory(None, create=True, size=size)
1332+
try:
1333+
dispatch(conn, None, 'track_segment', (sms.name,))
1334+
except BaseException as e:
1335+
sms.unlink()
1336+
raise e
1337+
return sms
1338+
1339+
def ShareableList(self, sequence):
1340+
"""Returns a new ShareableList instance populated with the values
1341+
from the input sequence, to be tracked by the manager."""
1342+
with self._Client(self._address, authkey=self._authkey) as conn:
1343+
sl = shared_memory.ShareableList(sequence)
1344+
try:
1345+
dispatch(conn, None, 'track_segment', (sl.shm.name,))
1346+
except BaseException as e:
1347+
sl.shm.unlink()
1348+
raise e
1349+
return sl

Lib/multiprocessing/shared_memory.py

Lines changed: 2 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,11 @@
55
"""
66

77

8-
__all__ = [ 'SharedMemory', 'ShareableList',
9-
'SharedMemoryServer', 'SharedMemoryManager' ]
8+
__all__ = [ 'SharedMemory', 'ShareableList' ]
109

1110

1211
from functools import partial
1312
import mmap
14-
from .managers import dispatch, BaseManager, Server, State, ProcessError
15-
from . import util
1613
import os
1714
import errno
1815
import struct
@@ -53,7 +50,7 @@ class SharedMemory:
5350
shared memory block.
5451
5552
Every shared memory block is assigned a unique name. This enables
56-
one process to create a shared memory block with a particular name
53+
one process to create a shared memory block with a particular name
5754
so that a different process can attach to that same shared memory
5855
block using that same name.
5956
@@ -500,139 +497,3 @@ def index(self, value):
500497
return position
501498
else:
502499
raise ValueError(f"{value!r} not in this container")
503-
504-
505-
class _SharedMemoryTracker:
506-
"Manages one or more shared memory segments."
507-
508-
def __init__(self, name, segment_names=[]):
509-
self.shared_memory_context_name = name
510-
self.segment_names = segment_names
511-
512-
def register_segment(self, segment_name):
513-
"Adds the supplied shared memory block name to tracker."
514-
util.debug(f"Registering segment {segment_name!r} in pid {os.getpid()}")
515-
self.segment_names.append(segment_name)
516-
517-
def destroy_segment(self, segment_name):
518-
"""Calls unlink() on the shared memory block with the supplied name
519-
and removes it from the list of blocks being tracked."""
520-
util.debug(f"Destroying segment {segment_name!r} in pid {os.getpid()}")
521-
self.segment_names.remove(segment_name)
522-
segment = SharedMemory(segment_name)
523-
segment.close()
524-
segment.unlink()
525-
526-
def unlink(self):
527-
"Calls destroy_segment() on all currently tracked shared memory blocks."
528-
for segment_name in self.segment_names[:]:
529-
self.destroy_segment(segment_name)
530-
531-
def __del__(self):
532-
util.debug(f"Called {self.__class__.__name__}.__del__ in {os.getpid()}")
533-
self.unlink()
534-
535-
def __getstate__(self):
536-
return (self.shared_memory_context_name, self.segment_names)
537-
538-
def __setstate__(self, state):
539-
self.__init__(*state)
540-
541-
542-
class SharedMemoryServer(Server):
543-
544-
public = Server.public + \
545-
['track_segment', 'release_segment', 'list_segments']
546-
547-
def __init__(self, *args, **kwargs):
548-
Server.__init__(self, *args, **kwargs)
549-
self.shared_memory_context = \
550-
_SharedMemoryTracker(f"shmm_{self.address}_{os.getpid()}")
551-
util.debug(f"SharedMemoryServer started by pid {os.getpid()}")
552-
553-
def create(self, c, typeid, *args, **kwargs):
554-
"""Create a new distributed-shared object (not backed by a shared
555-
memory block) and return its id to be used in a Proxy Object."""
556-
# Unless set up as a shared proxy, don't make shared_memory_context
557-
# a standard part of kwargs. This makes things easier for supplying
558-
# simple functions.
559-
if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
560-
kwargs['shared_memory_context'] = self.shared_memory_context
561-
return Server.create(self, c, typeid, *args, **kwargs)
562-
563-
def shutdown(self, c):
564-
"Call unlink() on all tracked shared memory then terminate the Server."
565-
self.shared_memory_context.unlink()
566-
return Server.shutdown(self, c)
567-
568-
def track_segment(self, c, segment_name):
569-
"Adds the supplied shared memory block name to Server's tracker."
570-
self.shared_memory_context.register_segment(segment_name)
571-
572-
def release_segment(self, c, segment_name):
573-
"""Calls unlink() on the shared memory block with the supplied name
574-
and removes it from the tracker instance inside the Server."""
575-
self.shared_memory_context.destroy_segment(segment_name)
576-
577-
def list_segments(self, c):
578-
"""Returns a list of names of shared memory blocks that the Server
579-
is currently tracking."""
580-
return self.shared_memory_context.segment_names
581-
582-
583-
class SharedMemoryManager(BaseManager):
584-
"""Like SyncManager but uses SharedMemoryServer instead of Server.
585-
586-
It provides methods for creating and returning SharedMemory instances
587-
and for creating a list-like object (ShareableList) backed by shared
588-
memory. It also provides methods that create and return Proxy Objects
589-
that support synchronization across processes (i.e. multi-process-safe
590-
locks and semaphores).
591-
"""
592-
593-
_Server = SharedMemoryServer
594-
595-
def __init__(self, *args, **kwargs):
596-
BaseManager.__init__(self, *args, **kwargs)
597-
util.debug(f"{self.__class__.__name__} created by pid {os.getpid()}")
598-
599-
def __del__(self):
600-
util.debug(f"{self.__class__.__name__} told die by pid {os.getpid()}")
601-
pass
602-
603-
def get_server(self):
604-
'Better than monkeypatching for now; merge into Server ultimately'
605-
if self._state.value != State.INITIAL:
606-
if self._state.value == State.STARTED:
607-
raise ProcessError("Already started SharedMemoryServer")
608-
elif self._state.value == State.SHUTDOWN:
609-
raise ProcessError("SharedMemoryManager has shut down")
610-
else:
611-
raise ProcessError(
612-
"Unknown state {!r}".format(self._state.value))
613-
return self._Server(self._registry, self._address,
614-
self._authkey, self._serializer)
615-
616-
def SharedMemory(self, size):
617-
"""Returns a new SharedMemory instance with the specified size in
618-
bytes, to be tracked by the manager."""
619-
with self._Client(self._address, authkey=self._authkey) as conn:
620-
sms = SharedMemory(None, create=True, size=size)
621-
try:
622-
dispatch(conn, None, 'track_segment', (sms.name,))
623-
except BaseException as e:
624-
sms.unlink()
625-
raise e
626-
return sms
627-
628-
def ShareableList(self, sequence):
629-
"""Returns a new ShareableList instance populated with the values
630-
from the input sequence, to be tracked by the manager."""
631-
with self._Client(self._address, authkey=self._authkey) as conn:
632-
sl = ShareableList(sequence)
633-
try:
634-
dispatch(conn, None, 'track_segment', (sl.shm.name,))
635-
except BaseException as e:
636-
sl.shm.unlink()
637-
raise e
638-
return sl

Lib/test/_test_multiprocessing.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3652,7 +3652,7 @@ def test_shared_memory_basics(self):
36523652

36533653
# Attach to existing shared memory segment but specify a new size.
36543654
same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size)
3655-
self.assertEqual(same_sms.size, sms.size) # Size was ignored.
3655+
self.assertLess(same_sms.size, 20*sms.size) # Size was ignored.
36563656
same_sms.close()
36573657

36583658
if shared_memory._USE_POSIX:
@@ -3737,7 +3737,7 @@ def test_shared_memory_across_processes(self):
37373737
sms.close()
37383738

37393739
def test_shared_memory_SharedMemoryManager_basics(self):
3740-
smm1 = shared_memory.SharedMemoryManager()
3740+
smm1 = multiprocessing.managers.SharedMemoryManager()
37413741
with self.assertRaises(ValueError):
37423742
smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started
37433743
smm1.start()
@@ -3756,7 +3756,7 @@ def test_shared_memory_SharedMemoryManager_basics(self):
37563756
# No longer there to be attached to again.
37573757
absent_shm = shared_memory.SharedMemory(name=held_name)
37583758

3759-
with shared_memory.SharedMemoryManager() as smm2:
3759+
with multiprocessing.managers.SharedMemoryManager() as smm2:
37603760
sl = smm2.ShareableList("howdy")
37613761
shm = smm2.SharedMemory(size=128)
37623762
held_name = sl.shm.name

0 commit comments

Comments
 (0)