Wrap up multiprocessing work

This commit is contained in:
Ian Cordasco 2016-03-15 12:18:45 -05:00
parent c0659d1a8c
commit 19062c5e9c
2 changed files with 45 additions and 12 deletions

View file

@ -65,6 +65,18 @@ class Manager(object):
self.process_queue = multiprocessing.Queue() self.process_queue = multiprocessing.Queue()
self.results_queue = multiprocessing.Queue() self.results_queue = multiprocessing.Queue()
@staticmethod
def _cleanup_queue(q):
while not q.empty():
q.get_nowait()
def _force_cleanup(self):
if self.using_multiprocessing:
for proc in self.processes:
proc.join(0.2)
self._cleanup_queue(self.process_queue)
self._cleanup_queue(self.results_queue)
def _job_count(self): def _job_count(self):
# type: () -> Union[int, NoneType] # type: () -> Union[int, NoneType]
# First we walk through all of our error cases: # First we walk through all of our error cases:
@ -116,10 +128,29 @@ class Manager(object):
# it to an integer # it to an integer
return int(jobs) return int(jobs)
def _results(self):
seen_done = 0
while True:
LOG.info('Retrieving results')
result = self.results_queue.get()
if result == 'DONE':
seen_done += 1
if seen_done >= self.jobs:
break
continue
yield result
def _report_after_parallel(self): def _report_after_parallel(self):
style_guide = self.style_guide style_guide = self.style_guide
for (filename, results) in iter(self.results_queue.get, 'DONE'): final_results = {}
results = sorted(results, key=lambda tup: (tup[2], tup[3])) for (filename, results) in self._results():
final_results[filename] = results
for checker in self.checkers:
filename = checker.filename
results = sorted(final_results[filename],
key=lambda tup: (tup[1], tup[2]))
for (error_code, line_number, column, text) in results: for (error_code, line_number, column, text) in results:
style_guide.handle_error( style_guide.handle_error(
code=error_code, code=error_code,
@ -148,6 +179,7 @@ class Manager(object):
for checker in iter(self.process_queue.get, 'DONE'): for checker in iter(self.process_queue.get, 'DONE'):
LOG.debug('Running checker for file "%s"', checker.filename) LOG.debug('Running checker for file "%s"', checker.filename)
checker.run_checks(self.results_queue) checker.run_checks(self.results_queue)
self.results_queue.put('DONE')
def is_path_excluded(self, path): def is_path_excluded(self, path):
# type: (str) -> bool # type: (str) -> bool
@ -195,9 +227,7 @@ class Manager(object):
This iterates over each of the checkers and reports the errors sorted This iterates over each of the checkers and reports the errors sorted
by line number. by line number.
""" """
if self.using_multiprocessing: if not self.using_multiprocessing:
self._report_after_parallel()
else:
self._report_after_serial() self._report_after_serial()
def run(self): def run(self):
@ -215,9 +245,13 @@ class Manager(object):
proc.daemon = True proc.daemon = True
proc.start() proc.start()
self.processes.append(proc) self.processes.append(proc)
proc = multiprocessing.Process(target=self._report_after_parallel)
proc.start()
LOG.info('Started process to report errors')
self.processes.append(proc)
else: else:
for checker in self.checkers: for checker in self.checkers:
checker.run_checks() checker.run_checks(self.results_queue)
def start(self): def start(self):
"""Start checking files.""" """Start checking files."""
@ -235,11 +269,8 @@ class Manager(object):
for i in range(self.jobs): for i in range(self.jobs):
self.process_queue.put('DONE') self.process_queue.put('DONE')
LOG.info('Joining process workers') for proc in self.processes:
for process in self.processes: proc.join()
process.join()
LOG.info('Processes joined')
self.results_queue.put('DONE')
class FileChecker(object): class FileChecker(object):
@ -371,7 +402,7 @@ class FileChecker(object):
exc.error_message) exc.error_message)
if results_queue is not None: if results_queue is not None:
results_queue.put_nowait((self.filename, self.results)) results_queue.put((self.filename, self.results))
def handle_comment(self, token, token_text): def handle_comment(self, token, token_text):
"""Handle the logic when encountering a comment token.""" """Handle the logic when encountering a comment token."""

View file

@ -264,11 +264,13 @@ class Application(object):
"""Run the actual checks with the FileChecker Manager.""" """Run the actual checks with the FileChecker Manager."""
self.file_checker_manager.start() self.file_checker_manager.start()
self.file_checker_manager.run() self.file_checker_manager.run()
LOG.info('Finished running')
self.file_checker_manager.stop() self.file_checker_manager.stop()
def report_errors(self): def report_errors(self):
# type: () -> NoneType # type: () -> NoneType
"""Report all the errors found by flake8 3.0.""" """Report all the errors found by flake8 3.0."""
LOG.info('Reporting errors')
self.file_checker_manager.report() self.file_checker_manager.report()
def _run(self, argv): def _run(self, argv):