Restructure Checker Manager for serial retries

This commit is contained in:
Ian Cordasco 2016-04-02 11:30:12 -05:00
parent 81eb3e41cc
commit 09ad1d850e

View file

@ -1,4 +1,5 @@
"""Checker Manager and Checker classes."""
import errno
import logging
import os
import sys
@ -15,6 +16,21 @@ from flake8 import utils
LOG = logging.getLogger(__name__)
# OSError errno values that are checked before falling back to running the
# checks in serial.
#
# NOTE(sigmavirus24): When adding to this collection, include the reasoning
# on the lines before the error code and always append your error code.
# Further, please always add a trailing `,` to reduce the visual noise in
# diffs.
SERIAL_RETRY_ERRNOS = {
    # ENOSPC: Added by sigmavirus24
    # > On some operating systems (OSX), multiprocessing may cause an
    # > ENOSPC error while trying to create a Semaphore.
    # > In those cases, we should replace the customized Queue Report
    # > class with pep8's StandardReport class to ensure users don't run
    # > into this problem.
    # > (See also: https://gitlab.com/pycqa/flake8/issues/74)
    errno.ENOSPC,
}
class Manager(object):
"""Manage the parallelism and checker instances for each plugin and file.
@ -62,8 +78,6 @@ class Manager(object):
if self.jobs > 1:
self.using_multiprocessing = True
self.process_queue = multiprocessing.Queue()
self.results_queue = multiprocessing.Queue()
@staticmethod
def _cleanup_queue(q):
@ -232,28 +246,52 @@ class Manager(object):
if not self.using_multiprocessing:
self._report_after_serial()
def run_parallel(self):
    """Run the checkers in parallel.

    Creates new process and results queues, starts ``self.jobs - 1``
    daemon processes targeting ``self._run_checks_from_queue`` and then
    one more process targeting ``self._report_after_parallel``. Every
    started process is appended to ``self.processes``.
    """
    # NOTE(sigmavirus24): Initialize Queues here to handle serial retries
    # in one place.
    self.process_queue = multiprocessing.Queue()
    self.results_queue = multiprocessing.Queue()

    # NOTE(review): jobs - 1 workers plus the single reporting process
    # started below adds up to self.jobs processes -- confirm this split
    # is the intended one.
    worker_count = self.jobs - 1
    LOG.info('Starting %d process workers', worker_count)
    for _ in range(worker_count):
        worker = multiprocessing.Process(
            target=self._run_checks_from_queue
        )
        worker.daemon = True
        worker.start()
        self.processes.append(worker)

    # Unlike the workers, the reporting process is not marked as a daemon.
    reporter = multiprocessing.Process(target=self._report_after_parallel)
    reporter.start()
    LOG.info('Started process to report errors')
    self.processes.append(reporter)
def run_serial(self):
    """Run the checkers in serial.

    Each entry of ``self.checkers`` is asked, in order, to run its checks
    and is handed ``self.results_queue``.
    """
    for current in self.checkers:
        current.run_checks(self.results_queue)
def run(self):
    """Run all the checkers.

    This will intelligently decide whether to run the checks in parallel
    or whether to run them in serial.

    If running the checks in parallel causes a problem (e.g.,
    https://gitlab.com/pycqa/flake8/issues/74) this also implements
    fallback to serial processing.

    :raises OSError:
        Re-raised (after being logged) when the error's ``errno`` is not
        one of ``SERIAL_RETRY_ERRNOS``.
    """
    # The worker start-up and the serial loop live in run_parallel() and
    # run_serial(); this method only dispatches and handles the fallback.
    try:
        if self.using_multiprocessing:
            self.run_parallel()
        else:
            self.run_serial()
    except OSError as oserr:
        if oserr.errno not in SERIAL_RETRY_ERRNOS:
            LOG.exception(oserr)
            raise
        LOG.warning('Running in serial after OS exception, %r', oserr)
        # NOTE(review): self.using_multiprocessing is left unchanged here,
        # so any code that branches on it still sees the original setting
        # after a fallback -- confirm that is intended.
        self.run_serial()
def start(self):
"""Start checking files."""