Fix multiprocessing work with retries

This commit is contained in:
Ian Cordasco 2016-04-02 14:02:43 -05:00
parent 09ad1d850e
commit e3707bbe08

View file

@ -72,12 +72,17 @@ class Manager(object):
self.jobs = self._job_count() self.jobs = self._job_count()
self.process_queue = None self.process_queue = None
self.results_queue = None self.results_queue = None
self.using_multiprocessing = False self.using_multiprocessing = self.jobs > 1
self.processes = [] self.processes = []
self.checkers = [] self.checkers = []
if self.jobs > 1: try:
self.using_multiprocessing = True self.process_queue = multiprocessing.Queue()
self.results_queue = multiprocessing.Queue()
except OSError as oserr:
if oserr.errno not in SERIAL_RETRY_ERRNOS:
raise
self.using_multiprocessing = False
@staticmethod @staticmethod
def _cleanup_queue(q): def _cleanup_queue(q):
@ -144,8 +149,8 @@ class Manager(object):
def _results(self): def _results(self):
seen_done = 0 seen_done = 0
LOG.info('Retrieving results')
while True: while True:
LOG.info('Retrieving results')
result = self.results_queue.get() result = self.results_queue.get()
if result == 'DONE': if result == 'DONE':
seen_done += 1 seen_done += 1
@ -248,19 +253,15 @@ class Manager(object):
def run_parallel(self): def run_parallel(self):
"""Run the checkers in parallel.""" """Run the checkers in parallel."""
# NOTE(sigmavirus24): Initialize Queues here to handle serial retries
# in one place.
self.process_queue = multiprocessing.Queue()
self.results_queue = multiprocessing.Queue()
LOG.info('Starting %d process workers', self.jobs - 1) LOG.info('Starting %d process workers', self.jobs - 1)
for i in range(self.jobs - 1): for i in range(self.jobs):
proc = multiprocessing.Process( proc = multiprocessing.Process(
target=self._run_checks_from_queue target=self._run_checks_from_queue
) )
proc.daemon = True proc.daemon = True
proc.start() proc.start()
self.processes.append(proc) self.processes.append(proc)
proc = multiprocessing.Process(target=self._report_after_parallel) proc = multiprocessing.Process(target=self._report_after_parallel)
proc.start() proc.start()
LOG.info('Started process to report errors') LOG.info('Started process to report errors')
@ -310,6 +311,7 @@ class Manager(object):
self.process_queue.put('DONE') self.process_queue.put('DONE')
for proc in self.processes: for proc in self.processes:
LOG.info('Joining %s to the main process', proc.name)
proc.join() proc.join()