[diffoscope] 06/10: parallel: comparison_pool: reworks map function
Juliana Oliveira
jwnx-guest at moszumanska.debian.org
Sun Jan 14 21:04:35 CET 2018
This is an automated email from the git hooks/post-receive script.
jwnx-guest pushed a commit to branch jwnx_parallel_diffoscope
in repository diffoscope.
commit c28f09815b55a766418f0b335834189c54de6cec
Author: Juliana Oliveira <juliana.orod at gmail.com>
Date: Sun Jan 14 16:51:55 2018 -0200
parallel: comparison_pool: reworks map function
ComparisonPool.map function aims to simulate multiprocessing.map
function while better handling serialization and asynchronous
processing. Async processing is preferred in this case because it unlocks
file descriptors on-the-go, avoiding recursion errors.
Each process is spawned by an apply_async function and added to a
callback dictionary, where it can be retrieved in order later. If a function
fails to execute, a CommandFailedToExecute exception is raised.
Signed-off-by: Juliana Oliveira <juliana.orod at gmail.com>
---
diffoscope/parallel/comparison_pool.py | 45 +++++++++++++++++++++++++---------
1 file changed, 33 insertions(+), 12 deletions(-)
diff --git a/diffoscope/parallel/comparison_pool.py b/diffoscope/parallel/comparison_pool.py
index 43c7594..7d72a0a 100644
--- a/diffoscope/parallel/comparison_pool.py
+++ b/diffoscope/parallel/comparison_pool.py
@@ -1,36 +1,57 @@
import logging
import dill
-from pathos.multiprocessing import ProcessingPool as Pool
+from multiprocess import Pool
from diffoscope.config import Config
-from diffoscope.parallel.thread_manager import ThreadManager
+from functools import partial
+from pickle import PickleError
logger = logging.getLogger(__name__)
+class CommandFailedToExecute(Exception):
+ def __init__(self, err):
+ self.err = err
+ def __str__(self):
+ return repr(self.err)
+
+
class ComparisonPool(object):
def __init__(self):
self._pool_size = Config().jobs
- self._pool = Pool(nodes=self._pool_size)
+ self._pool = Pool(self._pool_size)
logger.debug("ComparisonPool initialized. Pool size: %d", self._pool_size)
- def map(self, fun, args=[]):
+ def map(self, fun, args=[], callback=None):
logger.debug("Invoking parallel map for function %s", fun)
- threads = ThreadManager()
+ pool = self._pool
+ jobs = []
+
+ def _callback(result, index):
+ callback[index] = result
- for arg in args:
- threads.new(fun, arg)
- threads.join()
+ for index, arg in enumerate(args):
+ logger.debug("Adding new process for %s (%d: %s)", fun, index, arg)
+ new_callback = partial(_callback, index=index)
+ jobs.append(pool.apply_async(fun, args=(arg,), callback= new_callback))
- while not threads.ready():
- time.sleep(0.5)
+ for job in jobs:
+ try:
+ job.get()
+ except PickleError as e:
+ raise CommandFailedToExecute(e)
- return threads.result()
+ if not job.successful():
+ raise CommandFailedToExecute(job._err_callback)
+ logger.debug("Closing Pools")
+ pool.close()
+ pool.join()
+ jobs = None
+ logger.debug("Ending ComparisonPool.map")
-pool = ComparisonPool()
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/reproducible/diffoscope.git
More information about the diffoscope
mailing list