Skip to content

Change variable names #3971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions pymc3/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ def __init__(self, steppers, parallelize, progressbar=True):
"""
self.nchains = len(steppers)
self.is_parallelized = False
self._master_ends = []
self._primary_ends = []
self._processes = []
self._steppers = steppers
if parallelize:
Expand All @@ -1017,11 +1017,11 @@ def __init__(self, steppers, parallelize, progressbar=True):
for c, stepper in (
enumerate(progress_bar(steppers)) if progressbar else enumerate(steppers)
):
slave_end, master_end = multiprocessing.Pipe()
secondary_end, primary_end = multiprocessing.Pipe()
stepper_dumps = pickle.dumps(stepper, protocol=4)
process = multiprocessing.Process(
target=self.__class__._run_slave,
args=(c, stepper_dumps, slave_end),
target=self.__class__._run_secondary,
args=(c, stepper_dumps, secondary_end),
name="ChainWalker{}".format(c),
)
# we want the child process to exit if the parent is terminated
Expand All @@ -1030,7 +1030,7 @@ def __init__(self, steppers, parallelize, progressbar=True):
# By doing it in the constructor, the sampling progress bar
# will not be confused by the process start.
process.start()
self._master_ends.append(master_end)
self._primary_ends.append(primary_end)
self._processes.append(process)
self.is_parallelized = True
except Exception:
Expand All @@ -1053,16 +1053,16 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
if len(self._processes) > 0:
try:
for master_end in self._master_ends:
master_end.send(None)
for primary_end in self._primary_ends:
primary_end.send(None)
for process in self._processes:
process.join(timeout=3)
except Exception:
_log.warning("Termination failed.")
return

@staticmethod
def _run_slave(c, stepper_dumps, slave_end):
def _run_secondary(c, stepper_dumps, secondary_end):
"""This method is started on a separate process to perform stepping of a chain.

Parameters
Expand All @@ -1071,7 +1071,7 @@ def _run_slave(c, stepper_dumps, slave_end):
number of this chain
stepper : BlockedStep
a step method such as CompoundStep
slave_end : multiprocessing.connection.PipeConnection
secondary_end : multiprocessing.connection.PipeConnection
This is our connection to the main process
"""
# re-seed each child process to make them unique
Expand All @@ -1086,7 +1086,7 @@ def _run_slave(c, stepper_dumps, slave_end):
if isinstance(sm, arraystep.PopulationArrayStepShared):
population_steppers.append(sm)
while True:
incoming = slave_end.recv()
incoming = secondary_end.recv()
# receiving a None is the signal to exit
if incoming is None:
break
Expand All @@ -1099,7 +1099,7 @@ def _run_slave(c, stepper_dumps, slave_end):
for popstep in population_steppers:
popstep.population = population
update = stepper.step(population[c])
slave_end.send(update)
secondary_end.send(update)
except Exception:
_log.exception("ChainWalker{}".format(c))
return
Expand All @@ -1122,10 +1122,10 @@ def step(self, tune_stop, population):
updates = [None] * self.nchains
if self.is_parallelized:
for c in range(self.nchains):
self._master_ends[c].send((tune_stop, population))
self._primary_ends[c].send((tune_stop, population))
# Blockingly get the step outcomes
for c in range(self.nchains):
updates[c] = self._master_ends[c].recv()
updates[c] = self._primary_ends[c].recv()
else:
for c in range(self.nchains):
if tune_stop:
Expand Down