[diffoscope] 06/10: parallel: comparison_pool: reworks map function

Juliana Oliveira jwnx-guest at moszumanska.debian.org
Sun Jan 14 21:04:35 CET 2018


This is an automated email from the git hooks/post-receive script.

jwnx-guest pushed a commit to branch jwnx_parallel_diffoscope
in repository diffoscope.

commit c28f09815b55a766418f0b335834189c54de6cec
Author: Juliana Oliveira <juliana.orod at gmail.com>
Date:   Sun Jan 14 16:51:55 2018 -0200

    parallel: comparison_pool: reworks map function
    
    ComparisonPool.map function aims to simulate multiprocessing.map
    function while better handling serialization and asyncronously
    processing. Async processing is prefered in this case because it unlocks
    file descriptors on-the-go, avoiding recursion errors.
    
    Each process is spawned by a apply_async function and added to a
    callback dictionary, where it can be orderly retrieved later. If a function
    fails to execute, a CommandFailedToExecute exception is raised.
    
    Signed-off-by: Juliana Oliveira <juliana.orod at gmail.com>
---
 diffoscope/parallel/comparison_pool.py | 45 +++++++++++++++++++++++++---------
 1 file changed, 33 insertions(+), 12 deletions(-)

diff --git a/diffoscope/parallel/comparison_pool.py b/diffoscope/parallel/comparison_pool.py
index 43c7594..7d72a0a 100644
--- a/diffoscope/parallel/comparison_pool.py
+++ b/diffoscope/parallel/comparison_pool.py
@@ -1,36 +1,57 @@
 import logging
 import dill
 
-from pathos.multiprocessing import ProcessingPool as Pool
+from multiprocess import Pool
 from diffoscope.config import Config
-from diffoscope.parallel.thread_manager import ThreadManager
+from functools import partial
+from pickle import PickleError
 
 
 logger = logging.getLogger(__name__)
 
 
+class CommandFailedToExecute(Exception):
+    def __init__(self, err):
+        self.err = err
+    def __str__(self):
+        return repr(self.err)
+
+
 class ComparisonPool(object):
 
   def __init__(self):
     self._pool_size = Config().jobs
-    self._pool = Pool(nodes=self._pool_size)
+    self._pool = Pool(self._pool_size)
     logger.debug("ComparisonPool initialized. Pool size: %d", self._pool_size)
 
 
-  def map(self, fun, args=[]):
+  def map(self, fun, args=[], callback=None):
     logger.debug("Invoking parallel map for function %s", fun)
 
-    threads = ThreadManager()
+    pool = self._pool
+    jobs = []
+
+    def _callback(result, index):
+      callback[index] = result
 
-    for arg in args:
-      threads.new(fun, arg)
 
-    threads.join()
+    for index, arg in enumerate(args):
+      logger.debug("Adding new process for %s (%d: %s)", fun, index, arg)
+      new_callback = partial(_callback, index=index)
+      jobs.append(pool.apply_async(fun, args=(arg,), callback= new_callback))
 
-    while not threads.ready():
-      time.sleep(0.5)
+    for job in jobs:
+      try:
+        job.get()
+      except PickleError as e:
+        raise CommandFailedToExecute(e)
 
-    return threads.result()
+      if not job.successful():
+        raise CommandFailedToExecute(job._err_callback)
 
+    logger.debug("Closing Pools")
+    pool.close()
+    pool.join()
+    jobs = None
+    logger.debug("Ending ComparisonPool.map")
 
-pool = ComparisonPool()

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/reproducible/diffoscope.git


More information about the diffoscope mailing list