From a5573fc8643de1df2bfe7429c81fcb0de42bd784 Mon Sep 17 00:00:00 2001 From: Craig Silverstein Date: Fri, 2 Mar 2018 20:49:31 -0800 Subject: [PATCH 1/2] Move all uses of `pool` inside `run_parallel()`. This includes creating the pool, tearing it down under normal use, and tearing it down in case of exception. Doing this makes it harder to leak processes, as for instance was happening in #410. Fixes #410 --- src/flake8/checker.py | 61 +++++++++++++++--------------- src/flake8/main/application.py | 1 - tests/unit/test_checker_manager.py | 6 ++- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/flake8/checker.py b/src/flake8/checker.py index 7a18ce5..97c5ea1 100644 --- a/src/flake8/checker.py +++ b/src/flake8/checker.py @@ -74,7 +74,6 @@ class Manager(object): self.checks = checker_plugins self.jobs = self._job_count() self.using_multiprocessing = self.jobs > 1 - self.pool = None self.processes = [] self.checkers = [] self.statistics = { @@ -84,14 +83,6 @@ class Manager(object): 'tokens': 0, } - if self.using_multiprocessing: - try: - self.pool = multiprocessing.Pool(self.jobs, _pool_init) - except OSError as oserr: - if oserr.errno not in SERIAL_RETRY_ERRNOS: - raise - self.using_multiprocessing = False - def _process_statistics(self): for checker in self.checkers: for statistic in defaults.STATISTIC_NAMES: @@ -268,30 +259,40 @@ class Manager(object): results_found += len(results) return (results_found, results_reported) - def _force_cleanup(self): - if self.pool is not None: - self.pool.terminate() - self.pool.join() - def run_parallel(self): """Run the checkers in parallel.""" final_results = collections.defaultdict(list) final_statistics = collections.defaultdict(dict) - pool_map = self.pool.imap_unordered( - _run_checks, - self.checkers, - chunksize=calculate_pool_chunksize( - len(self.checkers), - self.jobs, - ), - ) - for ret in pool_map: - filename, results, statistics = ret - final_results[filename] = results - final_statistics[filename] = statistics - self.pool.close() - self.pool.join() - self.pool = None + + try: + pool = multiprocessing.Pool(self.jobs, _pool_init) + except OSError as oserr: + if oserr.errno not in SERIAL_RETRY_ERRNOS: + raise + self.using_multiprocessing = False + self.run_serial() + return + + try: + pool_map = pool.imap_unordered( + _run_checks, + self.checkers, + chunksize=calculate_pool_chunksize( + len(self.checkers), + self.jobs, + ), + ) + for ret in pool_map: + filename, results, statistics = ret + final_results[filename] = results + final_statistics[filename] = statistics + pool.close() + pool.join() + pool = None + finally: + if pool is not None: + pool.terminate() + pool.join() for checker in self.checkers: filename = checker.display_name @@ -328,8 +329,6 @@ class Manager(object): except KeyboardInterrupt: LOG.warning('Flake8 was interrupted by the user') raise exceptions.EarlyQuit('Early quit while running checks') - finally: - self._force_cleanup() def start(self, paths=None): """Start checking files. diff --git a/src/flake8/main/application.py b/src/flake8/main/application.py index aed6175..9c15629 100644 --- a/src/flake8/main/application.py +++ b/src/flake8/main/application.py @@ -405,7 +405,6 @@ class Application(object): print('... stopped') LOG.critical('Caught keyboard interrupt from user') LOG.exception(exc) - self.file_checker_manager._force_cleanup() self.catastrophic_failure = True except exceptions.ExecutionError as exc: print('There was a critical error during execution of Flake8:') diff --git a/tests/unit/test_checker_manager.py b/tests/unit/test_checker_manager.py index 82fa3f8..e3d2379 100644 --- a/tests/unit/test_checker_manager.py +++ b/tests/unit/test_checker_manager.py @@ -22,17 +22,19 @@ def test_oserrors_cause_serial_fall_back(): style_guide = style_guide_mock() with mock.patch('_multiprocessing.SemLock', side_effect=err): manager = checker.Manager(style_guide, [], []) + manager.run() assert manager.using_multiprocessing is False @mock.patch('flake8.utils.is_windows', return_value=False) def test_oserrors_are_reraised(is_windows): - """Verify that OSErrors will cause the Manager to fallback to serial.""" + """Verify that unexpected OSErrors will cause the Manager to reraise.""" err = OSError(errno.EAGAIN, 'Ominous message') style_guide = style_guide_mock() with mock.patch('_multiprocessing.SemLock', side_effect=err): with pytest.raises(OSError): - checker.Manager(style_guide, [], []) + manager = checker.Manager(style_guide, [], []) + manager.run() def test_multiprocessing_is_disabled(): From c16c0c904150c924f3920eb67ae7d242f9775277 Mon Sep 17 00:00:00 2001 From: Craig Silverstein Date: Fri, 2 Mar 2018 21:01:47 -0800 Subject: [PATCH 2/2] Do better testing that we actually call run_serial(). --- tests/unit/test_checker_manager.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_checker_manager.py b/tests/unit/test_checker_manager.py index e3d2379..02397f0 100644 --- a/tests/unit/test_checker_manager.py +++ b/tests/unit/test_checker_manager.py @@ -22,7 +22,9 @@ def test_oserrors_cause_serial_fall_back(): style_guide = style_guide_mock() with mock.patch('_multiprocessing.SemLock', side_effect=err): manager = checker.Manager(style_guide, [], []) - manager.run() + with mock.patch.object(manager, 'run_serial') as serial: + manager.run() + assert serial.call_count == 1 assert manager.using_multiprocessing is False @@ -34,7 +36,9 @@ def test_oserrors_are_reraised(is_windows): with mock.patch('_multiprocessing.SemLock', side_effect=err): with pytest.raises(OSError): manager = checker.Manager(style_guide, [], []) - manager.run() + with mock.patch.object(manager, 'run_serial') as serial: + manager.run() + assert serial.call_count == 0 def test_multiprocessing_is_disabled():