[diffoscope] 01/03: parallel: adds comparison_pool class
Juliana Oliveira
jwnx-guest at moszumanska.debian.org
Fri Mar 9 05:11:33 CET 2018
This is an automated email from the git hooks/post-receive script.
jwnx-guest pushed a commit to branch jwnx_parallel_diffoscope
in repository diffoscope.
commit 9bf20f0ff516ed66294ffdc723e098849c168506
Author: Juliana Oliveira <juliana.orod at gmail.com>
Date: Tue Jan 9 00:56:36 2018 -0200
parallel: adds comparison_pool class
ComparisonPool aims to centralize the logic behind pool and process
handling. It has a .map method, used to parallelize a given command
between its pools.
So far, ComparisonPool's pool size defaults to cpu_count().
Signed-off-by: Juliana Oliveira <juliana.orod at gmail.com>
---
diffoscope/parallel/__init__.py | 0
diffoscope/parallel/comparison_pool.py | 56 ++++++++++++++++++++++++++++++++++
2 files changed, 56 insertions(+)
diff --git a/diffoscope/parallel/__init__.py b/diffoscope/parallel/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/diffoscope/parallel/comparison_pool.py b/diffoscope/parallel/comparison_pool.py
new file mode 100644
index 0000000..72f98eb
--- /dev/null
+++ b/diffoscope/parallel/comparison_pool.py
@@ -0,0 +1,56 @@
+import logging
+import dill
+
+from multiprocess import Pool
+from diffoscope.config import Config
+from functools import partial
+from pickle import PickleError
+
+
+logger = logging.getLogger(__name__)
+
+
+class CommandFailedToExecute(Exception):
+ def __init__(self, err):
+ self.err = err
+ def __str__(self):
+ return repr(self.err)
+
+
+class ComparisonPool(object):
+
+ def __init__(self):
+ self._pool = Pool()
+ logger.debug("ComparisonPool initialized.")
+
+
+ def map(self, fun, args=[], callback=None):
+ logger.debug("Invoking parallel map for function %s", fun)
+
+ pool = self._pool
+ jobs = []
+
+ def _callback(result, index):
+ callback[index] = result
+
+
+ for index, arg in enumerate(args):
+ logger.debug("Adding new process for %s (%d: %s)", fun, index, arg)
+ new_callback = partial(_callback, index=index)
+ jobs.append(pool.apply_async(fun, args=(arg,), callback= new_callback))
+
+ for job in jobs:
+ try:
+ job.get()
+ except PickleError as e:
+ raise CommandFailedToExecute(e)
+
+ if not job.successful():
+ raise CommandFailedToExecute(job._err_callback)
+
+ logger.debug("Closing Pools")
+ pool.close()
+ pool.join()
+ jobs = None
+ logger.debug("Ending ComparisonPool.map")
+
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/reproducible/diffoscope.git
More information about the diffoscope
mailing list