|
20 | 20 | import shutil
|
21 | 21 | import subprocess
|
22 | 22 | import sys
|
| 23 | +from multiprocessing import Pool, Lock, cpu_count |
23 | 24 | from contextlib import contextmanager
|
24 | 25 |
|
25 | 26 | from . import diagnostics
|
@@ -172,3 +173,70 @@ def copytree(src, dest, dry_run=None, echo=True):
|
172 | 173 | if dry_run:
|
173 | 174 | return
|
174 | 175 | shutil.copytree(src, dest)
|
| 176 | + |
| 177 | + |
| 178 | +def run(*args, **kwargs): |
| 179 | + repo_path = os.getcwd() |
| 180 | + echo_output = kwargs.pop('echo', False) |
| 181 | + dry_run = kwargs.pop('dry_run', False) |
| 182 | + env = kwargs.pop('env', None) |
| 183 | + allow_non_zero_exit = kwargs.pop('allow_non_zero_exit', False) |
| 184 | + if dry_run: |
| 185 | + _echo_command(dry_run, *args, env=env) |
| 186 | + return(None, 0, args) |
| 187 | + |
| 188 | + my_pipe = subprocess.Popen(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs) |
| 189 | + (stdout, stderr) = my_pipe.communicate() |
| 190 | + ret = my_pipe.wait() |
| 191 | + |
| 192 | + lock.acquire() |
| 193 | + if echo_output: |
| 194 | + print(repo_path) |
| 195 | + _echo_command(dry_run, *args, env=env) |
| 196 | + if stdout: |
| 197 | + print(stdout, end="") |
| 198 | + if stderr: |
| 199 | + print(stderr, end="") |
| 200 | + print() |
| 201 | + lock.release() |
| 202 | + |
| 203 | + if ret != 0: |
| 204 | + eout = Exception() |
| 205 | + eout.ret = ret |
| 206 | + eout.args = args |
| 207 | + eout.repo_path = repo_path |
| 208 | + eout.stderr = stderr |
| 209 | + raise eout |
| 210 | + return (stdout, 0, args) |
| 211 | + |
| 212 | + |
| 213 | +def run_parallel(fn, pool_args, n_processes=0): |
| 214 | + def init(l): |
| 215 | + global lock |
| 216 | + lock = l |
| 217 | + |
| 218 | + if n_processes == 0: |
| 219 | + n_processes = cpu_count() * 2 |
| 220 | + |
| 221 | + l = Lock() |
| 222 | + print("Running ``%s`` with up to %d processes." % (fn.__name__, n_processes)) |
| 223 | + pool = Pool(processes=n_processes, initializer=init, initargs=(l,)) |
| 224 | + results = pool.map_async(func=fn, iterable=pool_args).get(9999999) |
| 225 | + pool.close() |
| 226 | + pool.join() |
| 227 | + return results |
| 228 | + |
| 229 | + |
| 230 | +def check_parallel_results(results, op): |
| 231 | + fail_count = 0 |
| 232 | + if results is None: |
| 233 | + return 0 |
| 234 | + for r in results: |
| 235 | + if r is not None: |
| 236 | + if fail_count == 0: |
| 237 | + print("======%s FAILURES======" % op) |
| 238 | + print("%s failed (ret=%d): %s" % (r.repo_path, r.ret, r)) |
| 239 | + fail_count += 1 |
| 240 | + if r.stderr: |
| 241 | + print(r.stderr) |
| 242 | + return fail_count |
0 commit comments