[diffoscope] 01/03: parallel: adds comparison_pool class

Juliana Oliveira jwnx-guest at moszumanska.debian.org
Fri Mar 9 05:11:33 CET 2018


This is an automated email from the git hooks/post-receive script.

jwnx-guest pushed a commit to branch jwnx_parallel_diffoscope
in repository diffoscope.

commit 9bf20f0ff516ed66294ffdc723e098849c168506
Author: Juliana Oliveira <juliana.orod at gmail.com>
Date:   Tue Jan 9 00:56:36 2018 -0200

    parallel: adds comparison_pool class
    
    ComparisonPool aims to centralize the logic behind pool and process
    handling. It has a .map method, used to run a given function in
    parallel across the pool's worker processes.
    
    So far, ComparisonPool's pool size defaults to cpu_count().
    
    Signed-off-by: Juliana Oliveira <juliana.orod at gmail.com>
---
 diffoscope/parallel/__init__.py        |  0
 diffoscope/parallel/comparison_pool.py | 56 ++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/diffoscope/parallel/__init__.py b/diffoscope/parallel/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/diffoscope/parallel/comparison_pool.py b/diffoscope/parallel/comparison_pool.py
new file mode 100644
index 0000000..72f98eb
--- /dev/null
+++ b/diffoscope/parallel/comparison_pool.py
@@ -0,0 +1,56 @@
+import logging
+import dill
+
+from multiprocess import Pool
+from diffoscope.config import Config
+from functools import partial
+from pickle import PickleError
+
+
+logger = logging.getLogger(__name__)
+
+
+class CommandFailedToExecute(Exception):
+    """Wraps the error that prevented a parallel job from executing.
+
+    The wrapped error is kept in ``err`` and is also passed to
+    ``Exception.__init__`` so that ``args`` is populated. Without that,
+    unpickling the exception would call ``cls()`` with no arguments and
+    fail, which matters for exceptions used around multiprocessing.
+    """
+
+    def __init__(self, err):
+        super().__init__(err)
+        self.err = err
+
+    def __str__(self):
+        return repr(self.err)
+
+
+class ComparisonPool(object):
+    """Run a function over many inputs in parallel on a worker pool.
+
+    The pool is created in ``__init__`` and shut down at the end of
+    ``map()``, so an instance supports a single ``map()`` call.
+    """
+
+    def __init__(self, processes=None):
+        """Create the underlying worker pool.
+
+        :param processes: number of worker processes. ``None`` (the
+            default) lets ``Pool`` choose, which is ``cpu_count()``.
+        """
+        self._pool = Pool(processes)
+        logger.debug("ComparisonPool initialized.")
+
+    def map(self, fun, args=(), callback=None):
+        """Apply ``fun`` to each item of ``args`` in parallel.
+
+        :param fun: callable run as ``fun(arg)`` in a worker process.
+        :param args: iterable of arguments; one job is submitted per item.
+        :param callback: optional container supporting item assignment
+            (e.g. a list pre-sized to ``len(args)``, or a dict). The result
+            of the job for ``args[i]`` is stored at ``callback[i]``.
+            ``None`` disables this.
+        :returns: list of results, in the same order as ``args``.
+        :raises CommandFailedToExecute: if fetching a job's result raises
+            ``PickleError``. Any other exception raised by ``fun`` is
+            re-raised unchanged.
+        """
+        logger.debug("Invoking parallel map for function %s", fun)
+
+        pool = self._pool
+        jobs = []
+
+        def _store(result, index):
+            # Called from the pool's result-handler thread.
+            callback[index] = result
+
+        try:
+            for index, arg in enumerate(args):
+                logger.debug(
+                    "Adding new process for %s (%d: %s)", fun, index, arg)
+                on_result = (None if callback is None
+                             else partial(_store, index=index))
+                jobs.append(
+                    pool.apply_async(fun, args=(arg,), callback=on_result))
+
+            results = []
+            for job in jobs:
+                try:
+                    # get() re-raises whatever ``fun`` raised in the worker,
+                    # so no separate successful() check is needed.
+                    results.append(job.get())
+                except PickleError as e:
+                    raise CommandFailedToExecute(e) from e
+        except BaseException:
+            # Stop outstanding jobs and reap the workers instead of
+            # leaving the pool running after a failure.
+            logger.debug("Terminating Pools after failure")
+            pool.terminate()
+            pool.join()
+            raise
+
+        logger.debug("Closing Pools")
+        pool.close()
+        pool.join()
+        logger.debug("Ending ComparisonPool.map")
+        return results
+

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/reproducible/diffoscope.git


More information about the diffoscope mailing list