diff --git a/flake8/checker.py b/flake8/checker.py index becde97..8a0603f 100644 --- a/flake8/checker.py +++ b/flake8/checker.py @@ -1,4 +1,5 @@ """Checker Manager and Checker classes.""" +import errno import logging import os import sys @@ -15,6 +16,21 @@ from flake8 import utils LOG = logging.getLogger(__name__) +SERIAL_RETRY_ERRNOS = set([ + # ENOSPC: Added by sigmavirus24 + # > On some operating systems (OSX), multiprocessing may cause an + # > ENOSPC error while trying to trying to create a Semaphore. + # > In those cases, we should replace the customized Queue Report + # > class with pep8's StandardReport class to ensure users don't run + # > into this problem. + # > (See also: https://gitlab.com/pycqa/flake8/issues/74) + errno.ENOSPC, + # NOTE(sigmavirus24): When adding to this list, include the reasoning + # on the lines before the error code and always append your error + # code. Further, please always add a trailing `,` to reduce the visual + # noise in diffs. +]) + class Manager(object): """Manage the parallelism and checker instances for each plugin and file. @@ -62,8 +78,6 @@ class Manager(object): if self.jobs > 1: self.using_multiprocessing = True - self.process_queue = multiprocessing.Queue() - self.results_queue = multiprocessing.Queue() @staticmethod def _cleanup_queue(q): @@ -232,28 +246,52 @@ class Manager(object): if not self.using_multiprocessing: self._report_after_serial() + def run_parallel(self): + """Run the checkers in parallel.""" + # NOTE(sigmavirus24): Initialize Queues here to handle serial retries + # in one place. + self.process_queue = multiprocessing.Queue() + self.results_queue = multiprocessing.Queue() + + LOG.info('Starting %d process workers', self.jobs - 1) + for i in range(self.jobs - 1): + proc = multiprocessing.Process( + target=self._run_checks_from_queue + ) + proc.daemon = True + proc.start() + self.processes.append(proc) + proc = multiprocessing.Process(target=self._report_after_parallel) + proc.start() + LOG.info('Started process to report errors') + self.processes.append(proc) + + def run_serial(self): + """Run the checkers in serial.""" + for checker in self.checkers: + checker.run_checks(self.results_queue) + def run(self): """Run all the checkers. - This handles starting the process workers or just simply running all - of the checks in serial. + This will intelligently decide whether to run the checks in parallel + or whether to run them in serial. + + If running the checks in parallel causes a problem (e.g., + https://gitlab.com/pycqa/flake8/issues/74) this also implements + fallback to serial processing. """ - if self.using_multiprocessing: - LOG.info('Starting %d process workers', self.jobs) - for i in range(self.jobs): - proc = multiprocessing.Process( - target=self._run_checks_from_queue - ) - proc.daemon = True - proc.start() - self.processes.append(proc) - proc = multiprocessing.Process(target=self._report_after_parallel) - proc.start() - LOG.info('Started process to report errors') - self.processes.append(proc) - else: - for checker in self.checkers: - checker.run_checks(self.results_queue) + try: + if self.using_multiprocessing: + self.run_parallel() + else: + self.run_serial() + except OSError as oserr: + if oserr.errno not in SERIAL_RETRY_ERRNOS: + LOG.exception(oserr) + raise + LOG.warning('Running in serial after OS exception, %r', oserr) + self.run_serial() def start(self): """Start checking files."""