diff --git a/src/flake8/checker.py b/src/flake8/checker.py index d993cb9..ea8b5d3 100644 --- a/src/flake8/checker.py +++ b/src/flake8/checker.py @@ -9,7 +9,7 @@ import tokenize from typing import Dict, List, Optional, Tuple try: - import multiprocessing + import multiprocessing.pool except ImportError: multiprocessing = None # type: ignore @@ -262,18 +262,16 @@ class Manager(object): results_found += len(results) return (results_found, results_reported) - def run_parallel(self): + def run_parallel(self): # type: () -> None """Run the checkers in parallel.""" # fmt: off final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501 - final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, None]] # noqa: E501 + final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501 # fmt: on - try: - pool = multiprocessing.Pool(self.jobs, _pool_init) - except OSError as oserr: - if oserr.errno not in SERIAL_RETRY_ERRNOS: - raise + pool = _try_initialize_processpool(self.jobs) + + if pool is None: self.run_serial() return @@ -303,12 +301,12 @@ class Manager(object): checker.results = final_results[filename] checker.statistics = final_statistics[filename] - def run_serial(self): + def run_serial(self): # type: () -> None """Run the checkers in serial.""" for checker in self.checkers: checker.run_checks() - def run(self): + def run(self): # type: () -> None """Run all the checkers. This will intelligently decide whether to run the checks in parallel @@ -634,11 +632,25 @@ class FileChecker(object): self.run_physical_checks(line + "\n") -def _pool_init(): +def _pool_init(): # type: () -> None """Ensure correct signaling of ^C using multiprocessing.Pool.""" signal.signal(signal.SIGINT, signal.SIG_IGN) +def _try_initialize_processpool(job_count): + # type: (int) -> Optional[multiprocessing.pool.Pool] + """Return a new process pool instance if we are able to create one.""" + try: + return multiprocessing.Pool(job_count, _pool_init) + except OSError as err: + if err.errno not in SERIAL_RETRY_ERRNOS: + raise + except ImportError: + pass + + return None + + def calculate_pool_chunksize(num_checkers, num_jobs): """Determine the chunksize for the multiprocessing Pool. diff --git a/tests/integration/test_checker.py b/tests/integration/test_checker.py index 0acdb6e..836b543 100644 --- a/tests/integration/test_checker.py +++ b/tests/integration/test_checker.py @@ -246,3 +246,38 @@ def test_report_order(results, expected_order): with mock.patch.object(manager, '_handle_results', handler): assert manager.report() == (len(results), len(results)) handler.assert_called_once_with('placeholder', expected_results) + + +def test_acquire_when_multiprocessing_pool_can_initialize(): + """Verify successful importing of hardware semaphore support. + + Mock the behaviour of a platform that has a hardware sem_open + implementation, and then attempt to initialize a multiprocessing + Pool object. + + This simulates the behaviour on most common platforms. + """ + with mock.patch("multiprocessing.Pool") as pool: + result = checker._try_initialize_processpool(2) + + pool.assert_called_once_with(2, checker._pool_init) + assert result is pool.return_value + + +def test_acquire_when_multiprocessing_pool_can_not_initialize(): + """Verify unsuccessful importing of hardware semaphore support. + + Mock the behaviour of a platform that has not got a hardware sem_open + implementation, and then attempt to initialize a multiprocessing + Pool object. + + This scenario will occur on platforms such as Termux and on some + more exotic devices. + + https://github.com/python/cpython/blob/4e02981de0952f54bf87967f8e10d169d6946b40/Lib/multiprocessing/synchronize.py#L30-L33 + """ + with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool: + result = checker._try_initialize_processpool(2) + + pool.assert_called_once_with(2, checker._pool_init) + assert result is None