diff --git a/flake8/checker.py b/flake8/checker.py index 8a0603f..17c01cb 100644 --- a/flake8/checker.py +++ b/flake8/checker.py @@ -72,12 +72,17 @@ class Manager(object): self.jobs = self._job_count() self.process_queue = None self.results_queue = None - self.using_multiprocessing = False + self.using_multiprocessing = self.jobs > 1 self.processes = [] self.checkers = [] - if self.jobs > 1: - self.using_multiprocessing = True + try: + self.process_queue = multiprocessing.Queue() + self.results_queue = multiprocessing.Queue() + except OSError as oserr: + if oserr.errno not in SERIAL_RETRY_ERRNOS: + raise + self.using_multiprocessing = False @staticmethod def _cleanup_queue(q): @@ -144,8 +149,8 @@ class Manager(object): def _results(self): seen_done = 0 + LOG.info('Retrieving results') while True: - LOG.info('Retrieving results') result = self.results_queue.get() if result == 'DONE': seen_done += 1 @@ -248,19 +253,15 @@ class Manager(object): def run_parallel(self): """Run the checkers in parallel.""" - # NOTE(sigmavirus24): Initialize Queues here to handle serial retries - # in one place. - self.process_queue = multiprocessing.Queue() - self.results_queue = multiprocessing.Queue() - LOG.info('Starting %d process workers', self.jobs - 1) - for i in range(self.jobs - 1): + for i in range(self.jobs): proc = multiprocessing.Process( target=self._run_checks_from_queue ) proc.daemon = True proc.start() self.processes.append(proc) + proc = multiprocessing.Process(target=self._report_after_parallel) proc.start() LOG.info('Started process to report errors') @@ -310,6 +311,7 @@ class Manager(object): self.process_queue.put('DONE') for proc in self.processes: + LOG.info('Joining %s to the main process', proc.name) proc.join()