diff --git a/flake8/checker.py b/flake8/checker.py index 33bbf79..0d6a386 100644 --- a/flake8/checker.py +++ b/flake8/checker.py @@ -65,6 +65,18 @@ class Manager(object): self.process_queue = multiprocessing.Queue() self.results_queue = multiprocessing.Queue() + @staticmethod + def _cleanup_queue(q): + while not q.empty(): + q.get_nowait() + + def _force_cleanup(self): + if self.using_multiprocessing: + for proc in self.processes: + proc.join(0.2) + self._cleanup_queue(self.process_queue) + self._cleanup_queue(self.results_queue) + def _job_count(self): # type: () -> Union[int, NoneType] # First we walk through all of our error cases: @@ -116,10 +128,29 @@ class Manager(object): # it to an integer return int(jobs) + def _results(self): + seen_done = 0 + while True: + LOG.info('Retrieving results') + result = self.results_queue.get() + if result == 'DONE': + seen_done += 1 + if seen_done >= self.jobs: + break + continue + + yield result + def _report_after_parallel(self): style_guide = self.style_guide - for (filename, results) in iter(self.results_queue.get, 'DONE'): - results = sorted(results, key=lambda tup: (tup[2], tup[3])) + final_results = {} + for (filename, results) in self._results(): + final_results[filename] = results + + for checker in self.checkers: + filename = checker.filename + results = sorted(final_results[filename], + key=lambda tup: (tup[1], tup[2])) for (error_code, line_number, column, text) in results: style_guide.handle_error( code=error_code, @@ -148,6 +179,7 @@ class Manager(object): for checker in iter(self.process_queue.get, 'DONE'): LOG.debug('Running checker for file "%s"', checker.filename) checker.run_checks(self.results_queue) + self.results_queue.put('DONE') def is_path_excluded(self, path): # type: (str) -> bool @@ -195,9 +227,7 @@ class Manager(object): This iterates over each of the checkers and reports the errors sorted by line number. """ - if self.using_multiprocessing: - self._report_after_parallel() - else: + if not self.using_multiprocessing: self._report_after_serial() def run(self): @@ -215,9 +245,13 @@ class Manager(object): proc.daemon = True proc.start() self.processes.append(proc) + proc = multiprocessing.Process(target=self._report_after_parallel) + proc.start() + LOG.info('Started process to report errors') + self.processes.append(proc) else: for checker in self.checkers: - checker.run_checks() + checker.run_checks(self.results_queue) def start(self): """Start checking files.""" @@ -235,11 +269,8 @@ class Manager(object): for i in range(self.jobs): self.process_queue.put('DONE') - LOG.info('Joining process workers') - for process in self.processes: - process.join() - LOG.info('Processes joined') - self.results_queue.put('DONE') + for proc in self.processes: + proc.join() class FileChecker(object): @@ -371,7 +402,7 @@ class FileChecker(object): exc.error_message) if results_queue is not None: - results_queue.put_nowait((self.filename, self.results)) + results_queue.put((self.filename, self.results)) def handle_comment(self, token, token_text): """Handle the logic when encountering a comment token.""" diff --git a/flake8/main/cli.py b/flake8/main/cli.py index ecbdde9..5bf6144 100644 --- a/flake8/main/cli.py +++ b/flake8/main/cli.py @@ -264,11 +264,13 @@ class Application(object): """Run the actual checks with the FileChecker Manager.""" self.file_checker_manager.start() self.file_checker_manager.run() + LOG.info('Finished running') self.file_checker_manager.stop() def report_errors(self): # type: () -> NoneType """Report all the errors found by flake8 3.0.""" + LOG.info('Reporting errors') self.file_checker_manager.report() def _run(self, argv):