Move all uses of pool inside run_parallel().

This includes creating the pool, tearing it down under normal use, and
tearing it down in case of exception.

Doing this makes it harder to leak processes, as was happening, for
instance, in #410.

Fixes #410
This commit is contained in:
Craig Silverstein 2018-03-02 20:49:31 -08:00
parent f834499726
commit a5573fc864
3 changed files with 34 additions and 34 deletions

View file

@ -74,7 +74,6 @@ class Manager(object):
self.checks = checker_plugins
self.jobs = self._job_count()
self.using_multiprocessing = self.jobs > 1
self.pool = None
self.processes = []
self.checkers = []
self.statistics = {
@ -84,14 +83,6 @@ class Manager(object):
'tokens': 0,
}
if self.using_multiprocessing:
try:
self.pool = multiprocessing.Pool(self.jobs, _pool_init)
except OSError as oserr:
if oserr.errno not in SERIAL_RETRY_ERRNOS:
raise
self.using_multiprocessing = False
def _process_statistics(self):
for checker in self.checkers:
for statistic in defaults.STATISTIC_NAMES:
@ -268,30 +259,40 @@ class Manager(object):
results_found += len(results)
return (results_found, results_reported)
def _force_cleanup(self):
if self.pool is not None:
self.pool.terminate()
self.pool.join()
def run_parallel(self):
"""Run the checkers in parallel."""
final_results = collections.defaultdict(list)
final_statistics = collections.defaultdict(dict)
pool_map = self.pool.imap_unordered(
_run_checks,
self.checkers,
chunksize=calculate_pool_chunksize(
len(self.checkers),
self.jobs,
),
)
for ret in pool_map:
filename, results, statistics = ret
final_results[filename] = results
final_statistics[filename] = statistics
self.pool.close()
self.pool.join()
self.pool = None
try:
pool = multiprocessing.Pool(self.jobs, _pool_init)
except OSError as oserr:
if oserr.errno not in SERIAL_RETRY_ERRNOS:
raise
self.using_multiprocessing = False
self.run_serial()
return
try:
pool_map = pool.imap_unordered(
_run_checks,
self.checkers,
chunksize=calculate_pool_chunksize(
len(self.checkers),
self.jobs,
),
)
for ret in pool_map:
filename, results, statistics = ret
final_results[filename] = results
final_statistics[filename] = statistics
pool.close()
pool.join()
pool = None
finally:
if pool is not None:
pool.terminate()
pool.join()
for checker in self.checkers:
filename = checker.display_name
@ -328,8 +329,6 @@ class Manager(object):
except KeyboardInterrupt:
LOG.warning('Flake8 was interrupted by the user')
raise exceptions.EarlyQuit('Early quit while running checks')
finally:
self._force_cleanup()
def start(self, paths=None):
"""Start checking files.

View file

@ -405,7 +405,6 @@ class Application(object):
print('... stopped')
LOG.critical('Caught keyboard interrupt from user')
LOG.exception(exc)
self.file_checker_manager._force_cleanup()
self.catastrophic_failure = True
except exceptions.ExecutionError as exc:
print('There was a critical error during execution of Flake8:')

View file

@ -22,17 +22,19 @@ def test_oserrors_cause_serial_fall_back():
style_guide = style_guide_mock()
with mock.patch('_multiprocessing.SemLock', side_effect=err):
manager = checker.Manager(style_guide, [], [])
manager.run()
assert manager.using_multiprocessing is False
@mock.patch('flake8.utils.is_windows', return_value=False)
def test_oserrors_are_reraised(is_windows):
"""Verify that OSErrors will cause the Manager to fallback to serial."""
"""Verify that unexpected OSErrors will cause the Manager to reraise."""
err = OSError(errno.EAGAIN, 'Ominous message')
style_guide = style_guide_mock()
with mock.patch('_multiprocessing.SemLock', side_effect=err):
with pytest.raises(OSError):
checker.Manager(style_guide, [], [])
manager = checker.Manager(style_guide, [], [])
manager.run()
def test_multiprocessing_is_disabled():