Merge branch 'master' into 'master'

Move all uses of `pool` inside `run_parallel()`.

Closes #410

See merge request pycqa/flake8!228
This commit is contained in:
Ian Stapleton Cordasco 2018-04-15 19:31:31 +00:00
commit d0db8497b9
3 changed files with 38 additions and 34 deletions

View file

@ -74,7 +74,6 @@ class Manager(object):
self.checks = checker_plugins
self.jobs = self._job_count()
self.using_multiprocessing = self.jobs > 1
self.pool = None
self.processes = []
self.checkers = []
self.statistics = {
@ -84,14 +83,6 @@ class Manager(object):
'tokens': 0,
}
if self.using_multiprocessing:
try:
self.pool = multiprocessing.Pool(self.jobs, _pool_init)
except OSError as oserr:
if oserr.errno not in SERIAL_RETRY_ERRNOS:
raise
self.using_multiprocessing = False
def _process_statistics(self):
for checker in self.checkers:
for statistic in defaults.STATISTIC_NAMES:
@ -268,30 +259,40 @@ class Manager(object):
results_found += len(results)
return (results_found, results_reported)
def _force_cleanup(self):
if self.pool is not None:
self.pool.terminate()
self.pool.join()
def run_parallel(self):
"""Run the checkers in parallel."""
final_results = collections.defaultdict(list)
final_statistics = collections.defaultdict(dict)
pool_map = self.pool.imap_unordered(
_run_checks,
self.checkers,
chunksize=calculate_pool_chunksize(
len(self.checkers),
self.jobs,
),
)
for ret in pool_map:
filename, results, statistics = ret
final_results[filename] = results
final_statistics[filename] = statistics
self.pool.close()
self.pool.join()
self.pool = None
try:
pool = multiprocessing.Pool(self.jobs, _pool_init)
except OSError as oserr:
if oserr.errno not in SERIAL_RETRY_ERRNOS:
raise
self.using_multiprocessing = False
self.run_serial()
return
try:
pool_map = pool.imap_unordered(
_run_checks,
self.checkers,
chunksize=calculate_pool_chunksize(
len(self.checkers),
self.jobs,
),
)
for ret in pool_map:
filename, results, statistics = ret
final_results[filename] = results
final_statistics[filename] = statistics
pool.close()
pool.join()
pool = None
finally:
if pool is not None:
pool.terminate()
pool.join()
for checker in self.checkers:
filename = checker.display_name
@ -328,8 +329,6 @@ class Manager(object):
except KeyboardInterrupt:
LOG.warning('Flake8 was interrupted by the user')
raise exceptions.EarlyQuit('Early quit while running checks')
finally:
self._force_cleanup()
def start(self, paths=None):
"""Start checking files.

View file

@ -405,7 +405,6 @@ class Application(object):
print('... stopped')
LOG.critical('Caught keyboard interrupt from user')
LOG.exception(exc)
self.file_checker_manager._force_cleanup()
self.catastrophic_failure = True
except exceptions.ExecutionError as exc:
print('There was a critical error during execution of Flake8:')

View file

@ -22,17 +22,23 @@ def test_oserrors_cause_serial_fall_back():
style_guide = style_guide_mock()
with mock.patch('_multiprocessing.SemLock', side_effect=err):
manager = checker.Manager(style_guide, [], [])
with mock.patch.object(manager, 'run_serial') as serial:
manager.run()
assert serial.call_count == 1
assert manager.using_multiprocessing is False
@mock.patch('flake8.utils.is_windows', return_value=False)
def test_oserrors_are_reraised(is_windows):
"""Verify that OSErrors will cause the Manager to fallback to serial."""
"""Verify that unexpected OSErrors will cause the Manager to reraise."""
err = OSError(errno.EAGAIN, 'Ominous message')
style_guide = style_guide_mock()
with mock.patch('_multiprocessing.SemLock', side_effect=err):
with pytest.raises(OSError):
checker.Manager(style_guide, [], [])
manager = checker.Manager(style_guide, [], [])
with mock.patch.object(manager, 'run_serial') as serial:
manager.run()
assert serial.call_count == 0
def test_multiprocessing_is_disabled():