Simplify result handling

If we start collecting the results in run_parallel *immediately* after
starting the worker processes, we do not need another process to handle
the results. This also allows us to store all of the results on the
FileChecker class and process results uniformly. This also means we can
count the number of errors and warnings in a run and use that to exit
appropriately (using SystemExit).
This commit is contained in:
Ian Cordasco 2016-05-07 19:28:54 -05:00
parent e32476b671
commit d8635bb92a
2 changed files with 26 additions and 27 deletions

View file

@ -173,22 +173,6 @@ class Manager(object):
physical_line=physical_line,
)
def _report_after_parallel(self):
final_results = {}
for (filename, results) in self._results():
final_results[filename] = results
for checker in self.checkers:
filename = checker.filename
results = sorted(final_results.get(filename, []),
key=lambda tup: (tup[1], tup[2]))
self._handle_results(filename, results)
def _report_after_serial(self):
for checker in self.checkers:
results = sorted(checker.results, key=lambda tup: (tup[2], tup[3]))
self._handle_results(checker.filename, results)
def _run_checks_from_queue(self):
LOG.info('Running checks in parallel')
for checker in iter(self.process_queue.get, 'DONE'):
@ -242,12 +226,16 @@ class Manager(object):
This iterates over each of the checkers and reports the errors sorted
by line number.
"""
if not self.using_multiprocessing:
self._report_after_serial()
results_found = 0
for checker in self.checkers:
results = sorted(checker.results, key=lambda tup: (tup[2], tup[3]))
self._handle_results(checker.filename, results)
results_found += len(results)
return results_found
def run_parallel(self):
"""Run the checkers in parallel."""
LOG.info('Starting %d process workers', self.jobs - 1)
LOG.info('Starting %d process workers', self.jobs)
for i in range(self.jobs):
proc = multiprocessing.Process(
target=self._run_checks_from_queue
@ -256,10 +244,14 @@ class Manager(object):
proc.start()
self.processes.append(proc)
proc = multiprocessing.Process(target=self._report_after_parallel)
proc.start()
LOG.info('Started process to report errors')
self.processes.append(proc)
final_results = {}
for (filename, results) in self._results():
final_results[filename] = results
for checker in self.checkers:
filename = checker.filename
checker.results = sorted(final_results.get(filename, []),
key=lambda tup: (tup[1], tup[2]))
def run_serial(self):
"""Run the checkers in serial."""
@ -299,11 +291,11 @@ class Manager(object):
for checker in self.checkers:
self.process_queue.put(checker)
def stop(self):
    """Stop checking files."""
    # Enqueue one 'DONE' sentinel per worker process; each worker's
    # iter(self.process_queue.get, 'DONE') loop (see
    # _run_checks_from_queue) exits when it reads the sentinel.
    for i in range(self.jobs):
        self.process_queue.put('DONE')
def stop(self):
    """Stop checking files."""
    # Block until every started worker process has finished; processes
    # were appended to self.processes when they were started.
    for proc in self.processes:
        LOG.info('Joining %s to the main process', proc.name)
        proc.join()

View file

@ -195,6 +195,7 @@ class Application(object):
self.options = None
self.args = None
self.result_count = 0
def find_plugins(self):
# type: () -> NoneType
@ -274,7 +275,7 @@ class Application(object):
# type: () -> NoneType
"""Report all the errors found by flake8 3.0."""
LOG.info('Reporting errors')
self.file_checker_manager.report()
self.result_count = self.file_checker_manager.report()
def _run(self, argv):
self.find_plugins()
@ -289,7 +290,12 @@ class Application(object):
def run(self, argv=None):
# type: (Union[NoneType, List[str]]) -> NoneType
"""Run our application."""
"""Run our application.
This method will also handle KeyboardInterrupt exceptions for the
entirety of the flake8 application. If it sees a KeyboardInterrupt it
will forcibly clean up the :class:`~flake8.checker.Manager`.
"""
try:
self._run(argv)
except KeyboardInterrupt as exc:
@ -303,3 +309,4 @@ def main(argv=None):
"""Main entry-point for the flake8 command-line tool."""
app = Application()
app.run(argv)
raise SystemExit(app.result_count > 0)